Предварительная выборка RabbitMQ игнорируется, когда потребитель не работает и встает

Я игнорирую свои basicQos, когда потребитель не работает, а после этого потребитель встает. Например, предположим, что потребитель не работает, а от производителя поступает 5 сообщений. Если потребитель не работает, эти сообщения будут храниться на диске (я думаю!), если обменник/очередь устойчивы.

если я устанавливаю basicQos как channel.basicQos(0, 3, true), мой потребитель получает более 3 сообщений, когда он получает UP. Почему?!?

С другой стороны, все работает правильно (из очереди читаются только 3 сообщения), если потребитель работает, когда он получает сообщения из очередей... Мой код выглядит следующим образом:

factory = new ConnectionFactory();
factory.setHost(mRabbitMQHost); //may get server address from file configuration.
factory.setUsername(mRabbitMQUsername); 
factory.setPassword(mRabbitMQPassword);
connection = factory.newConnection();
channel = connection.createChannel();

channel.exchangeDeclare("exchangeName", "direct", true); //True enables durability
consumer = new QueueingConsumer(channel);

for (QGQueues queue : QGQueues.values()) {
    String queueName = queue.getQueueName();
    channel.queueDeclare(queueName, true, false, false, null);
    channel.queueBind(queueName, "exchangeName", queue.getRoutingKey());
    channel.basicConsume(queueName, false, consumer); //false enables ACK message to RabbitMQ server    
}

channel.basicQos(0, 3, true);

Спасибо!


person Thiago Sales    schedule 01.09.2015    source источник
comment
Возможно, я решил проблему. BasicQos должен быть вызван перед определением потребителей.   -  person Thiago Sales    schedule 02.09.2015


Ответы (1)


Могу поспорить, что вам нужно установить QoS, прежде чем делать что-либо еще.

Измените свой код в следующем порядке:



channel = connection.createChannel();

// set QoS immediately
channel.basicQos(0, 3, true);

channel.exchangeDeclare("exchangeName", "direct", true); //True enables durability
consumer = new QueueingConsumer(channel);

for (QGQueues queue : QGQueues.values()) {
    String queueName = queue.getQueueName();
    channel.queueDeclare(queueName, true, false, false, null);
    channel.queueBind(queueName, "exchangeName", queue.getRoutingKey());
    channel.basicConsume(queueName, false, consumer); //false enables ACK message to RabbitMQ server    
}

это гарантирует, что предел предварительной выборки будет установлен до того, как вы попытаетесь использовать какие-либо сообщения.

person Derick Bailey    schedule 01.09.2015
comment
Спасибо, Дерик! Это точно решение! - person Thiago Sales; 03.09.2015
comment
Да, basicQos сообщает RabbitMQ, как потребитель ожидает получать сообщения, поэтому оно должно быть отправлено до basicConsume. - person old_sound; 28.09.2015