Celery: чистый способ отозвать всю цепочку из задачи

Мой вопрос, вероятно, довольно простой, но все же я не могу найти решение в официальном документе. Я определил цепочку Celery внутри своего приложения Django, выполняя набор задач, зависящих друг от друга:

chain(  tasks.apply_fetching_decision.s(x, y),
        tasks.retrieve_public_info.s(z, x, y),
        tasks.public_adapter.s())()

Очевидно, что вторая и третья задачи нуждаются в выводе родителя, поэтому я использовал цепочку.

Теперь вопрос: мне нужно программно отменить 2-ю и 3-ю задачи, если условие проверки в 1-й задаче не выполняется. Как это сделать по-чистому? Я знаю, что могу отозвать задачи цепочки из метода, в котором я определил цепочку (см. thisвопрос и этот документ), но внутри первой задачи я не вижу ни последующих задач, ни самой цепочки.

Временное решение

Мое текущее решение состоит в том, чтобы пропустить вычисления внутри последующих задач на основе результата предыдущей задачи:

@shared_task
def retrieve_public_info(result, x, y):
   if not result:
      return []
   ...

@shared_task
def public_adapter(result, z, x, y):
   for r in result:
       ...

Но у этого «обходного пути» есть недостаток:

  • Добавляет ненужную логику к каждой задаче (на основе результата предыдущей), ставя под угрозу повторное использование
  • По-прежнему выполняет последующие задачи со всеми возникающими накладными расходами

Я не слишком много играл с передачей ссылок на цепочку задачам из-за боязни все испортить. Я также признаю, что не пробовал подход с выбрасыванием исключений, потому что я думаю, что выбор не проходить через цепочку может быть функциональным (таким образом, не исключительным) сценарием...

Спасибо за помощь!


person Alex Gidan    schedule 21.05.2014    source источник
comment
возможный дубликат Celery остановить выполнение цепочки   -  person Maxime Lorant    schedule 22.05.2014
comment
Вместо того, чтобы передавать только значение результата, вы можете передать кортеж результата и сообщение об успешном выполнении. Проще говоря, это может быть реализовано как Error Monad или Maybe Monad: /All_About_Monads#The_Error_monad en.wikipedia.org/wiki/   -  person Joe Frambach    schedule 22.05.2014
comment
Это не мешает выполнению N последующих задач цепочки. Чего я пытаюсь избежать, так это накладных расходов на задачи, которые будут вызываться и выполняться напрасно.   -  person Alex Gidan    schedule 22.05.2014


Ответы (2)


Я думаю, что нашел ответ на этот вопрос: это кажется правильный путь, действительно. Интересно, почему такой распространенный сценарий нигде не задокументирован.

Для полноты я публикую базовый снимок кода:

@app.task(bind=True)  # Note that we need bind=True for self to work
def task1(self, other_args):
    #do_stuff
    if end_chain:
        self.request.callbacks[:] = []
    ....

Обновлять

Я реализовал более элегантный способ справиться с проблемой и хочу поделиться им с вами. Я использую декоратор под названием revoke_chain_authority, чтобы он мог автоматически отзывать цепочку, не переписывая ранее описанный код.

from functools import wraps

class RevokeChainRequested(Exception):
    def __init__(self, return_value):
        Exception.__init__(self, "")

        # Now for your custom code...
        self.return_value = return_value


def revoke_chain_authority(a_shared_task):
    """
    @see: https://gist.github.com/bloudermilk/2173940
    @param a_shared_task: a @shared_task(bind=True) celery function.
    @return:
    """
    @wraps(a_shared_task)
    def inner(self, *args, **kwargs):
        try:
            return a_shared_task(self, *args, **kwargs)
        except RevokeChainRequested, e:
            # Drop subsequent tasks in chain (if not EAGER mode)
            if self.request.callbacks:
                self.request.callbacks[:] = []
            return e.return_value

    return inner

Этот декоратор можно использовать на shared task следующим образом:

@shared_task(bind=True)
@revoke_chain_authority
def apply_fetching_decision(self, latitude, longitude):
    #...

    if condition:
        raise RevokeChainRequested(False)

Пожалуйста, обратите внимание на использование @wraps. Необходимо сохранить сигнатуру исходной функции, иначе эта последняя будет потеряна и celery будет гадить при вызове нужной обернутой задачи (например, всегда будет вызывать первую зарегистрированную функцию вместо нужной)

person Alex Gidan    schedule 22.05.2014

Что касается Celery 4.0, то, что я обнаружил, работает, так это удаление оставшихся задач из текущей запрос экземпляра задачи с помощью инструкции:

self.request.chain = None

Допустим, у вас есть цепочка задач a.s() | b.s() | c.s(). Вы можете получить доступ к переменной self внутри задачи, только если вы свяжете задачу передав bind=True в качестве аргумента декоратору задач.

@app.task(name='main.a', bind=True):
def a(self):
  if something_happened:
    self.request.chain = None

Если something_happened верно, b и c не будут выполнены.

person Bruno Henrique    schedule 13.11.2016
comment
и как его следует использовать? Например: у меня есть идентификатор, который возвращает chain_task.apply_async(), что я могу сделать, чтобы отозвать эту задачу? - person ailin; 15.12.2016
comment
@ailin Только что отредактировал ответ. Я не знаю, как отозвать задачу по ее идентификатору. Это решение полезно, если вам нужно условно остановить цепочку внутри какой-то задачи. - person Bruno Henrique; 15.12.2016
comment
о, понял спасибо. Кстати, я нашел подход, который не был элегантным, но помог мне: stackoverflow.com/a/23908345/6696051 - person ailin; 17.12.2016
comment
Интересно, насколько «официальным» является это решение; кажется мне несколько хакерским. Хотя кажется, что это работает, я не могу найти документацию о том, что это поддерживаемый способ прерывания цепочки задач... - person Jens; 13.01.2017