У меня есть DAG
, а затем всякий раз, когда он успешен или терпит неудачу, я хочу, чтобы он запускал метод, который отправляет сообщения в Slack.
Мой DAG args
выглядит следующим образом:
default_args = {
[...]
'on_failure_callback': slack.slack_message(sad_message),
'on_success_callback': slack.slack_message(happy_message),
[...]
}
И само определение DAG
:
dag = DAG(
dag_id = dag_name_id,
default_args=default_args,
description='load data from mysql to S3',
schedule_interval='*/10 * * * *',
catchup=False
)
Но когда я проверяю Slack, каждую минуту появляется более 100 сообщений, как если бы они оценивались при каждом пульсе планировщика, и для каждого журнала он выполнял метод успеха и отказа, как если бы он работал и не работал для одного и того же экземпляра задачи (не отлично).
Как мне правильно использовать on_failure_callback
и on_success_callback
для обработки статусов dags и вызова пользовательского метода?
on_failure_callback
иon_success_callback
, почему бы просто не сделать слабое сообщение задачей в вашей DAG, поскольку вы запрашиваете сообщение, является ли задача успешной или неудачной. - person Zack   schedule 03.07.2018