Spring Reactive Stream - неожиданное отключение

Мы используем Spring Cloud Reactive Streams с RabbitMQ.

Spring Reactive Stream, похоже, подтверждает сообщение, как только удаляет его из очереди. Таким образом, любые ошибки, необработанные исключения, которые происходят во время обработки сообщения, должны обрабатываться в приложении (которое отличается от нереактивного потока, в котором могут быть выброшены необработанные исключения, и сообщение будет отклонено, тем самым отправив его в очередь недоставленных сообщений. ).

Как мы должны справиться с внезапным завершением работы приложения, когда сообщение находится в полете?

Например:

  • Приложение извлекает сообщение из очереди
  • Приложение отмечает сообщение как подтвержденное
  • Приложение начинает обработку сообщения
  • Приложение закрывается до завершения обработки сообщения

Когда это происходит, сообщение кажется полностью утерянным, поскольку оно отсутствует в очереди, но приложение остановлено. Как мы можем восстановить эти сообщения?


person nosson    schedule 09.04.2020    source источник


Ответы (1)


Вам необходимо использовать ручные подтверждения и отложить подтверждение до тех пор, пока обработка не будет завершена асинхронно. Для этого вам нужно использовать все сообщение:

    @Bean
    public Consumer<Flux<Message<String>>> async() {
        return inbound -> inbound

                ...

                .map(msg -> {
                    try {
                        msg.getHeaders().get(AmqpHeaders.CHANNEL, Channel.class)
                                .basicAck(msg.getHeaders().get(AmqpHeaders.DELIVERY_TAG, Long.class), false);
                    }
                    catch (IOException e) {
                        e.printStackTrace();
                    }
                    return msg.getPayload();
                })
                .subscribe(System.out::println);
    }
spring:
  cloud:
    stream:
      function.definition: async
      bindings:
        async-in-0:
          destination: testtock
          group: async
      rabbit:
        bindings:
          async-in-0:
            consumer:
              acknowledge-mode: MANUAL
              prefetch: 10

Используйте basicReject для повторной постановки в очередь или отправки в DLQ.

person Gary Russell    schedule 09.04.2020
comment
Кроме того, имейте в виду, что в отличие от обработчиков сообщений императивного стиля, где единицей обработки является одно сообщение, в реактивном режиме единицей является весь поток. У нас нет абсолютно никакого контроля над отдельными сообщениями, кроме выявления некоторых ловушек, как показал Гэри. - person Oleg Zhurakousky; 10.04.2020