ТЕХНИЧЕСКИЕ СОВЕТЫ

5 важных советов по созданию конвейера ETL для базы данных, размещенной на Redshift, с использованием Apache Airflow

Рекомендации для начинающих, работающих с Airflow

Вступление

Apache Airflow - одна из лучших систем управления рабочими процессами (WMS), которая предоставляет инженерам по данным удобную платформу для автоматизации, мониторинга и обслуживания их сложных конвейеров данных. Созданный в Airbnb в 2014 году, затем ставший проектом с открытым исходным кодом и отличным пользовательским интерфейсом, Airflow стал популярным выбором среди разработчиков. Есть много хороших ресурсов / руководств для пользователей Airflow на разных уровнях. Вы можете начать изучение Airflow с множества хороших руководств, таких как это пошаговое руководство или эта серия на Medium, в которых вы также можете узнать о системе управления рабочим процессом в целом. Для пользователей, уже знакомых с Airflow, этот ресурс может помочь вам получить очень глубокое понимание многих аспектов Airflow.

В этом посте я просто хочу поделиться своим опытом создания конвейера ETL хранилища данных на AWS с помощью Airflow. Надеюсь, это поможет. И, пожалуйста, поправьте меня, если что-то не так в моем посте.

Предостережения

1. В этой статье предполагается, что вы уже знакомы с хранилищем данных, AWS, в частности Amazon Redshift, Apache Airflow, средой командной строки и записной книжкой Jupyter.

2. Вы несете ответственность за мониторинг платы за использование используемой вами учетной записи AWS. Не забывайте завершать работу кластера и других связанных ресурсов каждый раз, когда вы заканчиваете работу.

3. Это один из оценочных проектов наноразмерной инженерии данных на Udacity. Поэтому, соблюдая Кодекс чести Udacity, я бы не стал включать полную записную книжку в рабочий процесс для изучения и создания конвейера ETL для проекта. Часть версии этого руководства для ноутбука Jupyter вместе с другими руководствами по науке о данных можно найти на моем github.

Ссылка

Введение в проект

Цель проекта

Sparkify - это стартап, работающий над приложением для потоковой передачи музыки. С помощью приложения Sparkify собирает информацию об активности пользователей и песнях, которая хранится в виде каталога журналов JSON (log-data - активность пользователя) и каталога файлов метаданных JSON (song_data - информация о песне). Эти данные хранятся в общедоступной корзине S3 на AWS.

Этот проект будет рабочим процессом для исследования и создания конвейера ETL (извлечение - преобразование - загрузка), который:

  • Извлекает данные из S3 и размещает их в AWS Redshift в виде промежуточных таблиц (активность пользователя - Stage_events таблица и данные песен - Stage_songs таблица).
  • Преобразует данные из промежуточных таблиц в набор таблиц фактов (songplays) и размерных таблиц (включая artists, time, users, and songs таблицы) для аналитических целей. Более подробно об этих таблицах можно найти в другом моем родственном проекте.
  • Этот полноценный конвейер ETL должен быть динамичным, за ним можно следить и при необходимости обеспечивать возможность простой обратной засыпки. Это требование выполняется за счет внедрения в систему Apache Airflow.

Ниже приведен полный Направленный ациклический график - DAG - с операторами, используемыми для проекта. (Если вы не знаете, что такое группы DAG или операторы, прочтите краткое определение концепций Airflow здесь).

В этом посте я не буду подробно обсуждать, как поэтапно выполнялся проект. Скорее, я просто расскажу вам несколько важных советов и проблем, с которыми я столкнулся при работе с Airflow. Я надеюсь, что это сэкономит вам столько времени и усилий, связанных со многими странными состояниями, которые могут возникнуть при построении рабочего процесса.

Совет 1. Начните с простейшего DAG

Ваш DAG, высокоуровневый план, определяющий задачи в определенном порядке, должен быть как можно более простым. Очевидно, что это лучшая практика в программировании, но о ней легко забыть.

Почему мы должны начать с простого DAG?

Ниже приведены окончательные требования к конфигурации DAG для моего проекта:

По сути, DAG не зависит от прошлых запусков; start_date - 12 января 2019 г .; трубопровод будет запускаться каждый час. В случае неудачи задача повторяется 3 раза; повторные попытки происходят каждые 5 минут.
Я был настолько наивен, пытаясь использовать эту окончательную конфигурацию DAG для моего первого запуска, что в конечном итоге меня перегрузили при запуске моего DAG в пользовательском интерфейсе Airflow. В очереди было так много пробежек. И многие, даже многие другие могут прийти после этого.

Откуда все эти запуски в очереди?

  • Аргументы start_date': datetime(2019, 1, 12), и schedule_interval = '@hourly будут давать ~ 600 days x 24 hourly backfill runs с start_date (12 января 2019 г.) до настоящего времени (2020 г.).
  • С аргументами 'retries': 3, и 'retry_delay': timedelta(minutes = 5),: если задача не удалась - конечно, она не удалась бы, хотя сложный конвейер данных не потерпел бы сбой с первой попытки - ей нужно повторить 3 раза, Разница в 5 минут. Таким образом, в случае сбоя, помимо большого количества запусков (each scheduled run x 3 times of retries), для каждого запуска с повторными попытками в этой настройке требуется время ожидания 3 x 5 минут. Что касается количества запусков обратной засыпки, у вас не будет другого выбора, кроме как остановить / убить задачи Airflow. Это довольно хакерский метод, чтобы останавливать / закрывать задачи Airflow из пользовательского интерфейса, поэтому лучше не сталкиваться с этой проблемой. В некоторых случаях, хотя запущенные DAG были удалены, и группы DAG были изменены и снова запускались, они все еще могут перезапускаться и запускать предыдущие незавершенные задачи.

Тогда как же выглядит простой DAG?

В моей отладочной версии группа DAG запускалась сразу после запуска ('start_date': datetime.now()) только с одним запуском за раз (max_active_runs=1,)) и запускалась только один раз (schedule_interval= None) без повторных попыток при сбое (по умолчанию в DAG). Этот простой DAG мгновенно останавливается при сбое любой задачи, что позволяет нам легко отлаживать нашу DAG.

Некоторые другие примечания:

  • Что произойдет, если я оставлю start_date равным start_date': datetime(2019, 1, 12) и запущу DAG только один раз (schedule_interval= None): будет 2 запуска: обратный запуск для 2019 года и один для текущего ручного запуска. Поскольку мне нужен только один запуск для моего процесса разработки DAG, лучше установить 'start_date': datetime.now().

  • DAG выполняет несколько расписаний одновременно, более поздние запуски могут выполняться одновременно и мешать первому. Нехорошо, если DAG содержит тяжелые задачи, такие как копирование огромной таблицы из S3 в Redshift. Чтобы обойти это, вы можете установить max_active_runs в 1.

Совет 2: как остановить / убить задачи Airflow из пользовательского интерфейса Airflow?

Как упоминалось в совете 1, довольно сложно остановить / убить задачи Airflow. Есть несколько способов остановить / убить задачу Airflow в пользовательском интерфейсе. Подход, который мне идеально подходит, выглядит следующим образом:

Шаг 1. Отключите DAG

Шаг 2. Удалите все прогоны

В строке меню Airflow выберите Browse -> DAG Runs -> Checkbox to select all the runs -> With Selected -> Delete

Обратите внимание, что вы должны сначала отключить DAG, иначе вы можете увидеть Белых ходоков и Армию мертвецов в действии: исполнитель может продолжить планирование и запускайте новые запуски, даже если вы только что удалили все запуски DAG.

Совет 3. «Неисправный DAG». Это может быть связано с проблемой синтаксического анализа DAG, а не с вашим кодом.

С настройками Airflow по умолчанию, когда вы обновляете связанный файл python, DAG должны быть перезагружены. Как кто-то упомянул, когда веб-сервер запущен, он по умолчанию обновляет DAG каждые 30 секунд. Мы готовы пойти, когда увидели, что наши DAG-файлы теперь свежи как ромашка:

Как правило, веб-сервер Airflow может плавно обрабатывать сбои загрузки DAG в большинстве случаев, но не все время. Я потратил целый день, пытаясь выяснить, что не так с моим кодом . Я даже перезагрузил пустышку - DAG ничего не могло быть неправильным, но все равно не смог исправить ошибку, которая привела к сломанному DAG:

Оказалось, что с моим DAG все в порядке, мне нужно только обновить рабочую область, чтобы преодолеть проблемы с веб-сервером из-за синтаксического анализа DAG.

Таким образом, если есть неработающая проблема с DAG, и вы уверены, что это не из-за вашего кода, вы можете попробовать:

  • Используйте кнопку обновить на главном экране DAG, которая позволяет перезагружать DAG вручную.
  • Введите python -c "from airflow.models import DagBag; d = DagBag();" в командной строке, чтобы вручную обновить DAG.
  • Введите /opt/airflow/start.sh в командной строке, чтобы снова запустить веб-сервер Airflow.
  • Для тех, кто использует рабочее пространство проекта в Udacity, если ничего не работает, обновите рабочее пространство с помощью панели меню.

Refresh Workspace скопирует все ваши файлы на новый компьютер, поэтому каждая проблема, связанная с анализом файлов, может быть решена. После завершения моего проекта Airflow я должен признать, чтоRefresh Workspace является моим окончательным решением многих Broken DAG проблем.

Примечание об отладке сломанного DAG

Пользовательский интерфейс Airflow может уведомить о том, что у вас сломалась группа DAG, однако проблема с вашей DAG не будет отображаться. Подробные проблемы в сломанной группе DAG можно было увидеть, вручную перезагрузив DAG с помощью python -c "from airflow.models import DagBag; d = DagBag();" или снова запустив веб-сервер Airflow с помощью /opt/airflow/start.sh в командной строке. Ниже представлена ​​демонстрация этого:

Совет 4. Для всех задач с AWS используйте IaC (инфраструктура как код) на ноутбуке Jupyter.

Ресурсы AWS можно настраивать и контролировать либо с помощью консоли AWS, либо с помощью IaC (Infrastructure-as-code) . Я предпочитаю подход IaC, поскольку он помогает разработчикам легко автоматизировать, поддерживать, развертывать, реплицировать и совместно использовать сложные инфраструктуры. Существует 3 варианта IaC на AWS:

  • Скрипты AWS-CLI: похожи на скрипты bash.
  • Формирование Amazon Cloud с помощью описания JSON для всех ресурсов, разрешений и ограничений.
  • AWS SDK: больше возможностей, возможность интеграции со многими приложениями. Пользователи Python могут использовать boto3, Python SDK, для программного доступа к AWS.

Мы могли бы использовать IaC для создания, запуска кластера Amazon Redshift и распечатать всю необходимую информацию о конфигурации для подключения Airflow, такую ​​как host, schema, login_user, password и т. Д., Не щелкая мышью и не выполняя поиск в консоли AWS. Я нашел это чрезвычайно удобным и экономящим время, особенно когда нам нужно завершить работу кластера / очистить ресурсы, а затем создать, запустить, снова настроить кластер в другой день, чтобы сократить расходы на AWS.

Полный процесс настройки и очистки Amazon Redshift и других ресурсов AWS, а также демонстрация создания простого конвейера ETL хранилища данных на AWS с помощью Airflow можно найти в моем репозитории Github.

Совет 5. Настройте соединение воздушного потока программно.

Для запуска DAG с сервисом AWS вы можете использовать пользовательский интерфейс Airflow для настройки соединения, например для настройки учетных данных AWS, подключения к Redshift и т. Д.

Однако эти подключения могут быть потеряны при остановке сервера или при обновлении рабочего пространства. Чтобы избежать ненужного сбоя, рекомендуется перед запуском групп DAG проверить, доступны ли эти подключения.

Во время разработки DAG вручную повторно настраивать эти соединения Airflow, снова и снова, это хлопотный и подверженный ошибкам процесс, поэтому я предпочитаю программно настроить Airflow Connection как файл сценария python. При необходимости вы можете запустить сценарий из командной строки. Вот простой скрипт Python для настройки соединений Airflow:

В качестве хорошей практики вместо того, чтобы помещать все учетные данные AWS и информацию Redshift непосредственно в сценарий, при создании кластера Redshift (как описано в совете 4) вы можете экспортировать их в файл .cfg, а затем использовать Python packageconfigparser для получения необходимая информация.

Большое вам спасибо за то, что прочитали этот пост. Удачи в вашем проекте, и я более чем рад любым вопросам и обсуждениям.

Блокнот Jupyter, коды, файл .cfg и т. д. для этого поста, а также других руководств по науке о данных можно найти на моем Github.