Загрузка сущностей из базы данных и серверного API с помощью RxJava

В вашем приложении могут быть репозитории для обработки запросов API базы данных и сервера, но возникали ли у вас когда-нибудь проблемы с описанием этого процесса с помощью RxJava 2? Я использовал «описать», потому что RxJava, как инструмент функционального программирования, предназначенный для того, чтобы быть необязательным, что немного усложняет управление, но после того, как вы достигнете решения, вы знаете, что ваше решение более читабельно, элегантно и переносимо на другие платформы, потому что вы написали его на универсальном языке математики (теория категорий, морфизм, функтор, монада и т. д.), и вы можете видеть всю красоту этого.

Старый путь

Таким образом, Mert imşek написал сообщение здесь. Основная функция его решения такова:

Но в его сценарии Observable становится завершенным после ответа сети, что не является тем, что мы хотим, поскольку база данных Room предоставляет Flowables, которые не завершены, для доставки дополнительных обновлений данных. Здесь мы хотим поддержать это преимущество, заключающееся в том, что канал остается открытым и доставляет обновленные данные.

Сценарий

Мы хотим загрузить данные из базы данных, затем после первого ответа базы данных нам нужно запросить API и, в конце концов, нам нужно оставить канал открытым для дополнительных возможных обновлений данных. Для этого нам просто нужно избегать вызова onComplete метода наблюдателей. Имея это в виду и зная, что поток из базы данных не завершается, мы всегда будем отправлять обновления из базы данных, в том числе когда мы получаем из API. Чтобы достичь этого, когда мы получим ответ API, вместо того, чтобы напрямую распространять это обновление, мы сохраним его и позволим базе данных отвечать за излучение. Кроме того, после ответа базы данных нам нужно знать, что идет работа, поэтому пользовательский интерфейс показывает загрузку до тех пор, пока не вернется ответ API.

Наш путь

Прежде всего, мы предполагаем, что каждый поток равен Flowable, а не Observable. Это правильное предположение, потому что база данных Room не может выдавать Observable, плюс, таким образом, у нас есть преимущество обработки противодавления с помощью Flowable. Во-вторых, для этого сценария нам нужно определить класс ResultState следующим образом.

Здесь мы предполагаем, что состояние Loading испускается возвращающим Flowable, когда приходит ответ базы данных, тогда пользовательский интерфейс знает, что нужно продолжать показывать загрузку. После того, как ответ серверного API возвращается, мы сохраняем его в базе данных, и база данных выдает новое обновление, которое мы должны перевести в состояние Success, чтобы пользовательский интерфейс знал, что загрузка завершена. После этого, если каким-либо неизвестным процессом данные в базе данных обновляются, возвращающийся Flowable излучает Success состояние с обновленными данными. Пока мы выполняем эти процессы, если произошла какая-либо ошибка, Flowable выдает Error состояние с последними доступными данными, которые были кэшированы. Мы храним последние данные в Error состоянии, потому что LiveData может кэшировать только одно значение, которое излучается через него, а пользовательскому интерфейсу нужны все данные для восстановления своего состояния.

Базовый репозиторий

В чистой архитектуре нашего приложения каждый репозиторий на уровне данных отвечает за одну сущность на уровне домена, как показано ниже.

В этом BaseRepository классе вы можете видеть, что у нас есть метод с именем perform, который отвечает за загрузку данных из базы данных и сети. Но вы можете видеть в аргументах метода perform, что база данных выдает нам список сущностей, что на первый взгляд кажется проблемой, но после прочтения поста Комната и тайна утраченных событий вы поймете, что для чтобы избежать потери событий, нам нужно использовать список сущностей только для одной сущности (совет: вы можете думать об этом списке как о Optional<T>, который, к сожалению, поддерживается только Android API 24, поэтому мы не можем использовать его в нашем базовый класс.)

Общее поведение этой функции тестируется здесь, и мы скоро к нему вернемся.

Метод perform начинается со строки val cachedData = AtomicReference<D?>(). Как вы знаете, здесь у нас есть Flowable и Single в аргументах этого метода, поэтому мы должны учитывать многопоточные потоки, а затем мы используем AtomicReference для его поддержки. Также переменная cachedData отвечает за кеширование выдаваемого значения через базу данных Flowable.

Следующая строка

val processor = PublishProcessor.create<ResultState<D>>()

который мы будем использовать как Flowable без конца. Как мы описали сценарий, который мы хотим поддерживать, нам нужен Flowable, который никогда не вызывает onCompletemethod своих наблюдателей, поэтому мы будем использовать processor для этой цели.

val flowable = databaseFlowable.doOnNext {
    cachedData.set(it.firstOrNull())
}.share()

В приведенной выше строке мы кэшируем генерируемые значения базы данных в cachedData, затем мы создаем ConnectableFlowable с помощью оператора share, поэтому это гарантирует, что в двух вызовах Flowable.merge лямбда-функция doOnNext будет вызываться только один раз. Чтобы лучше понять оператор share, вы можете просмотреть соответствующий документ здесь.

Если мы проигнорируем flatMaps и onErrorResumeNextmethod, то возвращаемое значение perform будет

return Flowable.merge(flowable.take(1), flowable.skip(1))

Эта строка просто делит пар на первое испускаемое значение и остальные, а затем, обработав их по-другому, снова объединяет их в один поток. Вот документация операторов слияния, взять и пропустить.

.onErrorResumeNext{ 
    concatJustFlowable(ResultState.Error(...), processor)
}

Вышеупомянутая строка преобразует все ошибки в этом потоке в ResultState.Error, поэтому мы уверены, что onError наблюдателей возвращающегося Flowable никогда не будут вызваны. Это важно для нас, потому что после вызова onError поток закроется, что плохо, если мы хотим отправлять обновления данных в пользовательский интерфейс. Здесь мы используем комбинацию .onErrorResumeNext{ concatJustFlowable(..., processor)}, чтобы точно избежать завершения возврата Flowable.

Если база данных выдает свое первое значение, то выполняется следующий код.

if (it.isEmpty()) {
    handleNetSingle(netSingle, persist, cachedData)                                           } else {
    concatJustFlowable(
        ResultState.Loading(it.first()),
        handleNetSingle(netSingle, persist, cachedData)
    )
}

Эта часть кода просто проверяет, нет ли в базе данных значения, а затем просто запрашивает серверный API, не выдавая состояние Loading. В противном случае, если база данных нашла значение, то выдает Loading state и запрашивает у API сервера обновленное значение.

Однако для сброса потока мы просто отправляем Success состояние с помощью следующего кода, если в базе данных действительно найдено значение обновления, поэтому список не пуст.

if (it.isNotEmpty()) {
    concatJustFlowable(ResultState.Success(it.first()), processor)
} else {
    processor
}

Также у нас есть метод с именем concatJustFlowable, который отвечает за выдачу значения, а затем позволяет Flowable испускать его значения. Другой метод - handleNetSingle, который отвечает за создание Flowable из сети Single и сохранение данных. В случае ошибки этот метод должен заключить ошибку в

.onErrorReturn {
    ResultState.Error(it, cachedData)
}

Здесь нам нужно обрабатывать ошибки сети и API, чтобы избежать обработки ошибок с помощью метода Flowable.merge, который нарушает его функциональность.

Тестирование

Как упоминалось ранее, методы тестирования находятся здесь, поэтому давайте кратко объясним, как они работают. В большинстве этих тестов у нас есть процессор базы данных, который имитирует поведение базы данных, как это

val database = PublishProcessor.create<List<EntityTest>>()

а также у нас есть возможность имитировать серверный API, как это

val net = PublishSubject.create<EntityTest>()

Кроме того, в большинстве этих тестов у нас есть persist функция, подобная этой

val persist = { d: EntityTest -> database.onNext(listOf(d)) }

который отвечает за отправку сохраненного объекта в базу данных Flowable, а затем после подписки на возвращаемый Flowable, как это

val testSubscriber = usecaseFlowable.test()

мы начинаем генерировать значения базы данных и сети и тестируем, что происходит. В большинстве случаев мы хотим утверждать, что Flowable не соответствует или не распространяет Throwable.

Наконец, я надеюсь, что с этим сценарием вы и пользователи вашего приложения почувствуете себя лучше и получите лучший опыт. В конце хочу поблагодарить свою команду, в том числе Реза Бигдели, Dr. jacky и Elmira farahani за их стремление достичь такого результата.