Запуск дагов с разной частотой | Поток воздуха

Я оценивал воздушный поток. У меня есть этот вариант использования, когда у меня есть рабочий процесс, который выполняется каждый час для получения почасовых агрегатов данных. и еще один, который запускается каждый день, чтобы получать ежедневные агрегаты того же самого. Можно ли создать комбинированный рабочий процесс, в котором ежедневное агрегирование будет выполняться только в том случае, если все почасовые агрегаты были успешными за прошедший день? Я видел, что вы можете создавать суб-даги, но могут ли эти два дага работать с разной частотой? Если да, то как?


person sidd607    schedule 30.06.2016    source источник
comment
Для уверенности! Посмотрите этот пост: stackoverflow.com/questions/38022323/ Я думаю, что это очень похоже.   -  person p.magalhaes    schedule 03.07.2016


Ответы (2)


Не уверен, как вы хотите, чтобы это работало, но, хотя простого способа сделать это нет, есть несколько способов, которыми вы могли бы использовать обширный набор операторов воздушного потока для создания такого dag.

Например, вы можете создать ежечасные даги depend_on_past, а затем использовать оператор ветвления python, чтобы запустить / запустить дневную задачу агрегации / даг в конце почасового дага для последнего прогона дня. Обратите внимание на PythonBranchOperator и TriggerDagRunOperator.

Вы также можете создать свой собственный датчик для ежедневного агрегатора, чтобы убедиться, что все почасовые даги за этот день выполнены. Проверьте ExternalTaskSensor для справки.

person Vineet Goel    schedule 15.08.2016

Это может быть некрасиво, но с помощью PythonOperator есть довольно простой способ сделать это «за кулисами»:

dag = DAG('hourly_daily_update_v0',
          schedule_interval='@hourly')

hourly_update = PythonOperator(task_id='update_hourly_v0',
                               python_callable=update_hourly,
                               provide_context=True,
                               dag=dag)

daily_update = PythonOperator(task_id='update_daily_v0',
                               python_callable=update_daily,
                               provide_context=True,
                               dag=dag)

Таким образом, вы звоните и ежечасно, и ежедневно в Airflow way. Однако в вызове update_daily () вы можете проверить час:

def update_daily(**context):
    if context['execution_date'].hour == 0: # hour 0
        # Do all the things!
    else:
        # Do none of the things!

Airflow будет успешно запускать update_daily () 24 раза в день, но на самом деле он будет выполнять свою работу только один раз в час 0. Вы можете продлить это как угодно. Единственная проблема заключается в небольшом отклонении от предполагаемого шаблона Airflow, который вызовет некоторую дезинформацию между часами 1 и 24.

person Mathias Andersen    schedule 18.11.2019