У меня есть объект, который генерирует разные строки в случайные моменты времени, и мне нужно подключиться к этому генератору, чтобы взять эти строки и предоставить их пользовательскому интерфейсу (возможно, это будет несколько подписчиков в разных действиях). Предположим, я получил следующий код:
генератор:
class Generator {
private var stringToGenerate = ""
var subject: BehaviorSubject<String> = BehaviorSubject.create<String>()
init {
//seems like these instructions are skipped
subject
.subscribeOn(AndroidSchedulers.mainThread())
.doOnNext { t -> Log.i("subject doOnNext", Thread.currentThread().name + " " + Thread.currentThread().id) }
.observeOn(AndroidSchedulers.mainThread())
.map { _ -> Log.i("subject map", Thread.currentThread().name + " " + Thread.currentThread().id) }
//imitation of async creating of strings in separate thread
timer("timerThread", false, 2000L, 2000L) {
stringToGenerate = System.currentTimeMillis().toString()
subject.onNext(stringToGenerate)
}
}
}
Одно из действий, которое должно потреблять сгенерированные строки:
class TestActivity : AppCompatActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_test)
val wrongThreadObserver = object : Observer<String> {
override fun onComplete() {
}
override fun onSubscribe(d: Disposable) {
}
override fun onNext(t: String) {
Log.i("wrongThreadObserver", Thread.currentThread().name + " " + Thread.currentThread().id)
}
override fun onError(e: Throwable) {
}
}
val generator = Generator()
generator.subject.subscribe(wrongThreadObserver)
//for correct work illustration
val correctThreadObserver = object : Observer<String> {
override fun onComplete() {
}
override fun onSubscribe(d: Disposable) {
}
override fun onNext(t: String) {
Log.i("correctThreadObserver", Thread.currentThread().name + " " + Thread.currentThread().id)
}
override fun onError(e: Throwable) {
}
}
val mainThreadSubject = BehaviorSubject.create<String>()
mainThreadSubject
.doOnNext { obj -> Log.i("correctThread doOnNext", Thread.currentThread().name + " " + Thread.currentThread().id) }
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.subscribe(correctThreadObserver)
mainThreadSubject.onNext("test thread")
val handler = Handler()
handler.postDelayed({ mainThreadSubject.onNext("test thread 2") }, 1000)
handler.postDelayed({ mainThreadSubject.onNext("test thread 3") }, 2000)
}
}
В этом случае correctThreadObserver, созданный прямо в активности, работает нормально, но wrongThreadObserver продолжает работать в потоке таймера, как бы игнорируя инструкции subscribeOn, ObserveOn, doOnNext< /strong> в Генераторе, независимо от того, где вызываются эти инструкции - в инициализации, в потоке таймера, в действии путем получения объекта из генератора - wrongThreadObserver все равно работает в потоке таймера. Итак, журнал:
I/correctThread doOnNext: main 2
I/correctThreadObserver: RxNewThreadScheduler-1 941
I/correctThread doOnNext: main 2
I/correctThreadObserver: RxNewThreadScheduler-1 941
I/wrongThreadObserver: timerThread 937
I/correctThread doOnNext: main 2
I/correctThreadObserver: RxNewThreadScheduler-1 941
I/wrongThreadObserver: timerThread 937
I/wrongThreadObserver: timerThread 937
I/wrongThreadObserver: timerThread 937
Нет doOnNext
и нет основного потока для wrongThreadObserver
Что я делаю не так?
subscribeOn
наobserveOn
и все должно работать, я думаю - person EpicPandaForce   schedule 10.08.2018