Может ли подписчик выступать в роли издателя?

С точки зрения реактивных потоков, есть издатель, и у него может быть столько же подписчиков.

Но предположим, что подписчик получает сообщение от Publisher. Теперь этот подписчик (скажем, Subs1) изменяет/модифицирует сообщение и передает его другому подписчику (скажем, Subs2), который использует измененное сообщение.

Так может ли этот подписчик Subs1 выступать в роли издателя, который может передавать сообщение новому подписчику Subs2?

Я не уверен, возможно это или нет, но сценарий возможен, я думаю.

Если это возможно, пожалуйста, предложите возможный способ сделать это.


person KayV    schedule 22.12.2017    source источник


Ответы (1)


Если мы хотим преобразовать входящее сообщение и передать его дальше следующему подписчику, нам нужно реализовать интерфейс Processor. Он действует как подписчик, поскольку получает сообщения, и как издатель, поскольку обрабатывает эти сообщения и отправляет их для дальнейшей обработки.

Вот полная реализация для этого:

  1. Создайте класс MyTransformer, который реализует процессор и расширяет SubmissionPublisher, поскольку он будет действовать как подписчик и издатель:

    import java.util.concurrent.Flow;
    import java.util.concurrent.Flow.Subscription;
    import java.util.concurrent.SubmissionPublisher;
    import java.util.function.Function;
    
    public class MyTransformer<T, R> extends SubmissionPublisher<R> implements Flow.Processor<T, R> {
    
    private Function<T, R> function;
    private Flow.Subscription subscription;
    
    public MyTransformer(Function<T, R> function) {
        super();
        this.function = function;
    }
    
    @Override
    public void onComplete() {
        System.out.println("Transformer Completed");
    }
    
    @Override
    public void onError(Throwable e) {
        e.printStackTrace();
    }
    
    @Override
    public void onNext(T item) {
        System.out.println("Transformer Got : "+item);
        submit(function.apply(item));
        subscription.request(1);
    
    }
    
    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }
    
    
    
    }
    
  2. Создайте класс TestSubscriber, который реализует интерфейс Subscriber и реализует необходимые методы:

Метод onSubscribe() вызывается перед началом обработки. Экземпляр подписки передается в качестве аргумента. Это класс, который используется для управления потоком сообщений между подписчиком и издателем.

Основным методом здесь является onNext() — он вызывается всякий раз, когда издатель публикует новое сообщение.

Мы используем класс SubmissionPublisher, реализующий интерфейс Publisher.

Мы собираемся отправить N элементов издателю, которые получит наш TestSubscriber.

Обратите внимание, что мы вызываем метод close() для экземпляра TestSubscriber. Он будет вызывать обратный вызов onComplete() для каждого подписчика данного издателя.

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;

public class TestSubscriber<T> implements Subscriber<T> {

    private Subscription subscription;

    public List<T> consumed = new LinkedList<>();

    @Override
    public void onComplete() {
        System.out.println("Subsciber Completed");
    }

    @Override
    public void onError(Throwable arg0) {
        arg0.printStackTrace();
    }

    @Override
    public void onNext(T item) {
        System.out.println("In Subscriber Got : "+item);
        subscription.request(1);

    }

    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

}
  1. Вот поток обработки, в котором издатель публикует элементы String.

MyTransformer анализирует строку как целое число, что означает, что здесь должно произойти преобразование.

import java.util.List;
import java.util.concurrent.SubmissionPublisher;;

public class TestTransformer {

    public static void main(String... args) {
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        MyTransformer<String, Integer> transformProcessor = new MyTransformer<>(Integer::parseInt);

        TestSubscriber<Integer> subscriber = new TestSubscriber<>();
        List<String> items = List.of("1", "2", "3");

        List<Integer> expectedResult = List.of(1, 2, 3);

        publisher.subscribe(transformProcessor);
        transformProcessor.subscribe(subscriber);
        items.forEach(publisher::submit);
        publisher.close();

    }
}
person KayV    schedule 22.12.2017