RxJava: использование SubscribeOn приводит к выходу программы без завершения

Это мой пример кода

Observable.range(1,5)
            .subscribeOn(Schedulers.computation())
            .map(Observables05::doSomething)
            .subscribe(System.out::println, Throwable::printStackTrace, () -> System.out.println("done"));

Мой метод doSomething:

public static int doSomething(int i) {
    try {
        System.out.println("Processing " + i +
                " on Thread -- " + Thread.currentThread().getName());
        Thread.sleep(500);
        return i;
    } catch (InterruptedException e) {
        e.printStackTrace();
        throw new RuntimeException(e);
    }
}

Использование только примера кода в моем основном потоке просто завершает работу программы. Однако, если после этого используется Thread.sleep(3000), программа работает правильно до выхода, когда заканчивается время сна.

Это ожидаемое поведение и почему? Как я могу запустить этот код без использования Thread.sleep?


person Dripto    schedule 25.01.2017    source источник


Ответы (3)


subscribeOn планирует subscribe вызов фонового потока (вы выбрали computation планировщик). После этого планирования ваш основной поток свободен, т.е. завершает вашу программу.

Каким-то образом вам нужно дождаться завершения всех желаемых задач перед выходом. Thread.sleep(3000) подходит для простых тестовых случаев.

Реальные программы обычно не завершаются так быстро. Еще бывают случаи, когда нужно дождаться завершения какой-то фоновой задачи. Существуют различные механизмы синхронизации потоков (например, CountDownLatch), которые вы можете использовать для этого.

person Yaroslav Stavnichiy    schedule 25.01.2017
comment
Это происходит из-за того, что фоновые потоки работают как демоны? - person Juan; 28.05.2019


Это ожидаемое поведение. Простое решение для вас, поскольку вы выполняете метод main, заключается в использовании toBlocking() перед подпиской - как показано ниже -

    Observable.range(1,5)
    .subscribeOn(Schedulers.computation())
    .map(DummyJunk::doSomething)
    .toBlocking()
    .subscribe(System.out::println, Throwable::printStackTrace, () -> System.out.println("done"));
person Pavan Kumar    schedule 27.01.2017