Система управления рабочими процессами стала проще — объяснение на примерах Python
Apache Airflow — это система управления рабочими процессами, созданная Airbnb. С точки зрения непрофессионала, это можно представить как планировщик заданий на стероидах. Если у вас есть скрипты Python, которые нужно запускать по расписанию или в определенной последовательности, Apache Airflow — это удобный и надежный инструмент, который справляется как с этим, так и с другими задачами. Существует 4 компонента Apache Airflow: база данных метаданных, планировщик, исполнитель и веб-сервер. В этой статье будут затронуты компоненты и терминология, используемые в Apache Airflow, примеры Python о том, как настроить и использовать Apache Airflow, и, наконец, как запустить веб-сервер Apache Airflow.
Примечание. Apache Airflow работает только в Linux, который доступен в Windows через Docker или WSL2 (подсистема Windows для Linux).
Обновление: эта статья является частью серии. Ознакомьтесь с другими темами за 10 минут здесь!
Оглавление
- Что такое Apache Airflow
- Что такое задачи и DAG
- Установка воздушного потока
- Использование Airflow с Python
- Запуск веб-сервера
Что такое воздушный поток Apache
Apache Airflow — это система управления рабочими процессами, которая определяет задачи и их зависимости в виде кода и выполняет задачи по регулярному расписанию. Он предлагает веб-интерфейс, который отображает состояние текущих и прошлых задач с диагностической информацией о выполнении задачи. При необходимости пользователи также могут вручную управлять выполнением и состоянием задач, что делает их автоматизированными, но также допускает вмешательство человека.
Есть несколько компонентов Airflow, а именно
- База данных метаданных: хранит информацию о состоянии задач.
- Планировщик: определите, какие задачи необходимо выполнить, и определите приоритет выполнения.
- Исполнитель: процессы, выполняющие логику задач.
- Веб-сервер: сервер, поддерживающий графический интерфейс пользователя (GUI), который отображает информацию для пользователя.
Что такое задачи и DAG
Задача определяется как единица работы и может рассматриваться как задание Python, выполняющее сценарий. Экземпляр задачи выполняет одну задачу и имеет такое состояние, как running
, success
, failed
, skipped
или up for retry
и т. д.
Есть два типа задач, а именно
- Оператор: выполнить операцию.
PythonOperator
может выполнять любую функцию, которую можно выполнить в Python, также естьBashOperator
,EmailOperator
,SimpleHttpOperator
и т. д. в зависимости от ваших потребностей - Датчик: приостанавливает выполнение зависимых задач до тех пор, пока не будет выполнено какое-либо условие. Сенсор может проверять состояние любого процесса или структуры данных
Два типа задач обычно выполняются в последовательности sensor > operator
, где задача датчика может приостановить выполнение задачи оператора до тех пор, пока не будет выполнено условие.
Задачи могут быть объединены в цепочку для выполнения в направленном ациклическом графе (DAG). DAG — это тип графа, который является направленным (есть последовательность выполнения задач) и ациклическим (задачи не могут образовывать цикл, чтобы он был бесконечным).
Установка воздушного потока
Airflow можно установить как пакет Python, и его база данных будет создана с помощью следующих команд:
$ pip install apache-airflow $ airflow db init
Теперь вы готовы использовать Apache Airflow!
Использование Airflow с Python
Существует 3 основных шага при использовании Apache Airflow. Во-первых, вам нужно определить DAG, указав расписание, когда необходимо запускать сценарии, кому отправлять электронные письма в случае сбоев задач и так далее. Затем вам нужно определить задачи оператора и задачи датчика, связав задачи с функциями Python. Наконец, вам нужно определить зависимость между задачами, указав последовательность задач.
Теперь давайте углубимся в то, как мы можем использовать Apache Airflow с Python!
1. Определение группы обеспечения доступности баз данных
Чтобы определить DAG, это просто объект DAG
с параметрами, переданными в default_args
. Необходимо определить несколько важных параметров,
owner
: владелец DAGstart_date
: дата начала запуска DAG, это должна быть историческая дата.schedule_interval
: интервал для запуска DAG, может быть определен с помощьюdatetime.timedelta
или строки в формате расписание CRON.email
: адреса электронной почты для отправки писем в случае неудачи или повторной попыткиretries
: количество повторных попытокretry_delay
: задержка между каждой повторной попыткой
Дополнительные параметры можно найти в документации Apache Airflow. Ниже приведен пример создания экземпляра DAG.
import datetime from airflow.models import DAG # Parameteres WORFKLOW_DAG_ID = "my_example_dag" WORFKFLOW_START_DATE = datetime.datetime(2022, 1, 1) WORKFLOW_SCHEDULE_INTERVAL = "* * * * *" WORKFLOW_EMAIL = ["[email protected]"] WORKFLOW_DEFAULT_ARGS = { "owner": "kayjan", "start_date": WORFKFLOW_START_DATE, "email": WORKFLOW_EMAIL, "email_on_failure": False, "email_on_retry": False, "retries": 0, } # Initialize DAG dag = DAG( dag_id=WORFKLOW_DAG_ID, schedule_interval=WORKFLOW_SCHEDULE_INTERVAL, default_args=WORKFLOW_DEFAULT_ARGS, )
2. Определение задач
Задачи должны быть связаны с соответствующими функциями Python. Имена задач указываются в task_id
и должны быть уникальными, а функция Python указывается с аргументом python_callable
.
В приведенном ниже примере я определяю 2 функции Python job_1
и job_2
, фиктивную функцию sensor_job
для датчика и связываю ее с задачами оператора и датчика.
from airflow.operators.python import PythonOperator from airflow.sensors.python import PythonSensor # Define functions def job_1(): print("Perform job 1") def job_2(): print("Perform job 2") def sensor_job(): print("Sensor Job") # Define jobs jobrunning
operator = PythonOperator( task_id="task_job_1", python_callable=job_1, dag=dag, ) jobsuccess
sensor = PythonSensor( task_id="task_jobsuccess
sensor", python_callable=sensor_job, dag=dag, poke_interval=180, ) jobsuccess
operator = PythonOperator( task_id="task_job_2", python_callable=job_2, dag=dag, )
3. Определение зависимости между задачами
После определения задач нам нужно определить порядок, в котором задачи должны выполняться. Это можно сделать с помощью оператора сдвига битов >>
, который имеет соглашение:
preceding_task >> subsequent_task
, orsubsequent_task << preceding_task
Зависимость также может быть определена с помощью кода Pythonic,
preceding_task.set_downstream(subsequent_task)
, orsubsequent_task.set_upstream(preceding_task)
Лично я предпочитаю использовать оператор сдвига битов, потому что его можно накладывать друг на друга и он более удобочитаем.
# Set dependency jobrunning
operator >> jobsuccess
sensor >> jobsuccess
operator
Я поместил весь код выше в каталог dags_folder/dag.py
. Запустите код один раз с python3 dags_folder/dag.py
, чтобы проверить наличие ошибок. Когда вы закончите, скопируйте файл по пути к файлу, указанному в ~/airflow/airflow.cfg
. Либо измените файл airflow.cfg
так, чтобы он указывал на ваш каталог dags_folder
, используя абсолютный путь.
$ cp dags_folder/dag.py /home/kayjan/airflow/dags/
Запуск веб-сервера
Перед просмотром вашей DAG на веб-сервере Airflow необходимо выполнить еще несколько шагов.
Во-первых, пользователь должен быть зарегистрирован, поскольку для веб-сервера Airflow требуются учетные данные для входа. Мы можем зарегистрировать пользователя и удалить пользователя, используя следующие команды соответственно:
$ airflow users create -r Admin -u <username> -p <password> -f <first_name> -l <last_name> -e <email> $ airflow users delete -u <username>
Затем запустите планировщик Airflow. Обратите внимание, что для запуска DAG по расписанию должен быть запущен планировщик Airflow.
$ airflow scheduler
Проверьте, присутствует ли ваша DAG, выполнив команду airflow dags list
. Может быть много других DAG, которые являются примерами DAG, которые вы можете пока игнорировать.
Наконец, откройте новый терминал и запустите веб-сервер с помощью следующей команды:
$ airflow webserver
На веб-сервере можно просматривать исторические запуски и их состояния в формате дерева или просматривать DAG в формате графика. Существует также другая информация, такая как параметры DAG, код DAG, журналы задач и многое другое. Вы можете вручную запустить запуск DAG с помощью кнопки воспроизведения в правом верхнем углу.
Надеюсь, вы изучили основы Apache Airflow и как настроить Airflow DAG с кодом Python. В Apache Airflow появилось много новых терминов, которые могут сбивать с толку. После этого введения вы сможете лучше понимать документацию, форум и другие статьи Apache Airflow.
В Apache Airflow есть много других функций, таких как распределенное выполнение задач с помощью Celery или Kubernetes Executors, обмен информацией между операторами с помощью XCom, определение собственных операторов и датчиков с помощью хуков. Продолжение может быть, если будет спрос ;)
Наконец, коды, используемые в этой статье, приведены ниже.
Спасибо за прочтение! Если вам понравилась эта статья, поделитесь ею.
Ссылки по теме
Документация по Apache Airflow: https://airflow.apache.org/
Учебное пособие по Apache Airflow: https://airflow-tutorial.readthedocs.io/en/latest/index.html
Официальный GitHub Apache Airflow: https://github.com/apache/airflow