Project Reactor: мне нужен процессор?

Я пытаюсь создать структуру конвейера поверх Reactor.

На каждом этапе (не считая первого и последнего) у нас есть задачи, которые преобразуют объект (то есть строку по ее длине или URL-адрес ее HTML-содержимого и т. Д.). Вот пример:

введите описание изображения здесь

Вы можете видеть, что средний уровень имеет 3 задачи, и каждая задача преобразует объект X в объект Y (кстати, это всегда полностью связанные слои)

Мой вопрос / дилемма: Моя первая мысль заключалась в том, что все, что мне нужно, это Flux.merge(), а затем подключить его к каждому подписчику. Например:

Flux<X> source = Flux.merge(x1Flux, x2Flux)  
source.subscribe(y1Subscriber)
source.subscribe(y2Subscriber)

Другой вариант - установить процессор (TopicProcessor?), Который будет действовать как промежуточное ПО (как в шаблоне pub-sub).

Мне не хватает понимания, какое решение лучше всего подходит для моей проблемы. Логически это одно и то же, но каковы практические последствия каждой архитектуры?

Спасибо!


person yaseco    schedule 24.07.2019    source источник


Ответы (1)


Мой общий подход здесь - использовать ConnectableFlux, чтобы отложить публикацию до тех пор, пока у вас не будет настроен весь конвейер, а затем вызывать connect() для каждого потока после того, как вы настроили конвейер.

Вы можете использовать процессор, но я бы посоветовал по возможности избегать этого.

Общая суть (не проверенная на синтаксис) будет примерно такой:

ConnectableFlux<String> x1 = Flux.just("x1").publish();
ConnectableFlux<String> x2 = Flux.just("x2").publish();

ConnectableFlux<String> y1 = Flux.<String>from(Flux.merge(x1, x2)).publish();
ConnectableFlux<String> y2 = Flux.<String>from(Flux.merge(x1, x2)).publish();
ConnectableFlux<String> y3 = Flux.<String>from(Flux.merge(x1, x2)).publish();

ConnectableFlux<String> z3 = Flux.<String>from(Flux.merge(y1, y2, y3)).publish();

x1.connect();
x2.connect();
y1.connect();
//...etc.

Также обратите внимание, что вы можете использовать concat() или mergeSequential() вместо merge(), в зависимости от вашего варианта использования (merge() будет охотно подписываться на издателей, concat() нет, а mergeSequential() будет объединяться в полученном порядке, потенциально чередуя значения.)

person Michael Berry    schedule 24.07.2019
comment
Большое спасибо, Майкл! На самом деле, это должен быть неопределенный процесс, поэтому я думаю, что merge или mergeSequential необходимы, не так ли? - person yaseco; 24.07.2019
comment
@yaseco Правильно. concat() будет ждать завершения первого потока, прежде чем перейти к следующему, поэтому в случае бесконечных потоков он никогда не завершится. - person Michael Berry; 24.07.2019
comment
эта Flux.from(...::subscribe) часть кажется подозрительной (обычно это запах кода для подписки в середине такой реактивной цепочки). Почему вы включили его вместо просто Flux.merge(x1, x2).publish())? - person Simon Baslé; 24.07.2019