Airflow DAG - Лучшие Лрактики

В сообщении блога мы увидим некоторые передовые методы создания групп DAG. Давайте начнем.

DAG как файл конфигурации

Планировщик Airflow сканирует и компилирует файлы DAG при каждом такте. Если файлы DAG тяжелые и в них присутствует много кодов верхнего уровня, планировщик будет потреблять много ресурсов и времени для их обработки при каждом такте. Поэтому рекомендуется, чтобы группы DAG оставались светлыми, больше похожими на файл конфигурации. В качестве шага вперед было бы неплохо иметь определение рабочего процесса на основе YAML / JSON, а затем на его основе генерировать DAG. Это дает двойное преимущество. 1. Группы DAG, которые создаются программно, будут согласованными и воспроизводимыми в любое время. 2. Пользователи, не использующие Python, также смогут его использовать.

Мы можем отделить блоки кода, не связанные с конфигурацией, вне определения DAG и использовать атрибут template_searchpath для их добавления. Например, если вы пытаетесь подключиться к RDS и выполнить некоторую команду SQL, эта команда SQL должна быть загружена из файла. И расположение файла должно быть указано в template_searchpath. Аналогично с запросами Hive (.hql).

Инвестируйте в систему плагинов Airflow

Настраиваемые хуки и операторы - отличные способы упростить поддержку конвейеров, отладить их и упростить создание. Так что хорошо иметь надлежащее репозиторий плагинов и поддерживать его для создания собственных плагинов, необходимых в соответствии с требованиями организации. Создавая плагин, будьте универсальны, чтобы его можно было многократно использовать в разных сценариях использования. Это помогает в управлении версиями, а также в поддержании чистоты рабочих процессов и в основном деталей конфигурации, а не логики реализации. Кроме того, не выполняйте тяжелую работу / операции во время инициализации класса, вставляйте операции (например, получение переменных и соединений) внутри метода выполнения.

Не выполняйте обработку данных в файлах DAG.

Поскольку группы DAG основаны на Python, у нас определенно возникнет соблазн использовать pandas или аналогичные вещи в DAG, но мы не должны этого делать. Airflow - это оркестратор, а не среда выполнения. Все вычисления следует делегировать определенной целевой системе. Следуйте за огнем и отслеживайте приближение. Используйте оператора для запуска задачи и датчик для отслеживания завершения. Airflow не предназначен для длительных задач.

emr_add_step = EmrAddStepsOperator(
  task_id='demo_step',
  job_flow_id='<EMR_CLUSTER_ID>',
  aws_conn_id='aws_default',
  steps=<SPARK_STEPS>
)
emr_step_status_checker = EmrStepSensor(
  task_id='check_step',
  job_flow_id='<EMR_CLUSTER_ID>',
  step_id="{{ task_instance.xcom_pull(task_ids='demo_step', key='return_value')[0] }}",
  aws_conn_id='aws_default'
)

Делегировать вызовы API или БД операторам

Это несколько похоже на первый пункт. Вызов API или соединение с БД, выполненное в коде верхнего уровня в файлах DAG, перегружает планировщик и веб-сервер. Эти вызовы, определенные вне оператора, вызываются при каждом такте. Поэтому рекомендуется передать их оператору util / common (может быть оператором Python).

Сделайте DAG-файлы / задачи идемпотентными

DAG должен выдавать одни и те же данные при каждом запуске. Чтение из раздела и запись в раздел. Оба должны быть неизменными. Создавайте и удаляйте разделы, чтобы избежать неизвестных ошибок.

Использовать одну переменную для каждого DAG

Каждый раз, когда мы обращаемся к переменным DAG, он создает соединение с БД метаданных. Это может перегрузить базу данных, если у нас работает несколько групп DAG с несколькими вызываемыми переменными. Лучше использовать одну переменную для каждого DAG с объектом JSON. Это создаст единое соединение. Мы можем проанализировать JSON, чтобы получить желаемую пару ключ-значение.

# Combining SSM keys in one
dag_specific_variable = '{
  "variable_1" : "value_1",
  "variable_2" : "value_2",
  "variable_3" : "value_3"
}'
# Single call to get all three varaibles
dag_specific_params =  Variable.get("dag_specific_variable", deserialize_json=True)
# Using these in DAG
{{ var.json.dag_specific_params.variable_1 }}

Отметьте DAG

Наличие тегов в DAG помогает фильтровать и группировать DAG. Сделайте его совместимым с текущей системой тегов вашей инфраструктуры. Тег Like на основе BU, проекта, категории приложения и т. Д.

Не злоупотребляйте XCom

XCom действует как канал между задачами для обмена данными. Для этого он использует серверную БД. Следовательно, мы не должны передавать огромное количество данных, используя это, так как с большим объемом данных серверная БД будет перегружена.

Используйте промежуточное хранилище между задачами.

Если данные, которые должны быть разделены между двумя задачами, огромны, сохраните их в промежуточной системе хранения. И передайте ссылку нижестоящей задаче.

Используйте возможности шаблонов Jinja

Многие операторы поддерживают template_fields. Этот объект кортежа определяет, какие поля будут преобразованы.

class PythonOperator(BaseOperator):
    template_fields = ('templates_dict', 'op_args', 'op_kwargs')

При написании пользовательского оператора этот атрибут template_fields переопределяется.

class CustomBashOperator(BaseOperator):
    template_fields = ('file_name', 'command', 'dest_host')

В приведенном выше примере поля «file_name», «command», «dest_host» будут доступны для создания шаблонов jinja.

Вы также можете получить доступ к переменным с помощью шаблона.

{{ var.value.key_name }}

Параметры также можно шаблонизировать, как показано ниже:

# Define param
params={
  "param_key_1": "param_value_1",
  "param_key_2": "param_value_2"
}
# access param using template
{{ params.param_key_1 }}

Внедрить контроль доступа на уровне DAG

Используйте представления Flask App Builder, чтобы иметь контроль доступа на уровне DAG. Установите владельца DAG на правильного пользователя Linux. Создайте настраиваемую роль, чтобы решить, кто может выполнять действия группы DAG / задачи.

Использовать статическую start_date

Статическая дата начала DAG помогает в правильном заполнении запусков и расписания DAG.

Переименовать DAG в случае структурных изменений

До тех пор, пока не будет реализована функция управления версиями DAG, в случае каких-либо структурных изменений в DAG переименуйте DAG при изменениях. Это создаст новый DAG, и вся история DAG предыдущего запуска для старой версии DAG будет там без каких-либо противоречий.

Некоторые другие передовые практики:

  • Установите повторные попытки на уровне DAG
  • Используйте согласованную файловую структуру
  • Выберите последовательный метод для зависимостей задач
  • Имейте стратегию уведомления о сбое

Следите за предстоящими улучшениями воздушного потока:

  • Функциональный DAG
  • Сериализация DAG
  • Планировщик HA
  • REST API производственного уровня
  • Умные датчики
  • Группы задач

Удачи с DAG.