Определение происхождения сообщений rabbitmq

Я использую kombu для получения сообщений rabbitmq из нескольких очередей. Можно ли для любого заданного сообщения определить, какая очередь его доставила?


person nerd31415926535    schedule 06.09.2018    source источник


Ответы (2)


Можно (и просто) различать очереди на основе очереди, из которой пришло сообщение, используя kombu. Вам просто нужно получить имя очереди через:

 print message.properties.get('user_id','no user id in message')

на стороне потребителя. Более подробное руководство по этому вопросу доступно по адресу https://adam.younglogic.com/2016/03/id-message-sender-kombu/

person Philip DiSarro    schedule 06.09.2018

Насколько я могу судить, в комбу нет прямого доступа к имени очереди, на которую приходит сообщение. решение, предложенное Филиппом, не является достаточно общим и, как ни странно, зависит от user_id, указанного в вызове публикации. Это представляет две проблемы:

  • Не каждый производитель может отправлять информацию user_id вместе с сообщением. В нашем тестовом примере user_id никогда не устанавливался по умолчанию. Кроме того, было бы не более выгодно использовать поле user_id, чем просто кодировать имя очереди в самом сообщении.
  • Преимущество использования ключей маршрутизации при публикации сообщений заключается в том, что с точки зрения издателя нам не нужно заботиться о том, куда отправляется сообщение. Если мы кодируем имена очередей (каким-либо образом, user_id или иным образом) в опубликованных сообщениях, то маршрутизация теряет смысл.

Мы определили два возможных решения при использовании ConsumerMixin:

  • Установка поля kombu Consumer_tag в объектах Потребителей, а затем определение Потребителей для потребления только в одной очереди. Если вам нужно прослушивать несколько очередей, определите несколько потребителей. Этот атрибут далеко не идеален, поскольку он не задокументирован и включает сопоставление строк, поскольку к полю добавляется целое число.
  • Использование частичных функций Python для переноса функции обратного вызова Consumer и передачи имени очереди:

    con = Consumer(queue=queue, callback=[partial(self.callback, queue_name=queue.name)])
    ...
    def callback(self, body, message, queue_name):
    

Ни одно из этих решений не является особенно элегантным. Лучше было бы, если бы kombu просто включала бы ссылку на очередь, в которой было получено сообщение, вместе с сообщением, отправленным обратному вызову Потребителя.

person Zack Panitzke    schedule 11.09.2018