Airflow — правильный способ обработки обратных вызовов DAG

У меня есть DAG, а затем всякий раз, когда он успешен или терпит неудачу, я хочу, чтобы он запускал метод, который отправляет сообщения в Slack.

Мой DAG args выглядит следующим образом:

default_args = {
    [...]
    'on_failure_callback': slack.slack_message(sad_message),
    'on_success_callback': slack.slack_message(happy_message),
    [...]
}

И само определение DAG:

dag = DAG(
    dag_id = dag_name_id,
    default_args=default_args,
    description='load data from mysql to S3',
    schedule_interval='*/10 * * * *',
    catchup=False
      )

Но когда я проверяю Slack, каждую минуту появляется более 100 сообщений, как если бы они оценивались при каждом пульсе планировщика, и для каждого журнала он выполнял метод успеха и отказа, как если бы он работал и не работал для одного и того же экземпляра задачи (не отлично).

Как мне правильно использовать on_failure_callback и on_success_callback для обработки статусов dags и вызова пользовательского метода?


person Julinho da Adelaide    schedule 03.07.2018    source источник
comment
Дубликат этого stackoverflow.com/questions/44586356/? Вместо того, чтобы использовать on_failure_callback и on_success_callback, почему бы просто не сделать слабое сообщение задачей в вашей DAG, поскольку вы запрашиваете сообщение, является ли задача успешной или неудачной.   -  person Zack    schedule 03.07.2018
comment
Не дубликат, этот вопрос конкретно касается использования обратных вызовов успеха/неудачи.   -  person cwurtz    schedule 03.07.2018


Ответы (3)


Причина, по которой он создает сообщения, заключается в том, что когда вы определяете свой default_args, вы выполняете функции. Вам нужно просто передать определение функции, не выполняя его.

Поскольку у функции есть аргумент, это станет немного сложнее. Вы можете либо определить две частичные функции, либо определить две функции-оболочки.

Итак, вы можете сделать:

from functools import partial

success_msg = partial(slack.slack_message, happy_message);
failure_msg = partial(slack.slack_message, sad_message);

default_args = {
    [...]
    'on_failure_callback': failure_msg
    'on_success_callback': success_msg
    [...]
}

or

def success_msg():
    slack.slack_message(happy_message);

def failure_msg():
    slack.slack_message(sad_message);

default_args = {
    [...]
    'on_failure_callback': failure_msg
    'on_success_callback': success_msg
    [...]
}

Обратите внимание, что в любом методе передаются только определения функций failure_msg и success_msg, а не результат, который они дают при выполнении.

person cwurtz    schedule 03.07.2018
comment
Это сработало. Но, кажется, порождает сообщение для каждой задачи, а не для каждого запуска дага. Мой код прямо сейчас создан для запуска при каждом запуске дага, я думаю адаптировать его для каждого запуска задачи, но тогда мне нужно получить его task_id, возможно ли это? Другой способ, который я думаю, - это сделать это один раз за прогон, поэтому мне не нужно менять код. Это тоже возможно? Огромное спасибо. - person Julinho da Adelaide; 03.07.2018
comment
@JulinhodaAdelaide Я думаю, что 1.9.0 может определить это только для каждого уровня задачи, 1.10.0 будет иметь доступное определение на основе DAG. - person tobi6; 03.07.2018
comment
tobi6, ты знаешь, как я могу получить task_id? - person Julinho da Adelaide; 03.07.2018

default_args расширяется на уровне задачи, поэтому он становится обратным вызовом для каждой задачи.

применить атрибут на уровне флага DAG за пределами «default_args»

person the pillow    schedule 14.05.2020
comment
Недооцененный ответ прямо здесь! - person javatarz; 24.06.2021

О каком методе slack вы говорите? Планировщик анализирует ваш файл DAG каждый такт, поэтому, если slack какая-то функция определена в вашем коде, она будет запускаться каждый такт.

Несколько вещей, которые вы можете попробовать:

  • Определите функции, которые вы хотите вызывать как PythonOperators, а затем вызывайте их на уровне задачи, а не на уровне DAG.

  • Вы также можете использовать TriggerRules для установки задач ниже по течению от вашей задачи ETL, которые будут запускаться в зависимости от сбоя или успеха родительской задачи.

Из документации: defines the rule by which dependencies are applied for the task to get triggered. Options are: { all_success | all_failed | all_done | one_success | one_failed | dummy}

Вы можете найти пример того, как это будет выглядеть здесь (полное раскрытие - я м автор).

person Viraj Parekh    schedule 03.07.2018