(Иллюстрация основ Apache-Airflow)

Написано в сотрудничестве с Хирен Рупчандани

Предисловие

В предыдущих историях вы узнали, как настроить воздушный поток в Windows (с использованием WSL), Ubuntu, и >macOSоперационные системы. Наконец-то пришло время показать вам, как создать свой первый DAG в воздушном потоке!

В этой статье вы познакомитесь с некоторыми основными понятиями, которые необходимо иметь в видупри сочинениигруппы обеспечения доступности баз данных. Вы изучите работу компонентов DAG в рассеяннойи комбинированной форме, которую вы можете запланировать в определенное время и продолжительность в воздушном потоке.

Создание файла Python

  • Включите виртуальную среду и перейдите в каталог воздушного потока, содержащий папку dags и некоторые другие файлы.
  • Откройте свой любимый редактор и создайте новый файл с именем «hello_world_dag.py».

Импорт модулей

  • Чтобы создать правильный конвейер в airflow, нам нужно импортироватьмодуль «DAG» и оператор python из модуля «operators.python». в воздушном пакете.
  • Мы также импортируем модуль «datetime», чтобы планировать даги.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

Создание объекта DAG

  • Далее мы создадим объект DAG для вложения задач в конвейер. Мы передаем строку «dag_id», которая является уникальным идентификатором dag.
  • Мы рекомендуем сохранить файл python и dag_id с одним и тем жеименеми после этого, мы назначим «dag_id» как «hello_world_dag».
  • Мы также установим параметр «start_date», указывающий временную метку, с которой планировщик попытается выполнить заполнение.
  • За ним следует параметр schedule_interval, указывающий интервал последующих запусков DAG, созданных планировщиком. Он имеет форму объекта datetime.timedelta или выражения cron. Airflow имеет несколько доступных предустановок cron, таких как @hourly, @daily, @yearly и т. д. Вы можете прочитать подробнее о них здесь.
  • Если «start_date» установлено как 1 января 2021 г., а «schedule_interval» равно ежечасно, то планировщик будет запускать выполнение DAG каждый час, пока не будет достигнут текущий час или «end_date» (необязательный параметр). Это называется catchup, и мы можем отключить его, сохранив для параметра значение False.
  • После установки этих параметров наша инициализация DAG должна выглядеть так:
with DAG(dag_id="hello_world_dag",
         start_date=datetime(2021,1,1),
         schedule_interval="@hourly",
         catchup=False) as dag:

Создание задачи

  • Согласно документации по воздушному потоку, объект, созданный от оператора, называется задачей. В airflow у нас есть различные типы операторов, но сейчас мы сосредоточимся только на PythonOperator.
  • PythonOperator используется для вызова функции python внутри вашей DAG. Мы создадим объект PythonOperator, который вызывает функцию Python, которая при вызове возвращает «Hello World».
  • Подобно тому, как объект DAG имеет «dag_id», объект PythonOperator имеет идентификатор с именем «task_id».
  • Он также имеет параметр «python_callable», который принимает имя вызываемой функции в качестве входных данных.
  • После настройки параметров наша задача должна выглядеть так:
task1 = PythonOperator(
        task_id="hello_world",
        python_callable=helloWorld)

Создание вызываемой функции

  • Нам также нужно создать функцию, которая будет вызываться PythonOperator, как показано ниже:
def helloWorld():
    print(‘Hello World’)

Установка зависимостей

  • Мы можем установить зависимости задач, написав имена задач вместе с ›› или ‹‹, чтобы указать нижестоящий или восходящий поток.
  • Поскольку у нас здесь одна задача, нам не нужно указывать поток и продолжить писать имя задачи.

Вуаля, это файл DAG

После компиляции всех элементов DAG наш окончательный код должен выглядеть так:

Выполнение DAG в пользовательском интерфейсе веб-сервера

  • Чтобы увидеть, как работает файл, активируйте виртуальную среду и запустите веб-сервер и планировщик воздушного потока.
  • Перейдите на http://localhost:8080/home (или выделенный порт для воздушного потока), и вы должны увидеть следующее в пользовательском интерфейсе веб-сервера:

  • Группа обеспечения доступности баз данных должна работать успешно. Вы можете проверить представление в виде графика или представление в виде дерева, наведя указатель мыши на ссылку и выбрав параметры графика или дерева.

  • Вы также можете просмотреть информацию о выполнении задачи с помощью журналов. Для этого нажмите на задачу, что приведет вас к следующему диалоговому окну:

  • Затем нажмите кнопку Журнал, и вы будете перенаправлены в журнал задачи.

Поздравляем! Мы создали нашу первую DAG с использованием воздушного потока. В следующих историях мы покажем вам, как создать правильную DAG с несколькими задачами и установить зависимости между ними.

Заключительные мысли и заключительные комментарии

Есть некоторые жизненно важные моменты, которые люди не понимают, занимаясь наукой о данных или путешествием в области искусственного интеллекта. Если вы один из них и ищете способ уравновесить эти минусы, ознакомьтесь с сертификационными программами, предоставляемыми INSAID на их веб-сайте. . Если вам понравилась эта история, я рекомендую вам пройти Глобальный сертификат в области науки о данных, потому что он будет охватывать ваши основы, а также алгоритмы машинного обучения(базовые для продвижения) .

& Вот и все. Надеюсь, вам понравилось объяснение Hello World! используя Apache-Airflow, и узнал кое-что ценное. Пожалуйста, дайте мне знать в разделе комментариев, если вам есть чем поделиться со мной. Я хотел бы знать ваши мысли.

Подпишитесь на меня, чтобы не пропустить новые статьи, посвященные Python, R, науке о данных, машинному обучению и искусственному интеллекту.

Если вы считаете это чтение полезным, нажмите хлопать👏. Ваша поддержка послужит катализатором вдохновения, чтобы поддерживать меня и разрабатывать более ценный контент.

Что дальше?