У меня такой код:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull final ObservableEmitter<String> s) throws Exception {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
s.onNext("1");
s.onComplete();
}
});
thread.setName("background-thread-1");
thread.start();
}
}).map(new Function<String, String>() {
@Override
public String apply(@NonNull String s) throws Exception {
String threadName = Thread.currentThread().getName();
logger.logDebug("map: thread=" + threadName);
return "map-" + s;
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {}
@Override
public void onNext(String s) {
String threadName = Thread.currentThread().getName();
logger.logDebug("onNext: thread=" + threadName + ", value=" + s);
}
@Override
public void onError(Throwable e) {}
@Override
public void onComplete() {
String threadName = Thread.currentThread().getName();
logger.logDebug("onComplete: thread=" + threadName);
}
});
И вот результат:
map: thread=background-thread-1
onNext: thread=background-thread-1, value=map-1
onComplete: thread=background-thread-1
Важная деталь: я вызываю метод subscribe
из другого потока (поток main
в Android).
Итак, похоже, что класс Observable
является синхронным и по умолчанию выполняет все (такие операторы, как map
+ уведомление подписчиков) в том же потоке, который генерирует события (s.onNext
), верно? Интересно ... это намеренное поведение или я что-то неправильно понял? На самом деле я ожидал, что по крайней мере onNext
и onComplete
обратные вызовы будут вызываться в потоке вызывающего абонента, а не в том, который генерирует события. Правильно ли я понимаю, что в данном случае реальная нить звонящего не имеет значения? По крайней мере, когда события генерируются асинхронно.
Еще одна проблема - что, если я получаю некоторый Observable в качестве параметра из какого-то внешнего источника (т.е. я не генерирую его самостоятельно) ... у меня, как у пользователя, нет возможности проверить, является ли он синхронным или асинхронным, и Мне просто нужно явно указать, где я хочу получать обратные вызовы с помощью методов subscribeOn
и observeOn
, верно?
Спасибо!