Архитектура для внедрения моделей машинного обучения в производство в NEW YORKER

Это сообщение изначально появилось в Техническом блоге NEW YORKER.

Один из наиболее эффективных вариантов использования машинного обучения в розничных компаниях - это уценка. NEW YORKER, один из крупнейших розничных продавцов модной одежды в Европе, устанавливает начальную цену для каждого товара, продаваемого в наших 1100 магазинах в 45 странах. Цена постепенно снижается в серии снижений цен до тех пор, пока товар не будет распродан. Цель состоит в том, чтобы предсказать, когда и как снизить цену, чтобы весь инвентарь продавался по максимально высокой цене. Таким образом, максимизация доходов компании.

Решение этого варианта использования включает две задачи: создание модели, которая обеспечивает точные прогнозы, и внедрение ее в производственный конвейер в нашей текущей инфраструктуре.

В этом посте мы сосредоточимся на последней задаче и представим архитектуру для оркестровки конвейеров машинного обучения в производственной среде на Apache Mesos и Hadoop с использованием Airflow. Мы начнем с требований к модели машинного обучения, а также требований, предъявляемых нашей текущей инфраструктурой и архитектурой инженерии данных. Затем мы представляем архитектуру, которая выполняет эти задачи, включая контейнеризацию, конвейерную оркестровку с помощью Airflow, соображения производительности и упрощенный код DAG. В заключение мы подведем итоги и сделаем прогноз на будущую работу.

Требования

Начнем с требований модели, которую мы хотим реализовать, наряду с требованиями, вытекающими из нашей текущей инфраструктуры и архитектуры инженерии данных.

обучающие данные для модели хранятся в виде файлов Parquet в HDFS в нашем кластере Hadoop. Они являются результатом обширной предварительной пакетной обработки данных с помощью Spark, включая очистку, агрегацию и условное исчисление данных, где это необходимо. Этот устаревший ETL еще не согласован с Airflow и использует Luigi для управления рабочим процессом и Cron Jobs для планирования.

Модель написана на Python, и используемый фреймворк машинного обучения должен выбираться нашими специалистами по данным. В настоящее время используется LightGBM. Обучение и выводы делаются для каждой страны - характеристика, которую мы можем использовать позже для распараллеливания. Поддержка графических процессоров важна, поскольку наши специалисты по данным также рассматривают подходы к глубокому обучению, которые от этого выиграют.

Полученные в результате прогнозы модели необходимо записать на уровень персистентности нашей системы ERP - реляционную базу данных. Прогнозы служат руководством для нашего отдела ценообразования, который проверяет их через пользовательский интерфейс ERP на достоверность и применяет предлагаемые снижения цен там, где и когда считает их целесообразными. Поскольку прогнозы меняются очень медленно, необходимо обновлять их один раз в начале каждой недели.

Помимо вышеупомянутого кластера Hadoop, в нашей инфраструктуре используется кластер DC / OS. Он предоставляет нам оркестровку контейнеров и планирование ресурсов на основе Apache Mesos. Графические процессоры можно использовать в контейнерах с помощью определений Marathon application и Metronome job для длительно и коротко работающих приложений. У нас есть контейнерная установка Airflow, работающая на DC / OS для оркестровки большинства наших конвейеров данных.

Архитектура

Для выполнения требований, изложенных выше, мы придумали следующую архитектуру.

Контейнеризация

Мы решили поместить обучение модели и вычисление логических выводов в контейнер и использовать DC / OS для их запуска. У этого есть два преимущества. Во-первых, он сохраняет независимость от структуры машинного обучения, выбранной нашими специалистами по данным. Во-вторых, мы можем использовать существующие графические процессоры в кластере DC / OS для ускорения будущих подходов к глубокому обучению.

Если бы мы решили использовать SparkML или распараллелить другую платформу машинного обучения с помощью Spark на Hadoop, нам пришлось бы перенести существующую модель LightGBM на этот подход. Тогда мы бы по сути заблокировали используемую структуру машинного обучения. Кроме того, чтобы упростить использование графических процессоров, нам потребовалось бы установить графические процессоры в кластере Hadoop или переместить их из кластера DC / OS в него. Также потребовалось бы обновление версии Hadoop до 3.x с нынешней версии 2.x. Все это было бы нетривиально и отнимало бы очень много времени.

Сам контейнер имеет следующий рабочий процесс:

Вначале он загружает данные обучения из HDFS. Затем он обучает модель на ней и вычисляет прогнозы для данной страны. Наконец, прогнозы обновляются в базовой реляционной базе данных ERP.

Обновление прогнозов выполняется за одну транзакцию, т. Е. Удаляются старые прогнозы для всех элементов для одной страны и записываются новые. Здесь мы не используем обновления, так как набор прогнозов может меняться со временем. Таким образом, пользователи ERP всегда будут видеть прогнозы в пользовательском интерфейсе для выбранного набора элементов без простоев.

Мы параметризовали контейнер, чтобы он мог рассчитывать и обновлять прогнозы не только для одной страны, но и для определенной группы стран.

Трубопроводная оркестровка

Имея параметризованный контейнер для обучения и вывода модели, теперь мы можем приступить к развертыванию и оркестровке нескольких его экземпляров, чтобы сформировать правильный конвейер с помощью Airflow:

Для краткосрочных приложений, то есть заданий, DC / OS предоставляет Metronome в качестве планировщика задач поверх Mesos. Поскольку наш контейнер недолговечен по своей природе, мы можем использовать Metronome для его развертывания в кластере DC / OS. Мы делаем это из Airflow с помощью MetronomeOperator, о котором мы вскоре расскажем более подробно.

Мы запускаем несколько экземпляров MetronomeOperator для групп стран, чтобы охватить все 45 стран, тем самым эффективно распараллеливая работу.

Поскольку устаревший ETL, который обрабатывает необходимые данные для обучения в Hadoop, еще не согласован с Airflow, мы используем модифицированный датчик HDFS, чтобы запускать экземпляры MetronomeOperator. Исходный датчик Airflow HDFS работал только с Python 2. Мы внесли некоторые небольшие изменения, чтобы он работал с версией Python 3, которую мы используем в нашей контейнерной установке Airflow.

Его код можно найти здесь:
https://github.com/NewYorkerData/ny-public-airflow-operators

Мы планируем запуск конвейера одновременно с устаревшим ETL на Hadoop. Датчик HDFS будет постоянно опрашивать успешный файл и запускать оставшиеся операторы в DAG, когда файл будет найден.

Примечание. В рамках производственного процесса мы также записываем прогнозы в HDFS для дальнейшего анализа. Во время вычисления мы регистрируем все шаги в Elasticsearch. У нас есть информационные панели Kibana, и в настоящее время мы используем elastalert для мониторинга и предупреждения об аномалиях, например количество прогнозов уменьшилось на 10% по сравнению с прошлым запуском.
Перед тем, как продвигать изменения в производство, мы используем Нептун как часть конвейера тестирования, чтобы отслеживать и оценивать влияние различных изменений на саму модель.

Оператор воздушного потока метронома

Насколько нам известно, в начале проекта для Airflow не существовало оператора метронома. Итак, мы создали его.

Он принимает определение задания метронома в качестве параметра JSON и работает, как показано на рисунке ниже:

Сначала мы получаем токен авторизации от IAM REST API, чтобы иметь возможность развертывать контейнеры в кластере DC / OS. Во-вторых, мы проверяем, существует ли задание, через Metronome REST API. В зависимости от ответа мы подтверждаем определение должности. Затем приступаем к работе. Наконец, мы опрашиваем статус задания и, в зависимости от ответа, меняем статус оператора Airflow на УСПЕШНО или НЕУДАЧНО.

Код оператора можно найти здесь:
https://github.com/NewYorkerData/ny-public-airflow-operators

Соображения производительности

В определении задания Metronome мы можем указать использование ЦП, ОЗУ и графического процессора для контейнера. Поэтому мы можем масштабировать по вертикали. Что касается горизонтального масштабирования, мы делаем это, распараллеливая обучение модели и вывод для групп стран. Однако степень параллелизма не безгранична и ограничена доступными ресурсами кластера. Чтобы не использовать все ресурсы, мы ограничиваем количество одновременно работающих контейнеров с помощью функции пула ресурсов Airflow. Имя пула ресурсов задается во всех экземплярах MetronomeOperator, как мы увидим в приведенном ниже коде DAG. Таким образом, размер пула ресурсов равен степени параллелизма.

Реализация как Airflow DAG

Ниже вы можете найти упрощенную версию Airflow DAG:

default_args = {
    "owner": "ny-data-science",
    "start_date": datetime(2020, 1, 1),
    "provide_context": True
}
dag = DAG(
    "markdown-pricing-dag",
    schedule_interval="0 8 * * 1", # every Monday at 8am
    dagrun_timeout=timedelta(days=1),
    default_args=default_args,
    max_active_runs=1,
    catchup=True
)
# Example list of countries for the model training and inference
COUNTRY_LIST = ["DEU", "AUT", "NLD", "FRA", "ESP", "ITA"]
# Job template we will use in the MetronomeOperator
METRONOME_JOB_TEMPLATE = """
{{
  "id": "{}",
  "description": "Markdown Pricing ML Job",
  "run": {{
    "cpus": 4,
    "mem":  32768,
    "gpus": 0,
    "disk": 0,
    "ucr": {{
      "image": {{
       "id": "registry-dns-address:1234/markdown-pricing:latest-master",
       "forcePull": true
        }}
     }},
     "env": {{
        "COUNTRY_GROUP": "{}",
        "DB_USER": "{}",
        "DB_PASSWORD": "{}"
     }}
   }}
}}
"""
# Creates a Metronome job on DC/OS by instantiating the MetronomeOperator with the job template above
# and setting the country group and other environment variables
def create_metronome_job_for_country_group(country_group, index):
    return MetronomeOperator(
        task_id=f"metronome_operator_{index}",
        metronome_job_json=
        	METRONOME_JOB_TEMPLATE.format(f"markdown-job-{index}",
                                          country_group,
                                         Variable.get("markdown_erp_db_user"),                                          Variable.get("markdown_erp_db_pwd")),
        dcos_http_conn_id="dcos_master",
        dcos_robot_user_name=Variable.get("robot_user_name_dcos"),
        dcos_robot_user_pwd=Variable.get("robot_user_pwd_dcos"),
        dag=dag,
        pool="markdown_metronome_job_pool",
        retries=3
    )
# Get the resource pool size (in slots) for the MetronomeOperator instances from Airflow configuration
metronome_job_pool_size = get_pool_size(pool_name="markdown_metronome_job_pool")
# Split the country list into groups into N parts of approximately equal length for parallelization purposes.
# N is here the size of the Metronome job pool.
# Given the COUNTRY_LIST defined above and N = 3, the function will return: [["DEU","AUT"], ["NLD","FRA"], ["ESP","ITA"]]
country_groups = split_country_list(COUNTRY_LIST, metronome_job_pool_size)
# Iterates through the country groups and creates a Metronome job for each of those groups
metronome_countries_jobs = [create_metronome_job_for_country_group(country_group=country_group, index=index) for
                            index, country_group in enumerate(country_groups)]
# HDFS sensor on the latest training data
training_data_sensor = NYHDFSSensor(
    task_id="training_data_sensor",
    filepaths=[f"/data/production/markdown_training_data/{get_current_date()}/_SUCCESS"],
    hdfs_conn_id="hdfs_conn_default",
    retries=1440,
    retry_delay=timedelta(minutes=1),
    timeout=0,
    dag=dag)
# Create DAG
training_data_sensor >> metronome_countries_jobs

Заключение и дальнейшие шаги

Мы показали архитектуру, которая позволяет нам легко управлять конвейерами машинного обучения в смешанной кластерной среде Mesos и Hadoop с помощью Airflow. Мы используем оркестровку контейнеров и планирование ресурсов Mesos для масштабирования обучения и вывода модели как по горизонтали, так и по вертикали, при этом имея возможность воспользоваться преимуществами доступного аппаратного ускорения, обеспечиваемого графическими процессорами. Мы используем мощные функции Airflow, такие как датчики и динамические группы доступности базы данных, для эффективного управления всем рабочим процессом в кластерах.

Есть еще много областей улучшения, над которыми мы можем работать в будущем.
Например, миграция устаревшего ETL на Hadoop с Luigi / Cron на Airflow позволит нам отказаться от датчика HDFS и упростить DAG за счет использования взаимозависимости конвейеров Airflow. функция.
Мы также знаем, что некоторые вычисления являются общими для всех стран. Этот шаг можно выделить в отдельный этап, от которого будут зависеть все последующие этапы, что, возможно, повысит производительность.

Наконец, мы готовимся к миграции наших кластеров с Mesos на Kubernetes. В краткосрочной перспективе нам нужно будет настроить все наши конвейеры машинного обучения в Airflow. Теоретически это должно быть относительно легко выполнить, например используйте KubernetesOperator вместо MetronomeOperator в DAG. В более долгосрочной перспективе Kubernetes открывает новый мир возможностей с точки зрения операционных подходов к машинному обучению, таких как Kubeflow.