запросить сообщение в rabbitmq с помощью Spring ampq

Я новичок в rabbitmq и пробую следующий сценарий

-> производитель отправляет сообщение
-> потребитель получает сообщение
- Выполнить мою собственную логику

если логика не работает - запросите

-> повторно поставить сообщение в очередь, если потребитель выходит из строя (машина выходит из строя)

Я реализовал базовый отправитель с помощью Spring rabbitTemplate

rabbitTemplate.convertAndSend(.....);

и для потребителя я реализовал прослушиватель сообщений

public class CustomMessageListener implements MessageListener {
@Override
    public void onMessage(Message message) {
       //** my own logic**
   }
}

и добавил в тару через весну

  <bean id="aListener" class="com.sample.CustomMessageListener" autowire="byName"/>

 <rabbit:listener-container id="myListenerContainer" connection-factory="connectionFactory"  acknowledge="auto" prefetch="750" concurrency="5" >
    <rabbit:listener ref="aListener" queues="reportQueue"/>
</rabbit:listener-container>

Он работает нормально до этой части.

теперь, если ** моя собственная логика **, упомянутая в слушателе, не работает. Я хочу повторно поставить сообщение в очередь. как я могу это реализовать. Из блогов, которые я просмотрел, похоже, что returnMessage нужно переопределить. Но я не уверен, как это можно сделать через слушателя.


person Pradeep    schedule 08.07.2015    source источник


Ответы (1)


С acknowledge="auto" сообщение не будет подтверждено до тех пор, пока слушатель не завершит работу в обычном режиме, так что вам больше ничего не нужно делать; если ваш слушатель выдает исключение или сервер выходит из строя, сообщение останется в очереди.

person Gary Russell    schedule 08.07.2015
comment
Спасибо за быстрый ответ. Есть ли свойство или способ игнорировать сообщение или отправить сообщение в очередь недоставленных сообщений после n попыток. (при условии, что будет указано n) - person Pradeep; 08.07.2015
comment
Добавьте retry interceptor к advice-chain слушателя. Настройте перехватчик с RejectAndDontRequeueRecoverer, который будет вызываться, когда количество повторных попыток исчерпано; сообщение будет отправлено в DLX / DLQ, если исходная очередь настроена таким образом. В качестве альтернативы вы можете повторно опубликовать непосредственно в другой очереди с дополнительной информацией (например, трассировкой стека исключения), используя RepublishMessageRecoverer. См. документацию для получения дополнительной информации. - person Gary Russell; 08.07.2015
comment
По умолчанию восстановление просто регистрируется и завершается обычным образом, поэтому сообщение просто регистрируется и удаляется. - person Gary Russell; 08.07.2015