ActiveMQ: несколько потребителей подключены к одной очереди, но только один потребитель получает все сообщения

В настоящее время я использую NMS для разработки ActiveMQ (5.6) на основе приложений.

У нас есть несколько потребителей (exe), пытающихся получить сообщения из одной очереди (не по теме). Хотя все сообщения отправляются только одному потребителю, хотя я заставляю потребителя спать в течение нескольких секунд после получения сообщения. Кстати, мы не хотим, чтобы потребители получали те же сообщения, что и другие потребители.

На официальном веб-сайте упоминается, что мы должны установить Prefetch Limit, чтобы решить, сколько сообщений может быть передано потребителю в любой момент времени. И его можно как настроить, так и закодировать.

Один из способов, который я пробовал, — кодировать с использованием класса PrefetchPolicy, связывающего класс ConnectionFactory, как показано ниже.

PrefetchPolicy poli = new PrefetchPolicy();
poli.QueuePrefetch = 0;
ConnectionFactory fac = new ConnectionFactory("activemq:tcp://Localhost:61616?jms.prefetchPolicy.queuePrefetch=1");
fac.PrefetchPolicy = poli;
using (IConnection con = fac.CreateConnection())
{
   using (ISession se = con.CreateSession())
   {
       IDestination destination = SessionUtil.GetDestination(se, queue, DestinationType.Queue);
       using (IMessageConsumer consumer = se.CreateConsumer(queue1))
       {
          con.Start();
          while (true)
          {
              ITextMessage message = consumer.Receive() as ITextMessage;
              Thread.Sleep(2000);
              if (message != null)
              {
                Task.Factory.StartNew(() => extractAndSend(message.Text));   //do something
              }
              else
              {
                Console.WriteLine("No message received~");
              }
        }
       }
   }
}

Но независимо от того, какое значение предварительной выборки я установил, поведение потребителей остается таким же, как и раньше.

И я попробовал второй способ привязки, чтобы получить результат, а именно настроить файл conf сервера. Я изменяю файл activemq.xml сервера, как показано ниже. " ProducerFlowControl="true" memoryLimit="5mb" /> " ProducerFlowControl="true" memoryLimit="5mb"> Но хотя я установил политику отправки, сообщения по-прежнему отправляются одному потребителю.

Я хочу знать, что: можно ли добиться такого поведения, просто настроив файл xml сервера, чтобы все потребители могли получать сообщения из одной очереди? Если да, то как это настроить и что не так с моей конфигурацией? Если нет, то как я могу использовать коды для достижения цели? Спасибо.


person ladygaga    schedule 18.06.2013    source источник


Ответы (1)


Взгляните на функцию «Группы сообщений».

У меня такая же проблема. Только один потребитель обрабатывал все сообщения. Я обнаружил в своем коде, что использовал заголовок группы во время отправки:

request.Properties["NMSXGroupID"] = "cheese";

Согласно официальным документам:

Стандартный заголовок JMS JMSXGroupID используется для определения того, к какой группе сообщений принадлежит сообщение. Затем функция группы сообщений гарантирует, что все сообщения для одной и той же группы сообщений будут отправлены одному и тому же потребителю JMS, пока этот потребитель остается активным. Как только потребитель умрет, будет выбран другой.

Полную информацию см. на странице http://activemq.apache.org/message-groups.html.

person Igor Cherednichenko    schedule 26.02.2014