Как получить время выполнения цепочки DAG в Airflow?

Допустим, у меня есть два DAG, где dag2 выполнял dag1 как часть своего потока, используя TriggerDagRunOperator следующим образом:

  • dag1: задача1> задача2> задача3
  • dag2: task4> dag1> task5

Теперь предположим, что dag2 запланирован на один раз в день в 17:00. Есть ли способ получить метку времени выполнения для dag2 (родительский DAG), пока я запускаю dag1? Есть ли какой-либо встроенный параметр, содержащий это значение?

И если что-то произошло и dag2 был запущен позже, чем обычно, скажем, в 18:00 того же дня, тогда я все равно хочу получить исходное время планирования - это 17:00, пока я нахожусь в dag1.


person Maayan    schedule 13.09.2018    source источник


Ответы (1)


Передайте функцию python_callable аргументу TriggerDagRunOperator, которая вводит execution_date в запущенный DAG:

def inject_execution_date(context, dag_run_obj):
  dag_run_obj.payload = {"parent_execution_date": context["execution_date"]}
  return dag_run_obj

[...]

trigger_dro = TriggerDagRunOperator(python_callable=inject_execution_date, [...])

Вы можете получить доступ к этому в дочернем DAG с помощью context["conf"]["parent_execution_date"]

person joebeeson    schedule 13.09.2018
comment
Спасибо, Джоб! а этот executio_date это встроенный параметр? Мне самому нигде не нужно определять это? - person Maayan; 16.09.2018
comment
@Maayan да, он включен в Airflow - person joebeeson; 16.09.2018