Project Reactor: doOnNext (или другие doOnXXX) асинхронно

Есть ли какой-нибудь метод, подобный doOnNext, но асинхронный? Например, мне нужно сделать длинную регистрацию (уведомление отправлено по электронной почте) для определенного элемента.

Scheduler myParallel = Schedulers.newParallel("my-parallel", 4);

Flux<Integer> ints = Flux.just(1, 2, 3, 4, 5)
        .publishOn(myParallel)
        .doOnNext(v -> {
            // For example, we need to do something time-consuming only for 3

            if (v.equals(3)) {
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            System.out.println("LOG FOR " + v);
        });

ints.subscribe(System.out::println);

Но почему я должен ждать регистрации 3? Я хочу сделать эту логику асинхронно.

Теперь у меня есть только это решение

Thread.sleep(10000);

Scheduler myParallel = Schedulers.newParallel("my-parallel", 4);
Scheduler myParallel2 = Schedulers.newParallel("my-parallel2", 4);

Flux<Integer> ints = Flux.just(1, 2, 3, 4, 5)
        .publishOn(myParallel)
        .doOnNext(v -> {
            Mono.just(v).publishOn(myParallel2).subscribe(value -> {
                if (value.equals(3)) {
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

                System.out.println("LOG FOR " + value);
            });
        });

ints.subscribe(System.out::println);

Есть ли какое-нибудь «хорошее» решение для этого?


person Вадим Парафенюк    schedule 12.12.2018    source источник


Ответы (1)


Если вы абсолютно уверены, что вас не волнует, успешно ли отправится электронное письмо, то вы можете использовать "subscribe-inside-doOnNext", но я почти уверен, что это будет ошибкой.

Чтобы ваш Flux распространял сигнал onError в случае сбоя "регистрации", рекомендуется использовать flatMap.

Хорошей новостью является то, что, поскольку flatMap объединяет результаты внутренних издателей сразу с основной последовательностью, вы все еще можете немедленно выдавать каждый элемент И запускать электронное письмо. Единственное предостережение заключается в том, что все это будет завершено только после завершения отправки электронной почты Mono. Вы также можете проверить в лямбда-выражении flatMap, нужно ли вообще вести журнал (а не во внутреннем Mono):

//assuming sendEmail returns a Mono<Void>, takes care of offsetting any blocking send onto another Scheduler

source //we assume elements are also publishOn as relevant in `source`
   .flatMap(v -> {
       //if we can decide right away wether or not to send email, better do it here
       if (shouldSendEmailFor(v)) {
           //we want to immediately re-emit the value, then trigger email and wait for it to complete
           return Mono.just(v)
               .concatWith(
                   //since Mono<Void> never emits onNext, it is ok to cast it to V
                   //which makes it compatible with concat, keeping the whole thing a Flux<V>
                   sendEmail(v).cast(V.class)
               );
       } else {
           return Mono.just(v);
       }
    });
person Simon Baslé    schedule 12.12.2018
comment
Если вы абсолютно уверены, что вас не волнует, будет ли отправка электронной почты успешной, вы можете использовать subscribe-inside-doOnNext, но я почти уверен, что это будет ошибкой - это то, что я искал! - person Serhii Povísenko; 13.07.2020