Celery: игнорирует имя биржи при отправке задачи

У меня есть очень простой код:

celery = Celery(broker="amqp://guest:[email protected]:5672/")                                                                                                                                                                                                           
celery.send_task(                                                                                                                                                                                                                            
    "robot.worker",                                                                                                                                                                                                                
    kwargs={},                                                                                                                                                                                                                               
    exchange="I_am_useless", # with  exchange=Exchange("I_am_useless") I got the same results                                                                                                                                                                                                          
)                                                                                                                                                                                                                                            

Мне очень нужно, чтобы задача была отправлена ​​на биржу "I_am_useless", однако она туда не отправляется, при отладке на уровне протокола AMQP я вижу, что событие публикации отправляется (что нормально), но со следующей характеристикой

reserved-1': 0,
'exchange-name': '',
'routing-key': 'celery',
'mandatory': False,
'immediate': False

поэтому кажется, что параметр полностью игнорируется, так как даже во время объявления обмена используется имя «сельдерей». но согласно документации send_task принимает те же параметры, что и https://docs.celeryproject.org/en/latest/reference/celery.app.task.html#celery.app.task.Task.apply_async , и мы видим, что параметр «exchange», который должен принимать строку.

однако, если я позвоню комбу напрямую

rabbit_url = "amqp://guest:[email protected]:5672/"                                                                                                                                                                                           
conn = Connection(rabbit_url)                                                                                                                                                                                                               
channel = conn.channel()                                                                                                                                                                                                                    
exchange = Exchange("example-exchange", type="direct")                                                                                                                                                                                      
producer = Producer(exchange=exchange, channel=channel, routing_key="BOB")                                                                                                                                                                  
queue = Queue(name="example-queue", exchange=exchange, routing_key="BOB")                                                                                                                                                                   
queue.maybe_bind(conn)                                                                                                                                                                                                                      
queue.declare()                                                                                                                                                                                                                             
producer.publish("Hello there!")                                                                                                                                                                                                            

Я правильно вижу имя биржи, поэтому мне интересно, что я делаю неправильно?


person allan.simon    schedule 24.09.2019    source источник


Ответы (1)


Изучив код и немного экспериментов, кажется, что вам нужно также указать аргументы exchange_type и routing_key при вызове send_task.

person Tomáš Linhart    schedule 24.09.2019
comment
большое спасибо ! Я планировал сделать это в крайнем случае на случай, если я не получу ответа, но вы сделали это до меня, поэтому я думаю, что открою ошибку (ошибку документации?), поскольку из документации по крайней мере не ясно, что это фактическое поведение - person allan.simon; 24.09.2019
comment
Некоторые подсказки можно найти в документах, однако Я признаю, что это не явно. - person Tomáš Linhart; 25.09.2019
comment
Однако интересно то, что приведенный выше код объявляет обмен сельдереем прямого типа, но затем не использует его. Мне нужно покопаться в разделах AMQP/Rabbitmq, потому что прямо сейчас это вызывает ошибку, так как я использую пользовательскую реализацию протокола AMQP, где анонимный обмен не существует (поэтому сообщение не может быть опубликовано, поэтому мне нужно чтобы проверить, является ли это ошибкой в ​​​​моей реализации amqp или недокументированной функцией rabbitmq) - person allan.simon; 25.09.2019
comment
хорошо, я понимаю лучше AMQP0.9.1 определяет пустую строку обмена по умолчанию, которая привязана ко всей очереди. (чего не делала моя пользовательская реализация), поэтому celery с приведенным выше кодом определяет обмен celery , очередь celery (с ключом маршрутизации celery), которая привязана на стороне rabbitmq к exchange , поэтому публикация выше работала, потому что сообщение было опубликовано на этом обмене по умолчанию, поэтому все еще поступает в правильную очередь, хотя и не через обмен, объявленный кодом. (Я собираюсь отредактировать ваш ответ с вашим разрешением, чтобы добавить эту точность?) - person allan.simon; 25.09.2019
comment
Не стесняйтесь редактировать ответ, чтобы лучше объяснить, что происходит внутри. - person Tomáš Linhart; 25.09.2019