Как организовать цепочку асинхронных вызовов в стиле RxJava2

У меня есть следующий код, завернутый в класс AsyncTask

        AsyncTask<Void, Void, Void> task = new AsyncTask<Void, Void, Void>() {
        @Override
        protected Void doInBackground(Void... params) {
            try {
                if (NetworkUtils.isNetworkConnected(mContext))
                    mIndicatorTable.sync().blockingGet();
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            } finally {
                try {
                    final MobileServiceList<IndicatorModel> results = mIndicatorTable.table().read(null).blockingGet();
                    if (isViewAttached()) {
                        getMvpView().refreshIndicatorList(results);
                        getMvpView().hideLoading();
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            return null;
        };

mIndicatorTable.sync() и mIndicatorTable.table().read(null) оба возвращают Single<Boolean>. mIndicatorTable.sync() - синхронизация локального хранилища с удаленным, если сеть недоступна, то мы просто читаем из локального хранилища, используя mIndicatorTable.table().read(null), когда сеть готова - мы выполняем синхронизацию, а затем читаем из локального хранилища (нам все равно, была ли синхронизация отменена или прервана). И в конце концов мы должны вызвать View, чтобы обновить RecycleView. Как это можно реализовать с помощью RxJava2?

#ОБНОВЛЕНИЕ

Рабочая версия

getDataManager().syncAll()
     .onErrorResumeNext(Single.just(false))
     .flatMap(list -> toSingle(mIndicatorTable.getTable().read(null)))
     .subscribeOn(Schedulers.newThread())
     .observeOn(AndroidSchedulers.mainThread())
     .subscribe(results -> {
        if (isViewAttached()) {
            getMvpView().refreshIndicatorList(results);
            getMvpView().hideLoading();
        }
     })

person Alexander Hramov    schedule 21.02.2017    source источник


Ответы (1)


rx-java упрощает цепочку асинхронных вызовов, просто вызывая .flatMap().

mIndicatorTable.sync()
        .flatMap(new Function<Boolean, SingleSource<MobileServiceList<IndicatorModel>>>() {
            @Override
            public SingleSource<MobileServiceList<IndicatorModel>> apply(Boolean aBoolean) throws Exception {
                return mIndicatorTable.table().read(null);
            }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(...)

Но вы хотите продолжить вызов sync() в другом потоке, а доступ к RecycleView можно получить только из основного потока. Вот тут и пригодится rx-android. .subscribeOn(Schedulers.io()) создайте экземпляр потока в другом потоке и .observeOn(AndroidSchedulers.mainThread()) убедитесь, что операторы ниже этой строки (здесь .subscribe(...)) выполняются в основном потоке.

person Geoffrey Marizy    schedule 21.02.2017
comment
Спасибо за решение. Но в этом случае .sync потерпел неудачу, а .read никогда не выполнялся, я прав? В моем случае .read должно выполняться независимо от результата .sync (неудачно или успешно) - person Alexander Hramov; 21.02.2017
comment
В этом случае вы можете поймать после синхронизации: reactivex.io/documentation/operators/catch.html - person Geoffrey Marizy; 21.02.2017
comment
Поскольку sync является Single<Boolean>, также возможно, что сбой представлен значением False, а не ошибкой. - person Geoffrey Marizy; 21.02.2017
comment
Я использовал onErrorResumeNext для возобновления потока выполнения. - person Alexander Hramov; 23.02.2017