Apache Airflow - это инструмент с открытым исходным кодом для создания сложных рабочих процессов и управления ими. В последнее время Airflow набирает популярность среди специалистов по данным для управления рабочими процессами машинного обучения. С увеличением использования мы можем видеть рост ожиданий пользователей. Как и все пользователи, мы ожидаем, что инструменты будут надежными, масштабируемыми и будут работать из коробки. Сообщество Apache Airflow работает над улучшением всех этих аспектов. Недавние совместные усилия Databand и Polidea принесли много улучшений производительности в ядро ​​Airflow.

Почему? Устранение неэффективности воздушного потока

Airflow - это большой проект с множеством функций и движущихся частей. Проблемы с производительностью воздушного потока часто начинают появляться, когда вам нужно запустить сотни рабочих процессов с десятками задач. Это серьезное препятствие для масштабирования Airflow для крупных сценариев использования. Производительность планировщика Airflow была проблемой для пользователей, которым еще многое нужно улучшить. Почему? Вероятно, основная причина - сложность и взаимосвязь внутренних устройств ядра Airflow. Для понимания кодовой базы планировщика с подпроцессом синтаксического анализа DAG требуются недели. Причина этого - сложность модели предметной области, которая включает в себя структуры в памяти и модели баз данных, которые тесно связаны, и легко забыть, что есть что.

Как? Наш подход

Во-первых, наша работа над производительностью Airflow была не первой. Один из последних сделал Ash Berlin-Tylor (Apache Airflow PMC). Эш сосредоточился на повышении производительности выполнения кода Python. В наших статьях мы сосредоточились на времени, когда переводчик был в режиме ожидания. Нас особенно интересовало время, когда планировщик ожидает ответа от базы метаданных Airflow. Чтобы исследовать это, мы просто зарегистрировали прослушиватель событий в движке SQLAlchemy, который помог нам измерить количество запросов, выполняемых функцией или процессом, и, что более важно, время этих запросов.

Поначалу цифры, которые мы получили, были невероятными. Например, запустив DagFileProcessor процесс в 200 группах DAG по 10 задач в каждой, мы наблюдали 1801 запрос! Это число определенно несоразмерно настройке. Мы просмотрели журналы и обнаружили, что некоторые запросы выполнялись много раз. Следующим естественным шагом было найти места, где они используются, и проанализировать, можно ли что-то улучшить. Что ж, это было несложно.

Проблемы особенно заметны в запросах на вытягивание, связанных с DAG.is_paused attribute. Этот атрибут указывает, следует ли создавать новую группу DAG Run для данной группы DAG. Код этого атрибута выглядит так. На первый взгляд проблемы не возникает.

@provide_session
def _get_is_paused(self, session=None):
    qry = session.query(DagModel).filter(
        DagModel.dag_id == self.dag_id
    )
    return qry.value(DagModel.is_paused)
@property
def is_paused(self) -> bool:
    """
    Returns a boolean indicating whether this DAG is paused
    """
    return self._get_is_paused()

Однако это стало серьезной проблемой для производительности. Когда этот атрибут использовался в коде, не было видно, что его чтение вызывает запрос к базе данных. Разработчики привыкли, что чтение атрибутов - это несложная операция и не вызывает никаких проблем. В данном случае это неверно.

Проблема N + 1 - это ситуация, когда данные обрабатываются в цикле, и для каждой итерации цикла выполняется другой запрос к базе данных. Эта ситуация может выглядеть так.

paused_dag_ids = {
    dag.dag_id for dag in dagbag.dags.values() if dag.is_paused
}

Фрагмент кода для DagBag, который содержал 200 групп DAG, привел к 200 запросам и мог быть заменен 1 запросом к базе данных. Стоит помнить, что эта проблема может возникнуть и при обновлении объектов. Когда мы используем ORM, иногда все же необходимо подумать, будет ли операция обновления эффективной, или мы должны переписать код и прекратить повторение цикла и выполнить один ручной UPDATE запрос.

Airflow - это приложение, использующее множество процессов для обеспечения высокой производительности. Следовательно, отследить, требуется ли данный запрос, непросто, потому что значение, сохраненное в объекте и переданное другому объекту, требует более глубокого понимания большей части кода. В Airflow есть не только недолговечные объекты, которые умирают после обработки запросов от пользователя, как в классических веб-приложениях, но и объекты, сохраняющие состояние в течение длительного времени.

Избегайте регресса

Последний вывод из нашей истории производительности Airflow - как избежать регресса. Поскольку после повышения производительности мы хотим избежать любых ненужных или необоснованных изменений, которые имеют негативное влияние. Для этого мы создали несколько дополнительных тестов, которые выполняют операцию в диспетчере контекста (код доступен на Github), который считает запросы, например:

with assert_queries_count(3):
    DAG.bulk_sync_to_db(dags)

Благодаря этим тестам мы сможем отследить изменения, которые потенциально могут повлиять на производительность Airflow. Такие тесты критически важны в проектах с открытым исходным кодом, таких как Apache Airflow, потому что участники приходят и уходят, а знания не всегда передаются.

Результат

Мы сделали 7 PR, связанных с решением этих задач. Мы также подготовили другие изменения, которые предотвращают снижение производительности или позволяют нам легко повторить наши исследования на другом элементе Airflow.

Наконец, для файла DAG, который имеет следующие характеристики:

  • 200 объектов DAG в одном файле
  • У всех DAG 10 задач
  • Интервал расписания установлен на «Нет» для всех групп DAG.
  • У задач нет зависимостей

Когда мы тестируем метод DagFileProcessor.process_file, мы получаем следующие результаты:

Перед (фиксация):

  • Количество запросов: 1801
  • Время обработки DAG: 8 275 мс

После (фиксация):

  • Подсчитать запросы: 5
  • Время обработки DAG: 814 мс

Разница:

  • Подсчет запросов: -1 796 (-99,7%)
  • Время обработки: -7 461 мс (-90%)

Что это значит? Меньше запросов! Это приводит к увеличению скорости всего планировщика и влияет на общую производительность Airflow! В наших условиях время анализа DAG увеличилось в 10 раз. Это изменение значительно снизило нагрузку на базу данных Airflow. Конечно, это не означает 10-кратное ускорение выполнения задач. Изменения доступны только в основной ветке и будут частью Airflow 2.0.

Проблема с N + 1 запросами довольно легко обнаруживается и обычно решается. Некоторые инструменты могут помочь вам обнаружить это, но зарегистрировать прослушиватель событий в ядре базы данных - самый простой из них. Однако вместо того, чтобы решать эту проблему, мы должны попытаться ее избежать. Все ORM имеют простые в использовании пакетные операции, и мы должны использовать их, когда это возможно. Случай с воздушным потоком показывает, что многочисленные мелкие недостатки приводят к большим узким местам. Который после удаления повысит производительность вашего инструмента.

Авторы

Камил Брегула - инженер-программист Polidea | Коммиттер Apache Airflow
Томек Урбашек - инженер-программист в Polidea | Коммиттер Apache Airflow
Евгений Шульман - технический директор Databand