RxJava: наблюдаемый поток и поток по умолчанию

У меня такой код:

Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull final ObservableEmitter<String> s) throws Exception {
                Thread thread = new Thread(new Runnable() {
                    @Override
                    public void run() {
                        s.onNext("1");
                        s.onComplete();
                    }
                });
                thread.setName("background-thread-1");
                thread.start();
            }
        }).map(new Function<String, String>() {
            @Override
            public String apply(@NonNull String s) throws Exception {
                String threadName = Thread.currentThread().getName();
                logger.logDebug("map: thread=" + threadName);
                return "map-" + s;
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {}

            @Override
            public void onNext(String s) {
                String threadName = Thread.currentThread().getName();
                logger.logDebug("onNext: thread=" + threadName + ", value=" + s);
            }

            @Override
            public void onError(Throwable e) {}

            @Override
            public void onComplete() {
                String threadName = Thread.currentThread().getName();
                logger.logDebug("onComplete: thread=" + threadName);
            }
        });

И вот результат:

map: thread=background-thread-1 
onNext: thread=background-thread-1, value=map-1 
onComplete: thread=background-thread-1

Важная деталь: я вызываю метод subscribe из другого потока (поток main в Android).

Итак, похоже, что класс Observable является синхронным и по умолчанию выполняет все (такие операторы, как map + уведомление подписчиков) в том же потоке, который генерирует события (s.onNext), верно? Интересно ... это намеренное поведение или я что-то неправильно понял? На самом деле я ожидал, что по крайней мере onNext и onComplete обратные вызовы будут вызываться в потоке вызывающего абонента, а не в том, который генерирует события. Правильно ли я понимаю, что в данном случае реальная нить звонящего не имеет значения? По крайней мере, когда события генерируются асинхронно.

Еще одна проблема - что, если я получаю некоторый Observable в качестве параметра из какого-то внешнего источника (т.е. я не генерирую его самостоятельно) ... у меня, как у пользователя, нет возможности проверить, является ли он синхронным или асинхронным, и Мне просто нужно явно указать, где я хочу получать обратные вызовы с помощью методов subscribeOn и observeOn, верно?

Спасибо!


person fraggjkee    schedule 16.04.2017    source источник


Ответы (1)


RxJava не знает параллелизма. Он будет генерировать значения в подписывающемся потоке, если вы не используете какой-либо другой механизм, такой как наблюдениеOn / subscribeOn. Пожалуйста, не используйте низкоуровневые конструкции, такие как операторы Thread in, вы можете нарушить контракт.

Из-за использования потока onNext будет вызываться из вызывающего потока ('background-thread-1'). Подписка происходит при вызове (UI-Thread). Каждый оператор в цепочке будет вызываться из вызывающего потока 'background-thread-1'. Подписка onNext также будет вызываться из background-thread-1.

Если вы хотите создавать значения не в вызывающем потоке, используйте команду subscribeOn. Если вы хотите переключить поток обратно в основной, используйте наблюдениеOn где-нибудь в цепочке. Скорее всего до того, как на него подписаться.

Пример:

Observable.just(1,2,3) // creation of observable happens on Computational-Threads
            .subscribeOn(Schedulers.computation()) // subscribeOn happens only once in chain. Nearest to source wins
            .map(integer -> integer) // map happens on Computational-Threads
            .observeOn(AndroidSchedulers.mainThread()) // Will switch every onNext to Main-Thread
            .subscribe(integer -> {
                // called from mainThread
            });

Вот хорошее объяснение. http://tomstechnicalblog.blogspot.de/2016/02/rxjava-understanding-observeon-and.html

person Hans Wurst    schedule 16.04.2017