В нашем последнем сообщении в блоге о машинном обучении мы обсудили, как мы используем машинное обучение для удовлетворения наших потребностей в прогнозировании спроса в Blue Apron. В этом посте мы сделаем шаг назад от теории, лежащей в основе этих моделей, и подробно рассмотрим, как мы их построили, подготовили к производству и масштабировали. Попутно мы обсудим наш путь к созданию платформы машинного обучения и наши знания, полученные в ходе его итеративного процесса.

Разработка локальной модели

Все наши канонические источники данных курируются и загружаются в BigQuery, наше хранилище данных нашими группами операций с данными и моделирования данных. Чтобы изучить эти данные и разработать наши модели, мы загружаем меньшие выборочные подмножества этих источников данных BigQuery на наши локальные машины. Как и многие специалисты по обработке данных и инженеры по машинному обучению, мы используем Pandas для загрузки и управления этими наборами данных и Scikit-learn для подготовки наших функций и построения наших моделей. Используя широко распространенные методы и пакеты, мы были уверены, что разрабатываем наши модели прогнозирования спроса наилучшим образом.

Когда мы были удовлетворены способностями наших моделей прогнозировать будущий спрос, мы были готовы начать использовать их в наших регулярно публикуемых прогнозах объемов рецептур. Для этого нам нужно было автоматизировать обучение и использование наших моделей. Нам нужно было внедрить их в производственную среду. Однако вскоре мы поняли, что менее уверены в том, что означает производство в контексте машинного обучения - мы не уверены в шагах и компонентах, необходимых для выхода из стадии разработки.

Вертикального масштабирования недостаточно

Наша первая попытка выпуска наших моделей в производство заключалась в простой упаковке нашего кода Python, разработанного на месте, и его передаче в вычислительный экземпляр. Мы передали наши меньшие выборочные подмножества данных в модели в этом экземпляре и использовали cron для планирования обучения и выполнения модели. Хотя собрать воедино это было тривиально, мы быстро обнаружили, что такой подход неприемлем. Вот некоторые из проблем, с которыми мы столкнулись:

  1. Одно обучение нашей модели заняло более 12 часов. Это сделало итерацию наших моделей очень медленной и сложной. Это также привело к катастрофическим последствиям для единственного сбоя, так как решение проблемы заняло бы не менее 12 часов.
  2. Наши модели не использовали весь наш набор данных. Это означало, что они принесли в жертву точность моделей из-за недостаточной подгонки. Мы попытались увеличить размер экземпляра, чтобы учесть больше данных, но как только мы достигли максимума на самом большом доступном экземпляре, у нас не было вариантов.
  3. Наш cron при настройке EC2 был нестабильным. У нас не было надежного способа отслеживать запуски предыдущих моделей или сбои. Отсутствие мониторинга и предупреждений затрудняло отладку проблем и сжигало много часов разработчика.

Чтобы решить эти проблемы, у нас не было выбора, кроме горизонтального масштабирования - нам нужно было перенести наши проекты машинного обучения на платформу распределенных вычислений. Мы изучили различные предложения и пришли к выводу, что Apache Spark является отраслевым стандартом в этой области. Помимо предоставления инфраструктуры, которая использует кластеры машин для параллельного разделения и обработки заданий, Spark предлагает несколько встроенных библиотек, которые хорошо подходят для наших нужд. Для манипуляций с табличными данными мы использовали Spark DataFrames в качестве масштабируемой замены Pandas. А для конвейерной обработки и моделирования машинного обучения мы использовали Spark ML, библиотеку, написанную для масштабирования и распараллеливания алгоритмов машинного обучения. Эти библиотеки доступны через Python API, известный как PySpark Spark, который отлично подходит для набора навыков нашей команды. Более того, API-интерфейсы Spark были очень последовательными и знакомыми с API-интерфейсами наших ранее используемых наборов инструментов машинного обучения, поэтому перевод между ними был чрезвычайно простым.

Наша платформа машинного обучения

После перевода наших моделей на использование Spark DataFrames и Spark ML нам нужно было определить, где запускать эти задания PySpark. У нас не было ресурсов для создания и обслуживания собственных кластеров Spark, поэтому мы изучили облачные предложения. Уже приняв Google BigQuery и Google Cloud Storage (GCS), мы сочли естественным использовать Google Dataproc для управления всеми нашими потребностями Spark. С Dataproc запуск кластера Spark сводился к простому указанию типа машины и количества машин. Запуск задания Spark в кластере был таким же простым и требовал только расположения яйца Python, файла драйвера точки входа и любых аргументов, необходимых для точки входа.

Dataproc работал особенно хорошо, потому что он позволял нам использовать GCS в качестве нашей распределенной системы хранения файлов для Spark. Используя коннектор Google Cloud Storage на Dataproc, команды чтения и записи Spark могут напрямую принимать путь к файлу GCS, что упрощает передачу данных. Извлечение функций из наших таблиц BigQuery в GCS требовало не больше усилий, чем запрос SQL, поэтому заполнение GCS данными не представляло проблемы. Использование GCS в качестве централизованного хранилища для всех наших наборов функций позволило использовать эти функции во всех без исключения будущих моделях. Нам это показалось настолько привлекательным, что мы даже создали внутреннюю библиотеку, чтобы предоставить и расширить возможности доступа ко всем нашим существующим функциям. (Ищите будущий пост в блоге, в котором я расскажу об этом!)

Наконец, чтобы запланировать эти задания, мы используем Airflow, наш менеджер оркестровки рабочих процессов. Airflow поставляется с логикой повторных попыток, журналированием, предупреждениями и другими инструментами, которые повышают прозрачность и надежность наших заданий. Сообщество Airflow проделало потрясающую работу с хуками и операторами, так что интегрировать Airflow с Dataproc было проще простого. Мы используем отдельные группы Airflow DAG (ориентированные ациклические графы) для организации каждого из наших заданий по созданию функций и модели. В каждой модели DAG мы используем несколько операторов PySpark для модулирования отдельных компонентов конвейера модели.

Spark Learning

Распараллеливание с помощью Spark - мощный инструмент для машинного обучения. Однако есть много общих ошибок, которые следует учитывать и избегать.

Входные данные осколков для включения распараллеливания

Первоначальное распараллеливание заданий Spark с использованием GCS определяется форматом разделения источника входных данных. (Примечание: это не тот случай, когда используется HDFS. При использовании HDFS количество разделов по умолчанию определяется количеством блоков HDFS.) Следовательно, если ваш исходный источник данных находится в одном файле, ваше задание Spark будет ограничено до использование одного исполнителя для обработки этого файла. Мы узнали, что при извлечении данных из BigQuery в GCS нам нужно было сегментировать вывод по нескольким файлам - это можно сделать, указав путь вывода с завершающим символом подстановки. С самого начала сегментирование позволит вам в полной мере использовать возможности распараллеливания.

Кэшировать повторно используемые кадры данных

Еще одна проблема, с которой мы столкнулись, заключалась в том, что мы забыли кэшировать повторно используемые DataFrames. Поскольку Spark DataFrames оцениваются лениво, использование одного и того же DataFrame в нескольких последующих вызовах вызовет несколько запусков восходящего конвейера, который его создал. Кэширование DataFrame перед его повторным использованием позволит обойти это расточительное и трудоемкое повторное вычисление.

Избегайте Python UDF

Пользовательские функции PySpark (UDF) полезны для определения пользовательских функций, когда ваша логика недостаточно хорошо определена с помощью API-интерфейсов Spark DataFrame. Однако их следует использовать с осторожностью, потому что при написании на Python UDF и его входные данные должны переводиться туда и обратно между интерпретатором Python и JVM, создавая значительно большие накладные расходы на сериализацию. Хотя они могут сделать ваш код более чистым, по нашему опыту, UDF в PySpark были настолько неэффективными, что полностью заморозили наши конвейеры или значительно снизили производительность.

Последние мысли

Выгоды от перевода наших моделей на Spark огромны. Наши недавно переписанные модели Spark-ified теперь могли обучаться по всем нашим наборам данных, что привело к повышению показателей точности. Разделив наши рабочие нагрузки на более мелкие параллелизуемые задачи, Spark также резко сократил время вычислений с более 12 часов до одного часа. Кроме того, производственная экосистема, которую мы создали с помощью Dataproc и Airflow, сделала сбои более очевидными, а расследование стало более безболезненным.

Создание масштабируемой платформы машинного обучения стало огромной победой для нашей команды. С момента запуска наших моделей прогнозирования спроса мы также использовали эту платформу для создания нашей первой персонализированной системы рекомендаций по рецептам. Мы обнаружили, что использование этой платформы сделало наши модели более обширными и мощными. Мы обнаружили, что это сделало наши модели более эффективными, резко сократив время, необходимое для их обучения, запуска и итерации. И в конечном итоге мы обнаружили, что это сделало нашу команду более дальновидной в отношении инструментов, которые мы можем продолжать развивать, чтобы поддерживать создание более эффективных продуктов машинного обучения в нашей инженерной организации.