ActiveMQ: очередь недоставленных сообщений сохраняет порядок моих сообщений

Я использую ActiveMQ в качестве брокера для доставки сообщений. Эти сообщения предназначены для написания в dabatase. Иногда база данных недоступна или недоступна. В этом случае я хочу откатить свое сообщение, чтобы повторить попытку позже, и я хочу продолжить чтение других сообщений.

Этот код работает нормально, за исключением одного момента: откатное сообщение не позволяет мне читать остальные:

private Connection getConnection() throws JMSException {
    RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
    redeliveryPolicy.setMaximumRedeliveries(3); // will retry 3 times to dequeue rollbacked messages
    redeliveryPolicy.setInitialRedeliveryDelay(5 *1000);  // will wait 5s to read that message

    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
    Connection connection = connectionFactory.createConnection();
    ((ActiveMQConnection)connection).setUseAsyncSend(true);
    ((ActiveMQConnection)connection).setDispatchAsync(true);
    ((ActiveMQConnection)connection).setRedeliveryPolicy(redeliveryPolicy);
    ((ActiveMQConnection)connection).setStatsEnabled(true);
    connection.setClientID("myClientID");
    return connection;
}

Я создаю свою сессию таким образом:

session = connection.createSession(true, Session.SESSION_TRANSACTED);

Откат легко спросить:

session.rollback();

Представим, что у меня в очереди 3 сообщения:

1: ok
2: KO (will need to be treated again : the message I want to rollback)
3: ok
4: ok

Мой потребитель сделает (линейная последовательность):

commit 1 
rollback 2
wait 5s
rollback 2
wait 5s
rollback 2
put 2 in dead letter queue (ActiveMQ.DLQ)
commit 3
commit 4

Но я хочу :

commit 1
rollback 2
commit 3
commit 4
wait 5s
rollback 2
wait 5s
rollback 2
wait 5s
put 2 in dead letter queue (ActiveMQ.DLQ)

Итак, как я могу настроить моего потребителя, чтобы откладывать откат моих сообщений позже?


person Community    schedule 13.07.2010    source источник


Ответы (3)


На самом деле это ожидаемое поведение, поскольку повторные попытки сообщения обрабатываются клиентом, а не брокером. Итак, поскольку у вас есть привязка к 1 сеансу, и ваша политика повторных попыток настроена для трех повторных попыток перед DLQ, то весь процесс повторных попыток блокирует этот конкретный поток.

Итак, мой первый вопрос заключается в том, что если вставка в базу данных не удалась, разве вы не ожидали бы, что все остальные вставки в вашу базу данных завершатся сбоем по той же причине?

Если нет, способ обойти это - установить для этой очереди политику повторов, равную 0 попыткам, с определенным DLQ, чтобы сообщения немедленно завершались ошибкой и попадали в DLQ. Затем используйте другой процесс, который отключает DLQ каждые 5 секунд и повторно обрабатывает и / или помещает его обратно в основную очередь для обработки.

person Community    schedule 18.03.2011
comment
спасибо за четкое объяснение поведения. Наконец, мы выбрали RabbitMQ в качестве брокера и самостоятельно реализовали DLQ. - person Jean-Philippe Caruana; 19.03.2011

Вы используете <strictOrderDispatchPolicy /> в XML-файле конфигурации ActiveMQ? Я не уверен, повлияет ли это на порядок сообщений для повторной отправки или нет. Если вы используете строгую отправку заказов, попробуйте закомментировать эту политику, чтобы увидеть, изменит ли это поведение.

Брюс

person Community    schedule 13.07.2010
comment
Нет, я не использую эту политику. Спасибо за ссылку. - person Jean-Philippe Caruana; 15.07.2010

У меня была такая же проблема, я не нашел здесь решения, поэтому решил опубликовать его здесь после того, как нашел его для людей, которые борются с тем же. Это исправлено до версии 5.6, когда вы устанавливаете для свойства nonBlockingRedelivery значение true в фабрике соединений:

<property name="nonBlockingRedelivery" value="true" />
person Community    schedule 25.02.2014