Android RxJava и цепочка наблюдаемых

Я пытаюсь связать несколько наблюдаемых вместе и выполнить некоторые действия в зависимости от того, какое наблюдаемое было выполнено. Но я столкнулся со странным поведением.

class MainActivity : AppCompatActivity() {

    val TAG: String = MainActivity::class.java.name

    private lateinit var clicker: TextView

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        clicker = findViewById(R.id.clicker) as TextView
        clicker.setOnClickListener {
            val i = AtomicInteger()

            getFirstObservable()
                .subscribeOn(Schedulers.computation())
                .observeOn(AndroidSchedulers.mainThread())
                .doOnNext {
                    showMessage(i, it)
                }
                .flatMap { getSecondObservable() }
                .doOnNext {
                    showMessage(i, it)
                }
                .flatMap { getThirdObservable() }
                .doOnNext {
                    showMessage(i, it)
                }
                .subscribe()
        }
    }

    fun getFirstObservable(): Observable<String> {
        return Observable.fromCallable {
            Thread.sleep(2000)
            "Hello"
        }
    }

    fun getSecondObservable(): Observable<Int> {
        return Observable.fromCallable {
            Thread.sleep(2000)
            3
        }
    }

    fun getThirdObservable(): Observable<String> {
        return Observable.fromCallable {
            Thread.sleep(2000)
            "World!"
        }
    }

    fun showMessage(i: AtomicInteger, obj: Any) {
        val msg = "Message #${i.incrementAndGet()}: from ${Thread.currentThread().name}: $obj"
        Log.e(TAG, msg)
        clicker.text = msg
        Toast.makeText(this, msg, Toast.LENGTH_SHORT).show()
    }
}

В этом примере журналы будут отображаться каждые 2 секунды, но все изменения с представлениями будут выполняться, когда будет завершен последний наблюдаемый объект.

12-04 01:11:30.465 19207-19207/com.googlevsky.rxtest E/com.googlevsky.rxtest.MainActivity: Message #1: from main: Hello
12-04 01:11:32.473 19207-19207/com.googlevsky.rxtest E/com.googlevsky.rxtest.MainActivity: Message #2: from main: 3
12-04 01:11:34.479 19207-19207/com.googlevsky.rxtest E/com.googlevsky.rxtest.MainActivity: Message #3: from main: World!

Я думаю, что это поведение AndroidScheduler.mainThread(), потому что, когда я удаляю эту строку и оборачиваю изменения такими представлениями

Handler(Looper.getMainLooper()).post {
    clicker.text = msg
    Toast.makeText(this, msg, Toast.LENGTH_SHORT).show()
}

поведение становится правильным. Так может ли кто-нибудь объяснить это поведение и предложить правильный способ решения этой проблемы?


person hluhovskyi    schedule 03.12.2016    source источник
comment
doOnNext/flatMap и другие вызываются не в основном потоке. наблюдатьOn - устанавливает планировщик для параметра, переданного в метод subscribe().   -  person Ufkoku    schedule 04.12.2016
comment
@Ufkoku я добавил журналы к вопросу. doOnNext вызывается в основном потоке   -  person hluhovskyi    schedule 04.12.2016


Ответы (1)


Большая часть вашего кода выполняется в основном потоке, включая спящие. Когда создается наблюдаемое, оно подписывается и наблюдается в текущем потоке, если не указано иное. Когда вы создаете свои вторые и третьи наблюдаемые, они находятся в основном потоке. Кроме того, поскольку наблюдаемая работа не имеет фона, она выполняется сразу же в текущем потоке при подписке. Поэтому вся работа и наблюдения происходят в основном потоке, не уступая ОС Android. Пользовательский интерфейс заблокирован в ожидании времени в основном потоке. Если вы увеличите это время сна, вы можете вызвать ANR. Чтобы исправить это, вы можете указать observeOn и subscribeOn для каждого из ваших наблюдаемых, чтобы передать работу в поток вычислений для каждого из них.

getFirstObservable().subscribeOn(Schedulers.computation())
                    .observeOn(AndroidSchedulers.mainThread())
                    .doOnNext {
                        showMessage(i, it)
                    }
                    .flatMap {
                        getSecondObservable().subscribeOn(Schedulers.computation())
                                             .observeOn(AndroidSchedulers.mainThread()) 
                    }
                    .doOnNext {
                        showMessage(i, it)
                    }
                    .flatMap { 
                        getThirdObservable().subscribeOn(Schedulers.computation())
                                            .observeOn(AndroidSchedulers.mainThread()) 
                    }
                    .doOnNext {
                        showMessage(i, it)
                    }
                    .doOnNext {
                        showMessage(i, it)
                    }
                    .subscribe()
person iagreen    schedule 04.12.2016