Как связать нереактивный Spring EventListener и реактивный Flux

В чем разница между созданием потока напрямую путем вызова Flux.push и использованием приемника в выражении лямбады push по сравнению с использованием приемника, предоставленного DirectProcessor?

В минимальном примере, где Flux просто генерирует пару событий, я мог бы сделать

Flux.<String>push(emitter -> {
   emitter.next("One");
   emitter.next("Two");
   emitter.complete();
 });

по сравнению с использованием DirectProcessor

var emitter = DirectProcessor.<String>create().sink();
emitter.next("One");
emitter.next("Two");
emitter.complete();

Просто чтобы уточнить: я знаю, что могу использовать здесь Flux.just, но мой вариант использования на самом деле строит мост между Spring @EventListeners и Spring WebFlux, где я хочу создать Flux для каждого входящего запроса SSE для определенного ресурса, а затем публиковать события в этот Flux.

Может ли кто-нибудь сказать мне, действительны ли оба подхода? Конечно, разница должна быть. В частности, раздел Справочное руководство по Reactor на DirectProcessor гласит:

С другой стороны, у него есть ограничение на отсутствие противодавления. Как следствие, DirectProcessor сигнализирует своим подписчикам об исключении IllegalStateException, если вы проталкиваете через него N элементов, но по крайней мере один из его подписчиков запросил меньше N.

Что это обозначает?

[EDIT:] В более ранней версии вопроса я использовал Flux.generate() вместо Flux.push(), что, очевидно, неверно, поскольку сгенерировать может создать не более одного события.

[EDIT 2:] @ 123 попросил меня привести полный пример того, чего я пытаюсь достичь. Потерпите меня, это изрядное количество кода для SO-вопроса:

Полный пример того, что я на самом деле пытаюсь сделать

Я хотел бы построить мост между (нереактивным) прослушивателем событий домена Spring и реактивным Flux, который я затем могу использовать в конечной точке WebFlux для публикации SSE. В следующих фрагментах кода для краткости используются аннотации Lombok.

Предположим, что я в конечном итоге хочу опубликовать состояние пользователя в процессе адаптации как SSE. Вот перечисление:

public enum ProcessState {
  CREATED(false),
  VERIFIED(false),
  AUTHORIZATION_PENDING(false),
  AUTHORIZED(false),
  ACTIVE(true);

  @Getter
  private final boolean terminalState;

  ProcessState(boolean terminalState) {
    this.terminalState = terminalState;
  }

}

Нереактивная бизнес-логика будет публиковать StateChangedEvents всякий раз, когда изменяется состояние любого пользователя:

@Data
@RequiredArgsConstructor
public class StateChangedEvent {
  private final UUID userId;
  private final ProcessState newState;
}

Отсюда и мой первоначальный вопрос. Как мне построить мост, который переводит события этой области в поток Flux? Мои требования:

  • Текущее состояние процесса должно быть передано, как только новый клиент зарегистрируется.
  • Поток Flux должен завершаться всякий раз, когда достигается "терминальное" состояние подключения.

Вот что у меня есть на данный момент:

@Component
@RequiredArgsConstructor
class EventBridge {

  @RequiredArgsConstructor(access = PRIVATE)
  private static class Subscriber {
    private final UUID userId;
    private final FluxSink<ProcessState> sink;
    private boolean eventEmitted;
  }

  private final UserRepository repository;
  private final Map<UUID, Subscriber> subscribers = new ConcurrentHashMap<>();

  @EventListener
  void stateChanged(StateChangedEvent event) {
    notifySubscribers(event);
  }

  Flux<ProcessState> register(UUID userId) {
    return Flux.push(emitter -> addSubscriber(userId, emitter));
  }

  private Subscriber addSubscriber(UUID userId, FluxSink<ProcessState> sink) {
    var subscriptionId = randomUUID();
    var subscriber = new Subscriber(userId, sink);
    subscribers.put(subscriptionId, subscriber);
    sink
      .onRequest(n -> poll(subscriber))
      .onDispose(() -> removeSubscriber(subscriptionId));
    return subscriber;
  }

  private void poll(Subscriber subscriber) {
    emit(subscriber, loadCurrentState(subscriber), true);
  }

  private ProcessState loadCurrentState(Subscriber subscriber) {
    return repository.findById(subscriber.userId).getProcessState();
  }

  private void removeSubscriber(UUID subscriptionId) {
    subscribers.remove(subscriptionId);
  }

  private void notifySubscribers(StateChangedEvent event) {
    subscribers.values().stream()
      .filter(subscriber -> subscriber.userId.equals(event.getUserId()))
      .forEach(subscriber -> emit(subscriber, event.getNewState(), false));
  }

  private void emit(Subscriber subscriber, ProcessState processState, boolean onlyIfFirst) {
    synchronized (subscriber) {
      if (onlyIfFirst && subscriber.eventEmitted) {
        return;
      }
      subscriber.sink.next(processState);
      if (processState.isTerminalState()) {
        subscriber.sink.complete();
      }
      subscriber.eventEmitted = true;
    }
  }

}

И, наконец, контроллер, на котором используется мост:

@RestController
@RequiredArgsConstructor
class UserController {

  private final EventBridge eventBridge;

  @GetMapping(value = "/{userId}", produces = TEXT_EVENT_STREAM_VALUE)
  Flux<ServerSentEvent<ProcessState>> readAsStream(@PathVariable UUID userId) {
    return eventBridge.register(userId).map(response -> ServerSentEvent.builder((ProcessState) response).build());
  }

}

В моем коде моста есть пара проблем, которые я не могу понять:

  • Мне действительно нужно синхронизировать свой Subscriber экземпляр, чтобы избежать записи устаревших событий из poll исходного состояния? Если я этого не сделаю, произойдет так, что событие StateChange прибывает и публикуется до чтения текущего состояния из репозитория, который затем выталкивается не по порядку. Конечно, должен существовать более элегантный способ решения этой проблемы в стиле Flux без ключевого слова synchronized.

  • Мы уже исключили Flux.generate, похоже, это работает с Flux.push, Flux.create будет генерировать намного больше событий SSE? Почему? Боюсь, я не понимаю различий между этими тремя.

  • Вместо того, чтобы использовать статические методы на Flux, следует ли мне использовать здесь DirectProcessor или любой другой процессор? Я новичок во всем реактивном стеке, и документация Spring Reactor для меня слишком расплывчата, tbh. Еще раз: в чем разница? Как насчет того комментария о противодавлении, о котором я упоминал выше?


person Stefan Haberl    schedule 01.04.2020    source источник
comment
Это Flux.generate даже недействительно, оно должно вызывать исключение. Вы можете использовать next только один раз за проход.   -  person 123    schedule 05.04.2020
comment
Спасибо, что указали на это, я перефразировал свой вопрос, используя Flux.push вместо Flux.generate   -  person Stefan Haberl    schedule 06.04.2020
comment
Можете ли вы привести пример того, что вы пытаетесь с ними сделать? Во втором примере создается просто FluxSink, которое в примере Flux.push просто обернуто в Consumer.   -  person 123    schedule 07.04.2020
comment
Добавлен полный пример кода того, что у меня есть   -  person Stefan Haberl    schedule 07.04.2020
comment
Ура, сегодня нет времени, но я посмотрю завтра   -  person 123    schedule 07.04.2020
comment
Я опубликовал свой подход ниже. Что касается ваших подвопросов, я думаю, что они могут потребовать новых вопросов, поскольку, вероятно, потребуется разместить примеры кода для правильного объяснения.   -  person 123    schedule 08.04.2020


Ответы (2)


Итак, если я понимаю, что вы пытаетесь сделать правильно, я думаю, что ваше решение можно было бы значительно упростить.

@Component
public class EventBridge {

    private final UserRepository repository;
    private final ReplayProcessor<StateChangedEvent> processor;
    private final FluxSink<StateChangedEvent> sink;


    EventBridge(UserRepository repository){
        this.repository= repository;
        //Replays events from last 100S for every new subscriber
        this.processor = ReplayProcessor.createTimeout(Duration.ofSeconds(100L));
        //Sink provides thread safe next,complete and error for subscribers
        this.sink = processor.sink();
    }

    public void changeState(StateChangedEvent event) {
        //Literally just pass event into sink, calls onNext on subscribers
        sink.next(event);
    }

    public Flux<ProcessState> streamProcessStateForUser(UUID uuid){
        return
                //Search repository first, this isn't great since it blocks until 
                //the repo returns, although that seems to be what you want
                //Also I added an Unknown to ProcessState, since it's better than 
                //it being null. 
                //Also you should probably return optional from repo. 
            Flux.concat(
                    Flux.just(
                            userRepo.findById(uuid).map(User::getProcessState).orElse(ProcessState.UNKNOWN)
                    ),
                    processor
                            //Check the uuid matches the event
                            .filter(stateChangedEvent -> stateChangedEvent.getUserId().equals(uuid))
                            //Time out after 100 seconds, not needed but may be useful for you
                            .take(Duration.ofSeconds(100L))
                            //Complete flux when at terminal state
                            .takeUntil(stateChangedEvent -> stateChangedEvent.getNewState().isTerminalState())
                            //Convert to ProcessState from StateChangedEvent
                            .map(StateChangedEvent::getNewState)
            );
    }

}

Должен быть в состоянии сохранить все остальное без изменений.

person 123    schedule 08.04.2020
comment
Я попробую это через минуту, но уточнить: зачем мне ReplayProcessor с таймаутом в 100 секунд? Если бы меня не волновали прошлые события, что бы я использовал? - person Stefan Haberl; 08.04.2020
comment
и кстати: блокировка в порядке. Я имею дело с пользовательскими событиями, которые происходят в течение минут, а не миллисекунд. - person Stefan Haberl; 08.04.2020
comment
и еще одна вещь: не могу опубликовать здесь реальный вариант использования, но да, у нас есть состояние UNKNOWN и да, наши репозитории возвращают Optionals, но спасибо за указание на это :) - person Stefan Haberl; 08.04.2020
comment
Почти радует, но первое состояние сейчас публикуется дважды. Я предполагаю, что один раз чтение из репо и второй раз из ReplayProcessor - person Stefan Haberl; 08.04.2020
comment
@StefanHaberl Я предложил процессор воспроизведения, поскольку, если он не используется, вы потеряете все состояния, которые возникают при поиске в репо. Я бы, наверное, использовал в вашем случае DirectProcessor. Должна быть возможность просто переключать части, чтобы лучше соответствовать вашему варианту использования. Вы можете выключить его на месте. т.е. this.processor = DirectProcessor.create() - person 123; 08.04.2020
comment
Ах да, вам придется обновить экземпляр var обратно до DirectProcessor, как вы это делали при редактировании! - person 123; 08.04.2020
comment
DirectProcessor меня не устраивает, потому что я могу снова потерять событие терминала, что приведет к блокировке потока Flux до тех пор, пока не истечет время ожидания через 100 секунд. Так ReplayProcessor это было бы. Еще две вещи: (1) дублированную публикацию, о которой я упоминал в комментарии выше, легко исправить: просто добавьте distinct() в объединенный поток и (2) блокирующий характер первого Flux.just также легко исправить, заменив внешний Flux.concat на а Flux.merge - person Stefan Haberl; 09.04.2020
comment
Еще раз спасибо @ 123 за то, что указал мне в правильном направлении. Поразмыслив, я перешел к совершенно другому, более простому решению. Я обновлю заголовок вопроса и добавлю отдельный ответ для справки в будущем. - person Stefan Haberl; 09.04.2020
comment
@StefanHaberl Wrt (2), он будет (должен) по-прежнему блокироваться, даже если вы используете слияние, поскольку вызов репо не является реактивным. Merge просто охотно подписывается на каждый источник, поэтому результаты должны чередоваться по мере их поступления. Поскольку используется just, тогда он будет эффективно работать так же, как concat со всеми элементами из just, идущими первым Wrt (1), я полностью забыл о существовании этого, хорошая находка! - person 123; 09.04.2020

Спасибо @ 123 за ответ на мой вопрос о том, как построить мост между Spring @EventListener и Flux. Как упоминалось в вопросе, полный вариант использования заключался в отправке событий домена клиенту с использованием поддержки SSE WebFlux.

Немного подумав, я понял, что вообще не имеет смысла строить этот мост, потому что в сценарии с несколькими экземплярами HTTP-запрос может попасть в другой экземпляр, чем тот, где выполняется процесс адаптации, и, следовательно, никакие события вообще не будут отправляться.

В конце концов, я решил опросить единственный источник истины - базу данных - и отправлять события SSE всякий раз, когда изменяется состояние. Было бы здорово, если бы мы могли использовать здесь реактивное хранилище данных, но пока я «застрял» с Spring Data JPA и PostgreSQL.

Итак, если у кого-то такая же проблема, я в итоге построил вот так:

@RestController
@RequiredArgsConstructor
class UserController {

  private final UserRepository userRepository;

  @GetMapping(value = "/{userId}", produces = TEXT_EVENT_STREAM_VALUE)
  Flux<ServerSentEvent<ProcessState>> readAsStream(@PathVariable UUID userId) {
    return Flux.interval(ZERO, ofSeconds(1L))
      .map(n -> userRepository.findById(userId).getProcessState())
      .takeUntil(processState -> processState.isTerminalState())
      .distinctUntilChanged()
      .map(response -> ServerSentEvent.builder((ProcessState) response).build())
      .take(ofMinutes(30L));
  }

}

На всякий случай, если кому-то интересно: это снова упрощено, чтобы проиллюстрировать проблему. У нас гексагональная архитектура, то есть мы не внедряем Repositories в наши @RestController, а вызываем бизнес-фасад, также известный как порт ввода, из нашего веб-уровня для извлечения пользователей.

person Stefan Haberl    schedule 09.04.2020
comment
Не стоит беспокоиться! Из любопытства как обновить репозиторий? Что бы это ни делало, это могло бы передать событие в тему, которую слушают все соответствующие экземпляры. Тогда вам не придется повторно опрашивать репо в течение 30 минут. - person 123; 09.04.2020