Spring Cloud Stream Kafka DLQ через повторную попытку / восстановление

Мой вариант использования - получать сообщения Kafka, делать несколько попыток вызова отдыха и при исчерпании ресурсов выгружать неудавшееся сообщение в тему Kafka DLQ.

    @StreamListener(EventSource.SOME_CHANNEL)
    public void processMessage(Message<?> unsolicitedMessage) {
         String aString = .....   
         oneService.act(aString);      
    }

@Retryable отлично работает с точки зрения логики передачи для нескольких попыток.

    @Retryable(value = {OneException.class, TwoException}, maxAttempts = 3, 
    backoff = @Backoff(delay = 1000))
    public boolean act(String message, String endPoint) {
         //do stuff
    }

Для включения публикации Kafka DLQ в Spring Cloud Stream (enableDlq: true) исключение необходимо передать в аннотированный метод @StreamListener, чтобы связыватель Kafka мог выполнить необходимое.

Однако при этом я не могу использовать аннотированный метод @Recover, где поток идеально приземляется после повторной попытки:

@Recover
public boolean recoverOnToDLQ(OneException ex, String message, String 
endPoint) {
    throw ex; //Required for StreamListener Kafka DLQ to kick in! 
}

Вопрос: Есть ли способ запустить публикацию Kakfa DLQ из метода @Recover без повторной генерации исключения?

Потому что, если я использую его только для повторного броска, я считаю, что не смогу эффективно использовать более жесткий контроль, полученный с его помощью. Это также упростит модульные тесты и лучше улавливает логику на уровне кода? Есть какие-нибудь мысли, как с этим лучше справиться?

Я использую все последние версии для spring-cloud, spring-cloud-stream и spring-retry на эту дату.


person barsakus    schedule 20.07.2018    source источник


Ответы (1)


Это можно сделать, но вопрос в том, «а зачем вам это нужно?».

В связывателе есть встроенная функция повтора; просто отправьте исключение обратно в связыватель, и он отправит данные в DLQ после того, как попытки будут исчерпаны. Повторная попытка настраивается с использованием свойств повтора потребителя связывателя. Вам не понадобится ни строчки дополнительного кода.

Используя @Retryable, вы вкладываете 2 RetryTemplates (если вы не отключили повторную попытку связующего, установив для свойства потребителя maxAttempts значение 1).

Конечно, вы можете настроить свое собственное место назначения DLQ и просто записать все, что хотите, в своем восстановителе. Но для использования встроенного в связывателя издателя DLQ вам нужно будет создать специальный ErrorMessage (со свойствами, необходимыми издателю) и отправить его в канал ошибок привязки. Издателю нужна необработанная кафка ConsumerRecord, которую вам придется воссоздать, поскольку она недоступна для слушателя.

В целом, для вашего варианта использования кажется намного проще просто использовать конфигурацию повтора связывателя.

ИЗМЕНИТЬ

В 2.0.x вы можете добавить RetryTemplate @Bean в свое приложение, и он будет использоваться для всех привязок потребителей (переопределив свойства привязки).

Этот шаблон можно настроить с помощью любой политики повторных попыток, повторяющихся исключений и т. Д. И т. Д.

В 2.0.2 он должен быть квалифицирован с помощью @StreamRetryTemplate; это было исправление, потому что любое RetryTemplate переопределяет свойства, что может быть нежелательным.

person Gary Russell    schedule 21.07.2018
comment
Привет @ Гэри Рассел, спасибо. Да, использование нестандартных настроек путем включения параметров потребителя в YML работает как шарм. Механизм retry должен срабатывать только при определенных ошибках (неавторизованных, недоступных). Для всех остальных (неверный запрос, бизнес-ошибка и т. Д.) Сообщение может идти прямо в DLQ. Из коробки (на основе конфигурации) будет повторять каждое сообщение об ошибке? Отсюда возникла мысль о фактической реализации для охвата отдельных сценариев и настройки журнала ошибок. Мы также можем создавать надежные модульные / макетные тесты специально для наших вариантов использования (например, n-повторных попыток против 1-го повторения и т. Д.) - person barsakus; 21.07.2018
comment
См. Изменение моего ответа; вместо свойств привязки можно указать пользовательский RetryTemplate, который будет использоваться связывателем. - person Gary Russell; 22.07.2018
comment
Привет, @GaryRussell, не могли бы вы подсказать, как добавить метод восстановления для @StreamRetryTemplate, чтобы я мог писать логику после исчерпания повторных попыток. Я использую связыватель rabit-mq, ​​и я не хочу отправлять сообщения в dlq, я просто хочу сохранить сообщение об ошибке в db и закончить. - person vikki; 01.06.2020
comment
Не задавайте новых вопросов по старым ответам; в настоящее время вы не можете этого сделать, но вы можете добавить собственный обработчик ошибок в контейнер слушателя; если вам нужно, чтобы я показал вам, как; задать новый вопрос; Я не хочу засорять этот ответ несвязанной информацией. - person Gary Russell; 01.06.2020
comment
Я слежу за тегами, поэтому получаю автоматические уведомления. - person Gary Russell; 01.06.2020