Машинное обучение | Спарк против Даска

Крупномасштабный конвейер вывода PyTorch: Spark vs Dask

Мы описываем наш опыт использования двух фреймворков распараллеливания, Spark и Dask, для выполнения крупномасштабного ежедневного пакетного вывода по моделям NLP с глубоким обучением.

В Clarity AI мы создаем различные продукты SaaS, с помощью которых наши клиенты могут оценить устойчивость своих инвестиционных портфелей. Одним из таких продуктов является наш Модуль противоречий. Благодаря этому у клиентов есть четкое представление о том, какие компании были подвержены разногласиям, связанным с одним из наших показателей устойчивости.

Чтобы сделать процесс обнаружения, классификации и присвоения степени серьезности разногласий по миллионам новостных статей эффективным, мы применяем методы искусственного интеллекта на основе НЛП.

Мы получаем новостные статьи от поставщика данных в режиме реального времени. Задание Fluentd сохраняет эту информацию в s3 в качестве нашего хранилища необработанных данных. Ежедневно мы получаем тысячи статей от этого провайдера. Хотя мы собираем статьи в потоковом режиме, наш продукт не требует обновлений в реальном времени. Вместо этого мы можем забивать пачками ежедневно.

Мы выявляем, классифицируем и присваиваем серьезность новостным статьям, связанным с устойчивостью компаний, по широкому спектру экологических и социальных выводов. В рамках этого процесса мы используем трансферное обучение от RoBERTa, модели глубокого обучения для НЛП. В этой статье мы делимся своим опытом перевода модели глубокого обучения из исследовательской в ​​повседневное крупномасштабное развертывание. Поэтому перевод этой большой модели в рабочую среду, чтобы ежедневно набирать тысячи статей, - непростая задача.

Первый подход: Искра

Нашим первым подходом было использование Apache Spark. Это де-факто стандарт в отрасли для рабочих нагрузок с большими данными. Наша инфраструктура находится в AWS, поэтому мы решили развернуть наши задания Spark через EMR.

Развертывание задания Spark

Рабочий процесс был следующим:

  1. Мы используем Airflow для создания кластера EMR.
  2. Airflow отправляет задание в EMR и ожидает его завершения.
  3. Когда работа заканчивается, мы разрушаем инфраструктуру EMR.
  4. Повторяйте ежедневно.

EMR 6.x также позволяет нам использовать контейнеры докеров для инкапсуляции зависимостей заданий. Это было очень полезно для нас, так как очень легко интегрируется с нашими системами сборки. Здесь вы можете найти пример Dockerfile, который мы используем, устанавливая наши зависимости с помощью pipenv.

Мы также добавили шаг для выполнения входа в докер в наш частный реестр. Затем вам нужно будет настроить правильные параметры конфигурации для команды spark-submit.

Также не забудьте добавить следующую конфигурацию в кластер EMR:

- Classification: container-executor
    Configurations:
    - Classification: docker
      Properties:
        docker.trusted.registries: local,centos,{your_docker_registry}
        docker.privileged-containers.registries: local,centos,{your_docker_registry}

Это позволит Yarn доверять изображениям контейнеров, поступающим из вашего реестра, а также реестру докеров по умолчанию в Docker Hub.

Код вывода Spark

Что касается самого процесса подсчета очков, мы основываем свою работу на примере подсчета очков, представленном Databricks.

Идея состоит в том, чтобы использовать PandasUDF для выполнения оценки. PandasUDF - это абстракции PySpark, которые позволяют пользователям применять функцию к фрейму данных Spark, в то время как функция получает на входе серию Pandas. PandasUDF применяется параллельно к каждому разделу DataFrame, поэтому, если ваши разделы имеют 2000 строк в каждом, функция получит серию из 2000 элементов. Это очень полезно, потому что позволяет нам использовать векторизацию pandas и PyTorch при оценке данных.

Мы также используем датасеты PyTorch. Это позволяет нам создавать пакеты данных для оценки. Но почему мы хотим группировать наши данные, когда мы уже работаем по разделам? Ответ заключается в том, что в противном случае это не сработает. Модель RoBERTa имеет разные размеры. Мы используем базовую модель с более чем 125 миллионами параметров. Это модель размером 1 ГБ, что означает очень большой объем памяти для выполнения оценки. Если в пакете слишком много элементов, мы столкнулись с множеством исключений нехватки памяти

Чтобы наборы данных PyTorch работали с моделями RoBERTa, мы расширяем класс, чтобы создать настраиваемую пакетную обработку, чтобы мы могли получить сразу два тензора, attention_mask и input_ids. Это входные данные для модели RoBERTa.

Окончательный код оценки выглядит так:

Ресурсы кластера

Одной из самых больших проблем для нас была точная настройка требований к памяти и вычислениям для работы. Мы потратили много времени на оптимизацию конфигурации для повышения производительности без сбоев. После множества проб и ошибок мы использовали следующую конфигурацию:

  • Флот из 25 5d.4xlarge и c5n.4xlarge спотовых экземпляров.
  • Размер партии 250 элементов.
  • Драйвер Spark: 2 ядра, 2 ГБ памяти и 1 ГБ служебной памяти.
  • Исполнитель Spark: 2 ядра, 13 г памяти и 1 г служебной памяти.

Проблемы и ограничения

В конце концов, наша первоначальная реализация конвейера подсчета очков с использованием Spark оказалась успешной, но это было чрезвычайно сложно. Отладка и тестирование кода, а также выполнение сквозных тестов занимали очень много времени.

Почему? Мы выявили следующие причины:

  • Самозагрузка EMR выполняется довольно медленно. Удивительно, что в настоящее время кластер можно запустить и запустить менее чем за 30 минут. Однако при отладке конфигураций Spark, требований к инфраструктуре и ведения журнала докеров нам иногда приходилось создавать кластеры по несколько раз в день. На это ушло очень много времени.
  • Экосистема Spark не интуитивно понятна (с точки зрения разработчика Python). Найти значимую информацию в журналах (и найти сами журналы) было очень сложной задачей. Логи JVM, Spark и Python запутались. Ошибки в пользовательском интерфейсе Spark и диспетчере ресурсов Yarn было очень сложно идентифицировать.
  • Процесс не был ни устойчивым, ни детерминированным. Несколько раз мы запускали процесс два или три раза с одними и теми же входными данными, и каждый раз у нас была разная производительность или сбои. Экосистема Spark и EMR добавили несколько уровней сложности, которые по какой-то причине создали очень ненадежные процессы.

Второй подход: Даск

Некоторое время мы поддерживали процесс Spark. Проблемы с производством были редкостью, но когда они возникали, мы тратили от дня до недели на их решение. Для нас было ясно, что поддерживать этот процесс слишком дорого, и команде не хватало опыта в экосистеме Spark для его дальнейшего улучшения.

Мы решили рискнуть с Даском. У нас есть несколько причин попробовать применить Dask к этому конвейеру:

  • Широко используется в Clarity AI. Это означает, что больше людей и команд имеют опыт работы с технологиями, а передача знаний упрощается, что способствует удобству обслуживания.
  • Лучшая интеграция с библиотеками и экосистемой Python. Мы пропускаем всю часть, касающуюся JVM, и отказываемся от Python.
  • Его можно развернуть поверх EMR с помощью Yarn. Это означает, что мы можем повторно использовать большую часть текущей реализации и инфраструктуры.

Мы не ожидали необходимого повышения производительности с Dask vs Spark, но само по себе повышение ремонтопригодности было отличным стимулом для пробной миграции.

Развертывание заданий Dask

Мы решили продолжать использовать EMR для развертывания заданий Dask. Мы использовали пакет Dask Yarn, который позволяет запускать рабочие нагрузки с помощью Yarn. У него очень похожий интерфейс на spark-submit, поэтому изменения в нашем конвейере были минимальными в этом отношении.

Основное различие между развертыванием Spark заключается в том, что мы больше не можем использовать контейнеры докеров для инкапсуляции зависимостей и пакетов Python. Вместо этого нам нужно использовать упакованную виртуальную среду. Для нас это было шагом назад, но в нем не было особых сложностей. Мы использовали venvpack, чтобы упаковать среду и загрузить ее на S3.

Dask Yarn также имеет локальный режим и кластерный режим. Хотя терминология похожа на локальный и кластерный режим Spark, они сильно различаются. В локальном режиме Spark вы запускаете исполнитель и диск вместе, поэтому вы ограничены одним экземпляром, что не имеет смысла в производственных условиях. В локальном режиме Dask Yarn будут запускаться Dask Scheduler и Dask Client там, где была запущена команда Dask yarn. В кластерном режиме планировщик и клиент будут работать как контейнеры Yarn. Мы обнаружили, что для Dask имеет смысл запускать рабочие нагрузки в локальном режиме. Таким образом, клиент и планировщик будут запускаться на главном узле, освобождая ресурсы в остальной части кластера, а Yarn нужно только выделять работников и управлять ими. Это привело к более быстрому распределению воркеров, потому что планировщик гарантированно будет активен при запуске воркера.

Затем мы создаем небольшой сценарий bash для запуска команды отправки и управления виртуальной средой.

Код вывода Dask

Одна из первых реализаций, которые у нас есть, заключается в том, что мы можем легко запускать код локально с помощью Dask. Подход состоит в том, чтобы просто создать экземпляр клиента Dask с другим типом кластера в зависимости от среды. Мы используем что-то вроде этого:

Мы нашли пример кода вывода в документации Dask. Однако мы не обнаружили, что этот пример подходит для нашего варианта использования. Он применяется для классификации изображений, что представляет собой отдельную проблему. В этом примере они используют отложенные функции и применяют каждую функцию для каждого изображения.

В итоге мы использовали функционал Dask DataFrame map_partitions. Поскольку каждый раздел Dask представляет собой DataFrame Pandas, это позволяет нам применять функцию, которая принимает DataFrame в качестве входных данных параллельно. Он был очень похож на PandasUDF в Spark, но более интуитивно понятен, поскольку мы взаимодействуем с примитивами первого класса в Dask.

Как и в случае со Spark, нам также нужно было распределить модели между рабочими перед его использованием. Для этого мы используем dask.delayed.

Ресурсы кластера

Мы смогли точно настроить ресурсы кластера и лучше выявить узкие места в процессе. Панель управления Dask была отличным средством диагностики. Наш последний кластер был:

  • Флот из 10 r5d.4xlarge спотовых экземпляров.
  • Размер партии 120 элементов
  • Настройки Dask: 2 рабочих ядра и 16 ГБ рабочей памяти.

Сравнение со Spark

Нашей главной целью было повышение ремонтопригодности работы, и в этом мы однозначно преуспели. Наша команда лучше понимает процесс и его внутреннее устройство, а экосистема упрощена. Код более понятен для большей части компании.

Панель управления Dask была отличным инструментом для отладки и понимания того, что происходило в процессе. Это позволило нам лучше настроить размер кластера при сохранении продолжительности задания. Благодаря использованию спотовых экземпляров оптимизация позволила сократить расходы на 61%. Честно говоря, этого, вероятно, можно было бы достичь с помощью Spark, если бы у нас были те же инструменты для диагностики профиля памяти процесса.

Обратной стороной является отсутствие постоянных интерфейсов для панели инструментов Dask. Это означает, что мы не можем использовать его, если задание завершилось неудачно и кластер разрушен. С другой стороны, с EMR у вас всегда есть постоянный интерфейс с пользовательским интерфейсом Spark.

Развертывание Dask также было проще, чем Spark. Нам просто нужно установить Yarn в кластере, что сокращает время начальной загрузки в EMR. После установки Yarn и dask-yarn у нас было все необходимое для его запуска и работы.

Выводы

Databricks, основная компания, поддерживающая Spark, опубликовала сравнение производительности Dask и Spark, используя в этом случае Spark с Koalas. Вы можете видеть, что в их анализе Spark - явный победитель. Однако не все дело в производительности.

Apache Spark - отличная технология, но она зависит от очень сложной экосистемы. Если у вас есть зрелая платформа для работы с большими данными, и ваша команда обладает опытом для ее обслуживания и эксплуатации, Spark всегда будет лучшим решением для ваших производственных задач. Особенно если вы покупаете облачное решение вроде Databricks. Кроме того, если вы более знакомы с экосистемой JVM и вашим основным языком программирования является Scala, Spark отлично подойдет вам.

Однако большинство команд по анализу данных являются экспертами в экосистеме и библиотеках Python. PySpark неплох, но это не собственный интерфейс Python с платформой. Dask предоставляет простую и масштабируемую структуру распределенных и параллельных вычислений, которую можно очень легко интегрировать в ваш текущий стек Python. И запустить его локально с помощью портативного компьютера или в кластере с одним узлом - тривиально. Ваша команда также получит пользу от интуитивно понятной диагностики и отладки, предоставляемой Dask. Мы считаем, что Dask Dashboard гораздо более интуитивно понятен и прост в использовании, чем пользовательский интерфейс Spark.

Итак, если вам нужно масштабировать текущие ETL и логический вывод Python с помощью простой структуры распараллеливания: взгляните на Dask.