С выпуском версии 3.6 Elyra внесла несколько улучшений, которые обеспечивают более полную поддержку разработки рабочих процессов Apache Airflow. В этом сообщении блога я расскажу об основах Apache Airflow и о том, как использовать Elyra для создания конвейеров, включающих различные операторы Apache Airflow.

Рекомендуем прочитать эту запись в блоге, чтобы более подробно обсудить некоторые упомянутые здесь концепции Elyra, такие как пользовательские компоненты и каталоги компонентов, а также узнать больше о поддержке Elyra в Kubeflow Pipelines.

Apache Airflow

Apache Airflow — это инструмент оркестровки рабочих процессов с открытым исходным кодом, который обеспечивает создание, выполнение и отслеживание рабочих процессов. Рабочий процесс или конвейер можно рассматривать как последовательность шагов, предпринимаемых для достижения определенной цели преобразования данных. Каждый шаг, также называемый задачей, соответствует одной единице работы в последовательности. В Apache Airflow рабочий процесс представлен направленным ациклическим графом (DAG), где каждый узел графа соответствует задаче, а каждое ребро представляет зависимости между задачами. Реализация задачи в Airflow подробно описана в повторно используемом классе Python, называемом оператором. Таким образом, каждая задача является воплощением оператора. Развертывания Airflow поставляются с набором встроенных операторов для общих задач. Воздушный поток также можно расширить дополнительными операторами, подключив любое количество пакетов провайдера.

Apache Airflow ориентирован на код, а это означает, что группа обеспечения доступности баз данных рабочего процесса и связанные с ней задачи оператора определяются программно. Это то, что делает Airflow таким мощным и гибким, но также может быть немного громоздким для тех, кто не знаком с Python или при написании DAG для очень сложного или очень большого конвейера. Именно здесь Визуальный конвейерный редактор Elyra (VPE) может упростить задачу.

Elyra Pipelines и VPE

Elyra VPE позволяет построить конвейер без написания кода. Интерфейс включает палитру (слева), холст (в центре) и панель свойств (справа), как показано ниже.

В палитре перечислены доступные операторы (в документации Elyra их обычно называют компонентами). Соберите конвейер, перетащив компоненты из палитры на холст и соединив их соответствующим образом. Elyra также отображает входные и выходные свойства каждого компонента на панели свойств.

Обратите внимание, что для компонентов Airflow (операторов) будут отображаться только входные свойства.

Как видно на снимке экрана выше, при построении специфичного для Apache Airflow пайплайна в палитре доступны два типа компонентов:

  1. универсальные компоненты для выполнения блокнотов Jupyter, сценариев Python и сценариев R. Их можно увидеть в категории Elyra в палитре.
  2. специальные компоненты среды выполнения (также называемые настраиваемые компоненты), каждый из которых соответствует одному оператору Airflow и отображается с логотипом Apache Airflow слева от его имени. Показанный здесь набор организован в категорию Основные пакеты.

Пользовательские компоненты не включены в Elyra по умолчанию, поэтому ими необходимо управлять отдельно с помощью функции каталога компонентов Elyra.

Каталоги компонентов

Elyra не имеет собственного репозитория компонентов и полагается на каталоги компонентов и связанные с ними коннекторы для управления компонентами, доступными в палитре. Соединители каталога служат двум целям:

  1. получить список компонентов, доступных в данном каталоге
  2. получить определение компонента, чтобы раскрыть его входные и выходные свойства

Elyra предоставляет три встроенных типа соединителя каталога компонентов: файловая система каталоги и каталог каталоги, которые обеспечивают доступ к компонентам. хранятся в файловой системе по отдельности или вместе в каталоге, а также URL каталог, обеспечивающий доступ к компонентам, доступным через анонимный HTTP-запрос GET. Дополнительные коннекторы доступны в репозитории примеров Elyra. В отличие от встроенных типов, вы должны явно установить соединитель, чтобы получить доступ к компонентам из этих типов соединителей. Если ни один из готовых коннекторов не подходит вам, вы также можете создать свой коннектор, следуя инструкциям здесь.

Elyra версии 3.6 основывается на этом, добавляя два новых типа встроенных коннекторов каталога, специфичных для Airflow: коннектор каталога пакетов Airflow и коннектор каталога пакетов поставщика Airflow. Эти соединители доступны в дистрибутиве Elyra без необходимости установки каких-либо дополнительных пакетов.

Коннектор каталога пакетов Airflow

Коннектор каталога пакетов Airflow позволяет пользователям получить доступ к встроенным (или основным) операторам для данного распределения Airflow. Некоторые из этих операторов включают BashOperator, который выполняет команду bash, и EmailOperator, который отправляет электронное письмо списку получателей. Полный список можно найти здесь.

Чтобы настроить каталог пакетов Airflow, сначала откройте панель «Каталоги компонентов» в JupyterLab. Выберите символ +, затем «Новый каталог операторов пакетов Apache Airflow».

Введите имя каталога и при необходимости измените описание. Среда выполнения автоматически устанавливается как Apache Airflow. При необходимости укажите одну или несколько категорий для операторов, которые будут извлекаться этим каталогом. Эти имена категорий используются для организации операторов в палитре. Одна категория «Основные пакеты» назначается по умолчанию.

Наконец, вам нужно настроить файл Airflow package download URL. URL-адрес должен соответствовать нескольким ограничениям:

  1. он должен указывать на файл встроенного дистрибутива (колеса)
  2. он должен ссылаться на местоположение, к которому Elyra может получить доступ с помощью HTTP-запроса GET без необходимости аутентификации.

Например, для дистрибутива, доступного на PyPI, откройте «Историю выпусков» пакета и выберите нужную версию.

На странице соответствующей версии нажмите ссылку «Загрузить файлы» и скопируйте ссылку на файл колеса пакета.

Вставьте скопированную ссылку в соответствующее поле и нажмите «Сохранить и закрыть». Теперь основные операторы должны быть доступны на палитре VPE в указанных категориях, в данном случае это категория по умолчанию, «Основные пакеты».

Хотя это и не является обязательным требованием, вы также должны убедиться, что используемая вами версия URL-адреса загрузки пакета соответствует версии пакета Apache Airflow в вашем кластере. Если они не совпадают, изменения, внесенные в оператор между версиями, могут вызвать ошибку во время выполнения конвейера.

Как и существующие типы коннекторов каталогов, новый коннектор пакетов Airflow Provider также упрощает совместное использование конвейеров между развертываниями Elyra. Например, предположим, что пользователь А добавляет операторы из archiveapache_airflow-1.10.15-py2.py3-none-any.whl в свое развертывание Elyra и создает конвейер, используя эти операторы. Пользователь B имеет собственное развертывание Elyra и настраивает каталог для добавления операторов из старого архива apache_airflow-1.10.12-py2.py3-none-any.whl. В этой ситуации пользователь B по-прежнему может запускать конвейеры, созданные пользователем A, и наоборот, поскольку операторы внутренне идентифицируются с помощью набора дескрипторов, которые не включают номер версии настроенного URL-адреса загрузки архива. Как упоминалось выше, операторы могут немного различаться между версиями, и могут потребоваться изменения некоторых свойств узла, чтобы гарантировать одинаковую функциональность в общем конвейере.

Обратите внимание, что коннектор уже должен поддерживать пакеты Airflow версии 2.x, но они не тестировались, поскольку Elyra еще не поддерживает Airflow 2.x.

Коннектор каталога пакетов провайдера Airflow

Если оператор не установлен с Airflow по умолчанию, существует огромный выбор пакетов провайдеров, которые, скорее всего, содержат операторы, реализующие необходимый функционал. Пакеты поставщиков пишутся и поддерживаются сообществом Apache Airflow и должны устанавливаться отдельно в кластере Airflow. Пакет apache-airflow-providers-amazon и пакет apache-airflow-providers-google — это два примера поддерживаемых сообществом пакетов поставщиков, реализующих поддержку различных задач, связанных с Amazon и Google, соответственно. Полный список пакетов провайдера можно посмотреть здесь. Пользователи также могут создавать свои собственные провайдеры для расширения основных функций по мере необходимости.

Инструкции по настройке Каталога пакетов провайдера Airflow, чтобы сделать его операторов доступными в Elyra, во многом аналогичны инструкциям по настройке каталога пакетов Airflow. При добавлении нового каталога на панели Каталоги компонентов выберите Новый каталог оператора пакета поставщика Apache Airflow. Остальные инструкции такие же, включая требования к форме Provider package download URL: файл .whl, доступный из анонимного HTTP-запроса GET. Пакеты провайдера обычно доступны на PyPI. Убедитесь, что вы выбираете лучшую версию для своего кластера Airflow.

На снимке экрана ниже показана палитра с несколькими пользовательскими компонентами, импортированными из пакета поставщика Slack в PyPI.

Опять же, имейте в виду, что коннектор пакетов провайдеров не тестировался для пакетов провайдеров Airflow 2.x, поскольку Elyra еще не поддерживает Airflow 2.x. Если вы заинтересованы во внедрении поддержки Airflow 2.x, рассмотрите возможность внесения этой функции или предложения связанных улучшений в рамках цели Elyra Создайте собственную среду выполнения.

Теперь, когда операторы пакетов добавлены в палитру, осталось выполнить последний шаг, чтобы успешно экспортировать и отправить конвейеры Airflow, включающие пользовательские компоненты.

Настройка Elyra для поддержки пользовательских компонентов воздушного потока

Elyra и ее VPE упрощают процесс создания и отправки конвейера для Airflow, устраняя необходимость вручную писать код для определения DAG. Однако для успешного рендеринга DAG для данного конвейера Elyra должна быть настроена на включение информации о полностью квалифицированных пользовательских классах операторов, используемых в этом конвейере. Эта конфигурация поддерживает способность Elyra правильно отображать операторы импорта для всех узлов операторов в данной DAG.

Если вы неправильно настроите имя пакета оператора и попытаетесь экспортировать или отправить конвейер, Elyra выдаст вам сообщение об ошибке, подобное следующему:

Как видно на снимке экрана, полные имена пакетов операторов должны быть добавлены в переменную available_airflow_operators. Эта переменная является значением списка и является настраиваемым свойством в Elyra. Чтобы настроить available_airflow_operators, сначала создайте файл конфигурации из командной строки:

$ jupyter elyra --generate-config

Откройте созданный файл и найдите файл PipelineProcessor(LoggingConfigurable) header. Добавьте следующую строку под этим заголовком.

c.AirflowPipelineProcessor.available_airflow_operators.extend([...])

При необходимости замените полной строкой пакета для каждого класса операторов. Например, если вы хотите использовать SlackAPIPostOperator из пакета поставщика Slack и PapermillOperator из основного пакета в своих пайплайнах, ваша конфигурация будет выглядеть следующим образом:

#------------------------------------------------------------------
# PipelineProcessor(LoggingConfigurable) configuration
#------------------------------------------------------------------
c.AirflowPipelineProcessor.available_airflow_operators.extend(
    [
        "airflow.providers.slack.operators.SlackAPIPostOperator",
        "airflow.operators.papermill_operator.PapermillOperator"
    ]
)

Нет необходимости перезапускать JupyterLab, чтобы эти изменения вступили в силу. Конвейерный экспорт и отправка теперь будут успешными!

В настоящее время команда Elyra работает над автоматизацией этого шага настройки описанных здесь коннекторов пакета Airflow. Этот шаг по-прежнему потребуется при использовании других типов каталогов коннекторов, таких как URL-адреса и каталоги файловой системы.

Заключение

Спасибо за прочтение и интерес к Элире. Как проект с открытым исходным кодом, мы приветствуем вклады и отзывы любого рода, от отчетов об ошибках до новых функций и улучшенной документации. Подробную информацию о том, как вы можете обратиться с вопросами, комментариями или предложениями, можно найти в разделе Получение справки в документации. Мы с нетерпением ждем ответа от вас!