Spring Integration Java DSL с использованием JMS retry / redlivery

Как я могу эффективно поддерживать повторную доставку JMS, когда обработка сообщений вызывает исключение?

У меня есть поток, использующий JMS (ActiveMQ) с connectionFactory, который настроен так, чтобы разрешить n попыток повторной доставки.

Я хотел бы, чтобы любая ошибка, возникающая при обработке сообщения, заставляла сообщение возвращаться для повторной доставки столько раз, сколько позволяет конфигурация connectionFactory, а затем, когда максимальное количество попыток повторной доставки исчерпано, доставить в DLQ. как обычно с AMQ.

Ответ на связанный вопрос SO подразумевает, что у меня может быть errorChannel, который повторно генерирует, что должно вызывать повторную доставку: Spring Обработка ошибок DSL при интеграции

Но в следующих случаях этого не происходит:

/***
 * Dispatch msgs from JMS queue to a handler using a rate-limit
 * @param connectionFactory
 * @return
 */
@Bean
public IntegrationFlow flow2(@Qualifier("spring-int-connection-factory") ConnectionFactory connectionFactory) {

    IntegrationFlow flow =  IntegrationFlows.from(
            Jms.inboundAdapter(connectionFactory)
                    .configureJmsTemplate(t -> t.receiveTimeout(1000))
                    .destination(INPUT_DIRECT_QUEUE),
            e -> e.poller(Pollers
                    .fixedDelay(5000)
                    .errorChannel("customErrorChannel")
                    //.errorHandler(this.msgHandler)
                    .maxMessagesPerPoll(2))
    ).handle(this.msgHandler).get();

    return flow;
}

@Bean
public MessageChannel customErrorChannel() {
    return MessageChannels.direct("customErrorChannel").get();
}

@Bean
public IntegrationFlow customErrorFlow() {
    return IntegrationFlows.from(customErrorChannel())
            .handle ("simpleMessageHandler","handleError")
            .get();
}

Метод errorChannel подразумевает:

   public void handleError(Throwable t) throws Throwable {
        log.warn("got error from customErrorChannel");
        throw t;
    }

Когда исключение генерируется обработчиком в потоке 2, errorChannel действительно получает исключение, но затем повторное генерирование вызывает исключение MessageHandlingException:

2018-08-13 09:00:34.221  WARN 98425 --- [ask-scheduler-5] c.v.m.i.jms.SimpleMessageHandler         : got error from customErrorChannel
2018-08-13 09:00:34.224  WARN 98425 --- [ask-scheduler-5] o.s.i.c.MessagePublishingErrorHandler    : Error message was not delivered.

org.springframework.messaging.MessageHandlingException: nested exception is org.springframework.messaging.MessageHandlingException: error occurred in message handler [simpleMessageHandler]; nested exception is java.lang.IllegalArgumentException: dont want first try, failedMessage=GenericMessage [payload=Enter some text here for the message body..., headers={jms_redelivered=false, jms_destination=queue://_dev.directQueue, jms_correlationId=, jms_type=, id=c2dbffc8-8ab0-486f-f2e5-e8d613d62b6a, priority=0, jms_timestamp=1534176031021, jms_messageId=ID:che2-39670-1533047293479-4:9:1:1:8, timestamp=1534176034205}]
    at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:107) ~[spring-integration-core-5.0.7.RELEASE.jar:5.0.7.RELEASE]
    at org.springframework.integration.handler.BeanNameMessageProcessor.processMessage(BeanNameMessageProcessor.java:61) ~[spring-integration-core-5.0.7.RELEASE.jar:5.0.7.RELEASE]
    at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:93) ~[spring-integration-core-5.0.7.RELEASE.jar:5.0.7.RELEASE]

person mmeyer    schedule 13.08.2018    source источник


Ответы (1)


Он будет работать с адаптером канала, управляемым сообщениями, но я предполагаю, что это не то, что вам нужно, из-за этот вопрос.

Поскольку опрашиваемый адаптер использует операцию JmsTemplate.receive(), сообщение уже подтверждено к моменту вызова потока.

Вам необходимо использовать транзакционный опросчик с JmsTransactionManager, чтобы исключение, вызванное потоком ошибок, откатало транзакцию, и сообщение было доставлено повторно.

person Gary Russell    schedule 13.08.2018
comment
Имеет смысл, что у опросчика уже есть сообщение. Я изучу использование txn. Спасибо! - person mmeyer; 13.08.2018
comment
Некоторые связанные вопросы: stackoverflow.com/questions/51828441/ - person Artem Bilan; 13.08.2018