Система управления рабочими процессами стала проще — объяснение на примерах Python

Apache Airflow — это система управления рабочими процессами, созданная Airbnb. С точки зрения непрофессионала, это можно представить как планировщик заданий на стероидах. Если у вас есть скрипты Python, которые нужно запускать по расписанию или в определенной последовательности, Apache Airflow — это удобный и надежный инструмент, который справляется как с этим, так и с другими задачами. Существует 4 компонента Apache Airflow: база данных метаданных, планировщик, исполнитель и веб-сервер. В этой статье будут затронуты компоненты и терминология, используемые в Apache Airflow, примеры Python о том, как настроить и использовать Apache Airflow, и, наконец, как запустить веб-сервер Apache Airflow.

Примечание. Apache Airflow работает только в Linux, который доступен в Windows через Docker или WSL2 (подсистема Windows для Linux).

Обновление: эта статья является частью серии. Ознакомьтесь с другими темами за 10 минут здесь!

Оглавление

Что такое воздушный поток Apache

Apache Airflow — это система управления рабочими процессами, которая определяет задачи и их зависимости в виде кода и выполняет задачи по регулярному расписанию. Он предлагает веб-интерфейс, который отображает состояние текущих и прошлых задач с диагностической информацией о выполнении задачи. При необходимости пользователи также могут вручную управлять выполнением и состоянием задач, что делает их автоматизированными, но также допускает вмешательство человека.

Есть несколько компонентов Airflow, а именно

  • База данных метаданных: хранит информацию о состоянии задач.
  • Планировщик: определите, какие задачи необходимо выполнить, и определите приоритет выполнения.
  • Исполнитель: процессы, выполняющие логику задач.
  • Веб-сервер: сервер, поддерживающий графический интерфейс пользователя (GUI), который отображает информацию для пользователя.

Что такое задачи и DAG

Задача определяется как единица работы и может рассматриваться как задание Python, выполняющее сценарий. Экземпляр задачи выполняет одну задачу и имеет такое состояние, как running, success, failed, skipped или up for retry и т. д.

Есть два типа задач, а именно

  • Оператор: выполнить операцию. PythonOperator может выполнять любую функцию, которую можно выполнить в Python, также есть BashOperator, EmailOperator, SimpleHttpOperator и т. д. в зависимости от ваших потребностей
  • Датчик: приостанавливает выполнение зависимых задач до тех пор, пока не будет выполнено какое-либо условие. Сенсор может проверять состояние любого процесса или структуры данных

Два типа задач обычно выполняются в последовательности sensor > operator, где задача датчика может приостановить выполнение задачи оператора до тех пор, пока не будет выполнено условие.

Задачи могут быть объединены в цепочку для выполнения в направленном ациклическом графе (DAG). DAG — это тип графа, который является направленным (есть последовательность выполнения задач) и ациклическим (задачи не могут образовывать цикл, чтобы он был бесконечным).

Установка воздушного потока

Airflow можно установить как пакет Python, и его база данных будет создана с помощью следующих команд:

$ pip install apache-airflow
$ airflow db init

Теперь вы готовы использовать Apache Airflow!

Использование Airflow с Python

Существует 3 основных шага при использовании Apache Airflow. Во-первых, вам нужно определить DAG, указав расписание, когда необходимо запускать сценарии, кому отправлять электронные письма в случае сбоев задач и так далее. Затем вам нужно определить задачи оператора и задачи датчика, связав задачи с функциями Python. Наконец, вам нужно определить зависимость между задачами, указав последовательность задач.

Теперь давайте углубимся в то, как мы можем использовать Apache Airflow с Python!

1. Определение группы обеспечения доступности баз данных

Чтобы определить DAG, это просто объект DAG с параметрами, переданными в default_args. Необходимо определить несколько важных параметров,

  • owner: владелец DAG
  • start_date: дата начала запуска DAG, это должна быть историческая дата.
  • schedule_interval: интервал для запуска DAG, может быть определен с помощью datetime.timedelta или строки в формате расписание CRON.
  • email: адреса электронной почты для отправки писем в случае неудачи или повторной попытки
  • retries: количество повторных попыток
  • retry_delay: задержка между каждой повторной попыткой

Дополнительные параметры можно найти в документации Apache Airflow. Ниже приведен пример создания экземпляра DAG.

import datetime

from airflow.models import DAG

# Parameteres
WORFKLOW_DAG_ID = "my_example_dag"
WORFKFLOW_START_DATE = datetime.datetime(2022, 1, 1)
WORKFLOW_SCHEDULE_INTERVAL = "* * * * *"
WORKFLOW_EMAIL = ["[email protected]"]

WORKFLOW_DEFAULT_ARGS = {
    "owner": "kayjan",
    "start_date": WORFKFLOW_START_DATE,
    "email": WORKFLOW_EMAIL,
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 0,
}
# Initialize DAG
dag = DAG(
    dag_id=WORFKLOW_DAG_ID,
    schedule_interval=WORKFLOW_SCHEDULE_INTERVAL,
    default_args=WORKFLOW_DEFAULT_ARGS,
)

2. Определение задач

Задачи должны быть связаны с соответствующими функциями Python. Имена задач указываются в task_id и должны быть уникальными, а функция Python указывается с аргументом python_callable.

В приведенном ниже примере я определяю 2 функции Python job_1 и job_2, фиктивную функцию sensor_job для датчика и связываю ее с задачами оператора и датчика.

from airflow.operators.python import PythonOperator
from airflow.sensors.python import PythonSensor

# Define functions
def job_1():
    print("Perform job 1")

def job_2():
    print("Perform job 2")

def sensor_job():
    print("Sensor Job")

# Define jobs
jobrunningoperator = PythonOperator(
    task_id="task_job_1",
    python_callable=job_1,
    dag=dag,
)

jobsuccesssensor = PythonSensor(
    task_id="task_jobsuccesssensor",
    python_callable=sensor_job,
    dag=dag,
    poke_interval=180,
)

jobsuccessoperator = PythonOperator(
    task_id="task_job_2",
    python_callable=job_2,
    dag=dag,
)

3. Определение зависимости между задачами

После определения задач нам нужно определить порядок, в котором задачи должны выполняться. Это можно сделать с помощью оператора сдвига битов >>, который имеет соглашение:

  • preceding_task >> subsequent_task , or
  • subsequent_task << preceding_task

Зависимость также может быть определена с помощью кода Pythonic,

  • preceding_task.set_downstream(subsequent_task) , or
  • subsequent_task.set_upstream(preceding_task)

Лично я предпочитаю использовать оператор сдвига битов, потому что его можно накладывать друг на друга и он более удобочитаем.

# Set dependency
jobrunningoperator >> jobsuccesssensor >> jobsuccessoperator

Я поместил весь код выше в каталог dags_folder/dag.py. Запустите код один раз с python3 dags_folder/dag.py, чтобы проверить наличие ошибок. Когда вы закончите, скопируйте файл по пути к файлу, указанному в ~/airflow/airflow.cfg. Либо измените файл airflow.cfg так, чтобы он указывал на ваш каталог dags_folder, используя абсолютный путь.

$ cp dags_folder/dag.py /home/kayjan/airflow/dags/

Запуск веб-сервера

Перед просмотром вашей DAG на веб-сервере Airflow необходимо выполнить еще несколько шагов.

Во-первых, пользователь должен быть зарегистрирован, поскольку для веб-сервера Airflow требуются учетные данные для входа. Мы можем зарегистрировать пользователя и удалить пользователя, используя следующие команды соответственно:

$ airflow users create -r Admin -u <username> -p <password> -f <first_name> -l <last_name> -e <email>
$ airflow users delete -u <username>

Затем запустите планировщик Airflow. Обратите внимание, что для запуска DAG по расписанию должен быть запущен планировщик Airflow.

$ airflow scheduler

Проверьте, присутствует ли ваша DAG, выполнив команду airflow dags list. Может быть много других DAG, которые являются примерами DAG, которые вы можете пока игнорировать.

Наконец, откройте новый терминал и запустите веб-сервер с помощью следующей команды:

$ airflow webserver

На веб-сервере можно просматривать исторические запуски и их состояния в формате дерева или просматривать DAG в формате графика. Существует также другая информация, такая как параметры DAG, код DAG, журналы задач и многое другое. Вы можете вручную запустить запуск DAG с помощью кнопки воспроизведения в правом верхнем углу.

Надеюсь, вы изучили основы Apache Airflow и как настроить Airflow DAG с кодом Python. В Apache Airflow появилось много новых терминов, которые могут сбивать с толку. После этого введения вы сможете лучше понимать документацию, форум и другие статьи Apache Airflow.

В Apache Airflow есть много других функций, таких как распределенное выполнение задач с помощью Celery или Kubernetes Executors, обмен информацией между операторами с помощью XCom, определение собственных операторов и датчиков с помощью хуков. Продолжение может быть, если будет спрос ;)

Наконец, коды, используемые в этой статье, приведены ниже.

Спасибо за прочтение! Если вам понравилась эта статья, поделитесь ею.

Ссылки по теме

Документация по Apache Airflow: https://airflow.apache.org/

Учебное пособие по Apache Airflow: https://airflow-tutorial.readthedocs.io/en/latest/index.html

Официальный GitHub Apache Airflow: https://github.com/apache/airflow