активный обмен сообщениями с помощью stomp и activemq.prefetchSize=1

У меня есть ситуация, когда у меня есть один брокер activemq с двумя очередями, Q1 и Q2. У меня есть два потребителя на основе ruby, использующие активный обмен сообщениями. Назовем их С1 и С2. Оба потребителя подписываются на каждую очередь. Я устанавливаю activemq.prefetchSize=1 при подписке на каждую очередь. Я также устанавливаю ack=client.

Рассмотрим следующую последовательность событий:

1) Сообщение, запускающее длительное задание, публикуется в очереди Q1. Назовите это М1.

2) M1 отправляется потребителю C1, запуская длительную операцию.

3) Два сообщения, запускающие короткие задания, публикуются в очереди Q2. Назовите их М2 и М3.

4) M2 отправляется C2, который быстро выполняет короткую работу.

5) M3 отправляется на C1, хотя C1 все еще выполняет M1. Он может выполнять отправку на C1, поскольку prefetchSize=1 задан для подписки на очередь, а не для соединения. Таким образом, тот факт, что сообщение Q1 уже было отправлено, не мешает отправить одно сообщение Q2.

Поскольку потребители ActiveMessaging являются однопоточными, конечным результатом является то, что M3 сидит и ожидает на C1 в течение длительного времени, пока C1 не закончит обработку M1. Итак, М3 долго не обрабатывается, несмотря на то, что потребитель С2 сидит без дела (поскольку быстро заканчивает с сообщением М2).

По сути, всякий раз, когда выполняется длинное задание Q1, а затем создается целая куча коротких заданий Q2, ровно одно из коротких заданий Q2 застревает на потребителе, ожидающем завершения длинного задания Q1.

Есть ли способ установить prefetchSize на уровне подключения, а не на уровне подписки? Я действительно не хочу, чтобы какие-либо сообщения отправлялись на C1, пока он обрабатывает M1. Другая альтернатива заключается в том, что я мог бы создать потребителя, предназначенного для обработки Q1, а затем выделить других потребителей для обработки Q2. Но я бы предпочел не делать этого, поскольку сообщения Q1 нечасты — преданные потребители Q1 будут сидеть без дела большую часть дня, занимая память.


person Clint Miller    schedule 03.06.2010    source источник


Ответы (2)


Activemq.prefetchSize доступен только в сообщении SUBSCRIBE, а не в сообщении CONNECT, в соответствии с документацией ActiveMQ для их расширенных заголовков stomp (http://activemq.apache.org/stomp.html). Вот соответствующая информация:

глагол: ПОДПИСАТЬСЯ

заголовок: activemq.prefetchSize

тип: инт

описание: указывает максимальное количество ожидающих сообщений, которые будут отправлены клиенту. Как только этот максимум достигнут, сообщения больше не отправляются, пока клиент не подтвердит сообщение. Установите значение 1 для очень справедливого распределения сообщений между потребителями, где обработка сообщений может быть медленной.

Мое чтение и опыт в этом заключаются в том, что, поскольку M1 не был подтвержден (потому что у вас включено подтверждение клиента), это M1 должно быть 1 сообщением, разрешенным prefetchSize=1, установленным в подписке. Я удивлен, узнав, что это не сработало, но, возможно, мне нужно провести более подробный тест. Ваши настройки должны соответствовать желаемому поведению.

Я слышал от других о ненадежности по поводу отправки ActiveMQ, поэтому, возможно, это ошибка используемой вами версии.

У меня есть одно предложение: либо прослушивать сетевой трафик, чтобы увидеть, получает ли M1 подтверждение по какой-либо причине, либо добавить несколько операторов puts в ruby ​​stomp gem, чтобы наблюдать за общением (это то, что я обычно делаю, когда отладка проблем с топанием).

Если у меня будет возможность попробовать это, я обновлю свой комментарий своими собственными результатами.

Одно предложение: вполне возможно, что может быть отправлено несколько длинных сообщений обработки, и если количество длинных сообщений обработки превышает количество ваших процессов, вы окажетесь в этом исправлении, где ожидают сообщения быстрой обработки.

У меня обычно есть по крайней мере один выделенный процесс, который просто выполняет быструю работу, или, другими словами, выделяю набор # процессов, которые просто выполняют более длительную работу. Если все процессы-потребители опроса прослушивают как длинные, так и короткие, это может привести к неоптимальным результатам независимо от того, что делает диспетчеризация. Группы процессов — это способ настроить потребителя для прослушивания подмножества адресатов: http://code.google.com/p/activemessaging/wiki/Configuration

имя группы_процессоров, *list_of_processors

A processor group is a way to run the poller to only execute a subset of

процессоры, передав имя группы в аргументах командной строки опросчика.

You specify the name of the processor as its underscored lowercase

версия. Итак, если у вас есть FooBarProcessor и BarFooProcessor в группе процессоров, это будет выглядеть так:

    ActiveMessaging::Gateway.define do |s|
      ...
      s.processor_group :my_group, :foo_bar_processor, :bar_foo_processor
    end

The processor group is passed into the poller like the following:

    ./script/poller start -- process-group=my_group
person Andrew Kuklewicz    schedule 03.06.2010
comment
Спасибо за ответ. Мы используем группы процессоров, чтобы различать задания с высоким и низким приоритетом. Задания с высоким приоритетом запускаются нашим пользовательским интерфейсом, а задания с низким приоритетом запускаются cron. Мы могли бы использовать здесь группу процессоров, но длительное задание на самом деле запускается только один раз в день. Мы могли бы вообще удалить его из activemq. Для долгой работы мы в основном просто используем activemq для обеспечения отказоустойчивости, поскольку у нас есть потребители на нескольких серверах. - person Clint Miller; 04.06.2010

Я не уверен, поддерживает ли ActiveMessaging это, но вы можете отказаться от подписки на других ваших потребителей, когда придет сообщение с длинной обработкой, а затем повторно подписаться на них после того, как оно будет обработано.

Это должно дать вам желаемый эффект.

person Hiram Chirino    schedule 04.06.2010