подписка на scheduler.io () не работает

Моя модель просмотра вызывает методы репозитория для получения некоторых данных из базы данных комнаты, а также из сети.

     class Repository @Inject constructor(
        private val remoteDatasource: IRemoteSource,
        private val localDatasource: ILocalSource,
        private val subscriberScheduler: Scheduler,
        private val observerScheduler: Scheduler
    ) : IRepository {
    //this method fetches data from room
        override fun getData(): Flowable<Boolean> {
            return localDatasource.shouldFetchRemote().subscribeOn(subscriberScheduler)
           .observeOn(observerScheduler)
        }
// makes api call
override fun getRemoteData(): Flowable<Data> {
            return remoteDatasource.getData().subscribeOn(subscriberScheduler)
           .observeOn(observerScheduler)
        }

subscriberScheduler - это Schedulers.io (), а планировщик наблюдателя - AndroidSchedulers.mainThread (). Я получаю исключение, когда делаю запрос из комнаты, говоря, что операция находится в основном потоке. Также, когда я получаю данные из удаленного источника, я проверяю поток, это основной поток, но это не исключение, как сетевой вызов в основном потоке.

Вот мой класс localsource, который использует место:

class Localsource  constructor(private val dataDao: DataDao):ILocalSource {

    override fun shouldFetchRemote(): Flowable<Boolean> {
        if (Looper.getMainLooper().thread == Thread.currentThread()) {
            Log.v("thread","main thread")
            //this log prints
        }
       //exception thrown here
       return Flowable.just(dataDao.isDataPresent() != 0)
}

Вот класс для RemoteSource

@OpenForTesting
class Remotesource @Inject constructor():IRemoteSource{



    override fun getData(): Flowable<Data> {
        if (Looper.getMainLooper().getThread() == Thread.currentThread()) {
            Log.v("thread","main thread") 
    //this log prints but no exception is thrown like network call on main thread.

        }
        return service.getData().flatMap { Flowable.just(it.data) }
    }
    }

person ozmank    schedule 07.09.2019    source источник


Ответы (1)


Ваши предположения о том, что происходит, ошибочны. Это проблема.

Давайте посмотрим на метод shouldFetchRemote().

//This part will always be on the main thread because it is run on it.
//Schedulers applied only for the created reactive 
//stream(Flowable, Observable, Single etc.) but not for the rest of the code in the method.
        if (Looper.getMainLooper().thread == Thread.currentThread()) {
            Log.v("thread","main thread")
            //this log prints
        }
       //exception thrown here
//Yes it is correct that exception is thrown in this line 
//because you do reach for the database on the main thread here.

//It is because Flowable.just() creates stream out of the independent data 
//that does not know anything about scheduler here.

// dataDao.isDataPresent() - is run on the main thread 
//because it is not yet part of the reactive stream - only its result is!!!! 
//That is crucial
       return Flowable.just(dataDao.isDataPresent() != 0)

Чтобы включить функцию в поток, вам нужно использовать другой подход. В комнате есть возможность напрямую возвращать Flowables и сохранять логические значения. Таким образом, вы можете использовать это так

В DAO

@Query(...)
Boolean isDataPresent(): Flowable<Boolean>

В вашем местном источнике

 override fun shouldFetchRemote(): Flowable<Boolean> = dataDao.isDataPresent()

Таким образом, он будет работать так, как ожидалось, потому что теперь вся функция является частью реактивного потока и будет реагировать на планировщики.

То же и с удаленным источником. Retrofit может возвращать Observables или Flowables из коробки

interface Service{

@GET("data")
fun getData(): Flowable<Data>

}

// and the repo will be

val service = retrofit.create(Service::class.java)

override fun getData(): Flowable<Data> = service.getData()

Таким образом, все будет работать так, как ожидалось, потому что теперь это часть потока.

Если вы хотите использовать плановые данные из Room или Retrofit - вы тоже можете это сделать. Единственное, Flowable.just () работать не будет.

Например, для вашего локального источника вам нужно будет сделать что-то вроде

//DAO
@Query(...)
Boolean isDataPresent(): Boolean


 override fun shouldFetchRemote(): Flowable<Boolean> = Flowable.create<Boolean>(
                    { emitter ->
                      emitter.onNext(dataDao.isDataPresent())
                      emitter.onComplete() //This is crucial because without onComplete the emitter won't emit anything
//There is also emitter.onError(throwable: Throwable) to handle errors

 }, BackpressureStrategy.LATEST).toObservable() // there are different Backpressure Strategies

Есть аналогичные фабрики для Obserwable и другого реактивного потока.

И вообще я бы рекомендовал вам прочитать документацию.

person Pavlo Ostasha    schedule 07.09.2019
comment
удаленная служба возвращает мне весь ответ, и я должен отобразить его, чтобы вернуть данные, так что это похоже на переопределение fun getData (): Flowable ‹Data› {} //service.getData возвращает Flowable ‹Response› return service.getData ( ) .flatMap {Flowable.just (it.data)}}} - person ozmank; 08.09.2019
comment
Неа. Вы можете использовать map {it.data} - если сервис возвращает Flowable, метод map () не изменяет его. flatMap () используется только для возврата источников реактивного потока - здесь он вам не нужен - у вас есть только ответ, уже завернутый в реактивный поток. Вам не нужна еще одна упаковка. - person Pavlo Ostasha; 08.09.2019