Каково поведение onBackpressureBuffer в RxJava2

Что я хотел сделать, так это иметь Flowable с буфером противодавления одного элемента, который будет хранить последний созданный из потока.

Я пробовал использовать Flowable.onBackpressureBuffer (1, () -> {}, BackpressureOverflowStrategy.DROP_OLDEST). Однако это работает не так, как я ожидал

  Flowable.range(0, 10_000)
      .onBackpressureBuffer(1, {}, BackpressureOverflowStrategy.DROP_OLDEST)
      .observeOn(Schedulers.computation())
      .subscribe {
        println(it)
        Thread.sleep(5)
      }

Я ожидал, что результат будет последовательностью целых чисел, не обязательно смежных, которая должна включать последний элемент 9999. Однако он напечатал только первые несколько смежных чисел, таких как 0, 1, 2, 3, 4 ..., каждый раз разные, но не последнее число 9999.


person Feng Yang    schedule 16.08.2019    source источник


Ответы (1)


Я использую приведенный ниже код, и в конце он всегда печатает 9999. Сначала он печатает последовательные числа (до 127), а затем 9999. Возможно, в вашем случае основной исполняющий поток завершится намного раньше, чем потоки, обрабатывающие номер печати. Чтобы напечатать все числа до 9999, я попытался изменить буфер обратного давления на 10000 (и спящий режим основного потока на гораздо более высокое значение), и это, очевидно, обеспечило печать всех чисел, поскольку буфер довольно большой.

public class FlowableTest {

    public static void main(String[] args) throws InterruptedException {
        // TODO Auto-generated method stub

        Flowable.range(0, 10_000).onBackpressureBuffer(1, () -> {
        }, BackpressureOverflowStrategy.DROP_OLDEST).observeOn(Schedulers.computation()).subscribe(it -> {
            System.out.println(it);
            Thread.sleep(5);
        });

        Thread.sleep(50000); // wait the main program sufficient time to let the other threads end

    }
person Shailendra    schedule 16.08.2019
comment
Большое спасибо за быстрый ответ. Ваш ответ тут же. Как только я добавил ожидание в основной поток, все работает нормально. Кстати, я установил размер буфера равным 1 из соображений эффективности. Один из вариантов использования - распечатать обновленный журнал на основе уведомления об изменении файла журнала. В таком случае можно (или лучше) пропустить все уведомления при печати журнала на экране, но убедиться, что последнее не потеряно. - person Feng Yang; 17.08.2019