Система управления рабочими процессами стала проще — объяснение на примерах Python
Apache Airflow — это система управления рабочими процессами, созданная Airbnb. С точки зрения непрофессионала, это можно представить как планировщик заданий на стероидах. Если у вас есть скрипты Python, которые нужно запускать по расписанию или в определенной последовательности, Apache Airflow — это удобный и надежный инструмент, который справляется как с этим, так и с другими задачами. Существует 4 компонента Apache Airflow: база данных метаданных, планировщик, исполнитель и веб-сервер. В этой статье будут затронуты компоненты и терминология, используемые в Apache Airflow, примеры Python о том, как настроить и использовать Apache Airflow, и, наконец, как запустить веб-сервер Apache Airflow.
Примечание. Apache Airflow работает только в Linux, который доступен в Windows через Docker или WSL2 (подсистема Windows для Linux).
Обновление: эта статья является частью серии. Ознакомьтесь с другими темами за 10 минут здесь!
Оглавление
- Что такое Apache Airflow
- Что такое задачи и DAG
- Установка воздушного потока
- Использование Airflow с Python
- Запуск веб-сервера
Что такое воздушный поток Apache
Apache Airflow — это система управления рабочими процессами, которая определяет задачи и их зависимости в виде кода и выполняет задачи по регулярному расписанию. Он предлагает веб-интерфейс, который отображает состояние текущих и прошлых задач с диагностической информацией о выполнении задачи. При необходимости пользователи также могут вручную управлять выполнением и состоянием задач, что делает их автоматизированными, но также допускает вмешательство человека.
Есть несколько компонентов Airflow, а именно
- База данных метаданных: хранит информацию о состоянии задач.
- Планировщик: определите, какие задачи необходимо выполнить, и определите приоритет выполнения.
- Исполнитель: процессы, выполняющие логику задач.
- Веб-сервер: сервер, поддерживающий графический интерфейс пользователя (GUI), который отображает информацию для пользователя.
Что такое задачи и DAG
Задача определяется как единица работы и может рассматриваться как задание Python, выполняющее сценарий. Экземпляр задачи выполняет одну задачу и имеет такое состояние, как running, success, failed, skipped или up for retry и т. д.
Есть два типа задач, а именно
- Оператор: выполнить операцию.
PythonOperatorможет выполнять любую функцию, которую можно выполнить в Python, также естьBashOperator,EmailOperator,SimpleHttpOperatorи т. д. в зависимости от ваших потребностей - Датчик: приостанавливает выполнение зависимых задач до тех пор, пока не будет выполнено какое-либо условие. Сенсор может проверять состояние любого процесса или структуры данных
Два типа задач обычно выполняются в последовательности sensor > operator, где задача датчика может приостановить выполнение задачи оператора до тех пор, пока не будет выполнено условие.
Задачи могут быть объединены в цепочку для выполнения в направленном ациклическом графе (DAG). DAG — это тип графа, который является направленным (есть последовательность выполнения задач) и ациклическим (задачи не могут образовывать цикл, чтобы он был бесконечным).
Установка воздушного потока
Airflow можно установить как пакет Python, и его база данных будет создана с помощью следующих команд:
$ pip install apache-airflow $ airflow db init
Теперь вы готовы использовать Apache Airflow!
Использование Airflow с Python
Существует 3 основных шага при использовании Apache Airflow. Во-первых, вам нужно определить DAG, указав расписание, когда необходимо запускать сценарии, кому отправлять электронные письма в случае сбоев задач и так далее. Затем вам нужно определить задачи оператора и задачи датчика, связав задачи с функциями Python. Наконец, вам нужно определить зависимость между задачами, указав последовательность задач.
Теперь давайте углубимся в то, как мы можем использовать Apache Airflow с Python!
1. Определение группы обеспечения доступности баз данных
Чтобы определить DAG, это просто объект DAG с параметрами, переданными в default_args. Необходимо определить несколько важных параметров,
owner: владелец DAGstart_date: дата начала запуска DAG, это должна быть историческая дата.schedule_interval: интервал для запуска DAG, может быть определен с помощьюdatetime.timedeltaили строки в формате расписание CRON.email: адреса электронной почты для отправки писем в случае неудачи или повторной попыткиretries: количество повторных попытокretry_delay: задержка между каждой повторной попыткой
Дополнительные параметры можно найти в документации Apache Airflow. Ниже приведен пример создания экземпляра DAG.
import datetime from airflow.models import DAG # Parameteres WORFKLOW_DAG_ID = "my_example_dag" WORFKFLOW_START_DATE = datetime.datetime(2022, 1, 1) WORKFLOW_SCHEDULE_INTERVAL = "* * * * *" WORKFLOW_EMAIL = ["[email protected]"] WORKFLOW_DEFAULT_ARGS = { "owner": "kayjan", "start_date": WORFKFLOW_START_DATE, "email": WORKFLOW_EMAIL, "email_on_failure": False, "email_on_retry": False, "retries": 0, } # Initialize DAG dag = DAG( dag_id=WORFKLOW_DAG_ID, schedule_interval=WORKFLOW_SCHEDULE_INTERVAL, default_args=WORKFLOW_DEFAULT_ARGS, )
2. Определение задач
Задачи должны быть связаны с соответствующими функциями Python. Имена задач указываются в task_id и должны быть уникальными, а функция Python указывается с аргументом python_callable.
В приведенном ниже примере я определяю 2 функции Python job_1 и job_2, фиктивную функцию sensor_job для датчика и связываю ее с задачами оператора и датчика.
from airflow.operators.python import PythonOperator
from airflow.sensors.python import PythonSensor
# Define functions
def job_1():
print("Perform job 1")
def job_2():
print("Perform job 2")
def sensor_job():
print("Sensor Job")
# Define jobs
jobrunningoperator = PythonOperator(
task_id="task_job_1",
python_callable=job_1,
dag=dag,
)
jobsuccesssensor = PythonSensor(
task_id="task_jobsuccesssensor",
python_callable=sensor_job,
dag=dag,
poke_interval=180,
)
jobsuccessoperator = PythonOperator(
task_id="task_job_2",
python_callable=job_2,
dag=dag,
)
3. Определение зависимости между задачами
После определения задач нам нужно определить порядок, в котором задачи должны выполняться. Это можно сделать с помощью оператора сдвига битов >>, который имеет соглашение:
preceding_task >> subsequent_task, orsubsequent_task << preceding_task
Зависимость также может быть определена с помощью кода Pythonic,
preceding_task.set_downstream(subsequent_task), orsubsequent_task.set_upstream(preceding_task)
Лично я предпочитаю использовать оператор сдвига битов, потому что его можно накладывать друг на друга и он более удобочитаем.
# Set dependency jobrunningoperator >> jobsuccesssensor >> jobsuccessoperator
Я поместил весь код выше в каталог dags_folder/dag.py. Запустите код один раз с python3 dags_folder/dag.py, чтобы проверить наличие ошибок. Когда вы закончите, скопируйте файл по пути к файлу, указанному в ~/airflow/airflow.cfg. Либо измените файл airflow.cfg так, чтобы он указывал на ваш каталог dags_folder, используя абсолютный путь.
$ cp dags_folder/dag.py /home/kayjan/airflow/dags/
Запуск веб-сервера
Перед просмотром вашей DAG на веб-сервере Airflow необходимо выполнить еще несколько шагов.
Во-первых, пользователь должен быть зарегистрирован, поскольку для веб-сервера Airflow требуются учетные данные для входа. Мы можем зарегистрировать пользователя и удалить пользователя, используя следующие команды соответственно:
$ airflow users create -r Admin -u <username> -p <password> -f <first_name> -l <last_name> -e <email> $ airflow users delete -u <username>
Затем запустите планировщик Airflow. Обратите внимание, что для запуска DAG по расписанию должен быть запущен планировщик Airflow.
$ airflow scheduler
Проверьте, присутствует ли ваша DAG, выполнив команду airflow dags list. Может быть много других DAG, которые являются примерами DAG, которые вы можете пока игнорировать.
Наконец, откройте новый терминал и запустите веб-сервер с помощью следующей команды:
$ airflow webserver
На веб-сервере можно просматривать исторические запуски и их состояния в формате дерева или просматривать DAG в формате графика. Существует также другая информация, такая как параметры DAG, код DAG, журналы задач и многое другое. Вы можете вручную запустить запуск DAG с помощью кнопки воспроизведения в правом верхнем углу.



Надеюсь, вы изучили основы Apache Airflow и как настроить Airflow DAG с кодом Python. В Apache Airflow появилось много новых терминов, которые могут сбивать с толку. После этого введения вы сможете лучше понимать документацию, форум и другие статьи Apache Airflow.
В Apache Airflow есть много других функций, таких как распределенное выполнение задач с помощью Celery или Kubernetes Executors, обмен информацией между операторами с помощью XCom, определение собственных операторов и датчиков с помощью хуков. Продолжение может быть, если будет спрос ;)
Наконец, коды, используемые в этой статье, приведены ниже.
Спасибо за прочтение! Если вам понравилась эта статья, поделитесь ею.
Ссылки по теме
Документация по Apache Airflow: https://airflow.apache.org/
Учебное пособие по Apache Airflow: https://airflow-tutorial.readthedocs.io/en/latest/index.html
Официальный GitHub Apache Airflow: https://github.com/apache/airflow