Придать искру вашему алгоритму

от Йорга Шнайдера и Йенса Ортманна

Кластерные вычисления быстро набирают обороты во всех отраслях. Все больше и больше компаний получают доступ к распределенным вычислительным мощностям в облаке. Некоторые даже создают свои собственные кластеры. Хотя это открывает огромные новые возможности для комплексного анализа, возникает вопрос, как продолжить работу с существующими алгоритмами: как можно ускорить существующие алгоритмы в кластере?

Подготовка алгоритма для распределенных вычислений может привести к значительным улучшениям во время выполнения (в 10–100 раз быстрее). Также может потребоваться списание локальных серверных решений и консолидация выполнения обработки данных и алгоритмов в кластере. Загвоздка в том, что алгоритм, разработанный для работы на одном компьютере, не работает эффективно в кластере. Для этого его необходимо перенести на Spark или аналогичный фреймворк.

В этой статье мы сосредоточимся на случае, когда алгоритм реализован на Python с использованием распространенных библиотек, таких как pandas, numpy, sklearn. Эти методы могут применяться аналогично алгоритмам, написанным на R. Далее мы предполагаем, что вычисления в кластере выполняются в Spark.

О полной повторной реализации алгоритма обычно не может быть и речи. Это требует дополнительной работы и новых навыков, подвержено ошибкам, и его сложно проверить, поскольку репликация 1: 1 исходной реализации невозможна.

Вместо этого алгоритм часто развертывается как есть и работает на одном узле кластера. Обычно это выполнение происходит на граничном узле, который является наиболее доступным узлом в кластере. Однако это создает ненужную нагрузку на граничный узел и оставляет неиспользованными доступные ресурсы в кластере (фактически, это противоречит самой парадигме распределенных вычислений).

В качестве лекарства мы обсуждаем здесь два простых, но эффективных метода переноса существующего алгоритма на Spark:

1. Программирование пользовательской функции Spark на Python (PySpark UDF): UDF Spark подходят для задач с ограниченной сложностью и небольшим количеством зависимостей, но с большим числом повторений.

2. Разделение набора данных для многопроцессорной обработки с поддержкой кластера: этот метод подходит, когда набор данных может быть разделен, а подмножества могут быть независимо запланированы для вычислений.

Эти два метода позволяют алгоритму использовать доступные ресурсы в кластере, что значительно улучшает время выполнения. На практике эти методы можно даже комбинировать.

Основы кластерных вычислений с Hadoop, Hive и Spark

В этом первом разделе мы представляем высокоуровневое введение в Hadoop, Hive и Spark, поскольку они имеют отношение к этой статье. Читатели, знакомые с этими темами, могут пропустить.

Apache Hadoop - это фактическая стандартная среда для кластерных вычислений и крупномасштабной аналитической обработки. Его основные компоненты (начиная с Hadoop 2.0):

· Еще один посредник ресурсов (YARN): YARN - это система управления кластером, предназначенная для распределения ресурсов, планирования заданий и отслеживания заданий.

· Распределенная файловая система Hadoop (HDFS): HDFS хранит файлы на узлах кластера. Физическое местоположение скрыто от пользователя, и данные хранятся с избыточностью. Однако существует простой синтаксис папки / пути для доступа к файлам в HDFS.

Физически кластер Hadoop состоит из серверов, называемых узлами. Их можно разделить на следующие типы:

· Граничный узел (также называемый узлами рабочего процесса). Граничный узел действует как точка входа, к которой пользователи могут подключаться и откуда отправлять задания. Граничные узлы не выполняют задания YARN.

· Узел имени (также называемый головным узлом): узел имени запускает службу диспетчера ресурсов YARN и службу HDFS. Для обеспечения высокой доступности стандартной практикой, начиная с Hadoop 2.0, является запуск второго узла имени в качестве аварийного переключения.

· Узел данных (также называемый рабочим узлом): узлы данных предоставляют хранилище для HDFS и вычислительные ресурсы (ядра ЦП и память) для заданий YARN.

Вычисления в кластерах Hadoop выполняются с помощью механизмов обработки, которые используют основные компоненты YARN и HDFS. Hive и Spark - самые актуальные на сегодняшний день фреймворки:

· Apache Hive хранит семантику схемы, таблицы и столбца. Hive позволяет запрашивать данные в HDFS на диалекте SQL. Таким образом, в качестве механизма обработки он более сопоставим с механизмом базы данных.

· Apache Spark - это механизм выполнения, который может работать в Hadoop. Внутренние задания Spark выполняются на Java и Scala (выполняются на JVM), но есть API для Python и R (PySpark и SparkR). Spark предоставляет собственную реализацию фреймов данных с функциональностью, аналогичной Pandas в Python, но фреймы распределяются, а не локально. С фреймами данных Spark вычисление выполняется там, где находятся данные (доставка запроса), тогда как в pandas данные идут туда, где находится (локальное) выполнение (доставка данных). Поскольку данные отправляются и сериализуются во время преобразования, преобразование фрейма данных Spark в фреймворк Pandas может занять значительное время.

Программирование пользовательской функции искры

Наименее инвазивный способ использовать кластерные вычисления для алгоритмов - это написать собственную пользовательскую функцию PySpark (UDF). Исходя из Python, думайте об этом как о векторизации или лямбда-применении, использующем Spark: небольшая часть алгоритма заключена в автономную функцию, которая регистрируется в Spark и оценивается на ваших данных. Сама UDF работает с настолько высокой степенью распараллеливания, насколько позволяют ресурсы и данные вашего кластера. Используя этот подход в реальных проектах, мы добились увеличения времени непрерывного выполнения в 10–100 раз.

Чтобы использовать этот подход, можно полагаться на следующие шаги:

1. Выберите кандидата (ов) UDF: Какая часть вашего алгоритма требует больших вычислений, но может быть изолирована от остальных? Это должна быть часть, которая выполняется много раз (для разных данных) или может быть структурирована как таковая.

2. Определить параметры и вернуть тип данных. Фреймы данных Spark поддерживают типы данных, отличные от Python. Числовые примитивы, логические значения и строки по умолчанию работают на обеих платформах. Сложные типы данных (например, объекты) могут использоваться путем объединения их в строку в кодировке base64. Строка может быть передана как примитив, а исходные данные могут быть восстановлены внутри UDF (аналогично, если из UDF возвращается сложный тип).

3. Определите область данных UDF: UDF Spark Python будет скалярной функцией, которая оценивается построчно во фрейме данных Spark. Каждая строка должна содержать уникальный идентификатор. В большинстве сценариев чисто скалярная UDF очень ограничена. Таким образом, передача сложных данных, закодированных в виде строки, является изящным способом передачи множества массивов, векторов или собственных объектов.

Spark в версии ›= 2.3 поставляется с пользовательскими функциями Pandas (также называемыми векторизованными пользовательскими функциями), которые полагаются на Apache Arrow. Они ускоряют обмен данными и предлагают как скалярную, так и групповую оценку. По нашему опыту, несколько развертываний Hadoop уже используют Spark 2.3, а также содержат пакет Arrow.

4. Управление зависимостями. UDF будет выполняться распределенно на узлах данных, находящихся вне вашего контроля. Если вашему UDF требуются библиотеки Python, они должны быть доступны на каждом узле данных или отправлены вместе с заданием Spark в виде zip-файла. UDF не должен взаимодействовать с локальной файловой системой (единственное исключение: временные файлы) или сетью. Попробуйте извлечь из него эти части.

Чтобы проиллюстрировать, когда и как можно применять PySpark UDF, мы построили пример на основе набора данных о такси Нью-Йорка (все поездки в 12/2018, когда клиенты давали чаевые). Мы очистили данные; извлекли некоторые характеристики, такие как день недели и час поездки, расстояние поездки и продолжительность поездки; и преобразовал размер чаевых в три категории: низкие, нормальные и высокие. Наконец, мы сгруппировали поездки по их зонам такси (в Нью-Йорке около 260 таких зон) и продолжили работу только с теми маршрутами (сбор, высадка), по которым было совершено не менее 500 поездок.

Это дало нам примерно 4,5 миллиона поездок примерно в 2000 группах. Полученные данные выглядят следующим образом:

Теперь представьте, что мы хотим вычислить что-то индивидуально для каждой группы - процесс, который требует больших вычислительных ресурсов. Например, каждому таксисту нравятся хорошие чаевые, поэтому давайте посчитаем, где и когда водитель может их ожидать. Для этого мы могли бы обучить и протестировать дорогостоящий классификатор (например, SVM), который нацелен на прогнозирование категории подсказок на основе других атрибутов. (SparkML в настоящее время не поддерживает нелинейную SVM, как показано здесь, поэтому полный переход на Spark не является тривиальным.)

Предположим, мы определили это вычисление как функцию Python, созданную для локального выполнения. Он охватывает все этапы от масштабирования данных до проверки и, когда это сделано, возвращает измеренную точность, а также построенный объект классификатора:

Обычно мы бы теперь использовали эту функцию для всех данных, например:

Это вычисление занимает около 60 минут на компьютере (стандартный ноутбук 2018 года), поскольку сложность SVM квадратична по отношению к входным данным.

К счастью, это идеальный кандидат для использования PySpark UDF!

Мы начинаем с определения PySpark UDF, который инкапсулирует все, что в настоящее время делает train_and_test (). Для этого нам нужно сериализовать сложные объекты Python и закодировать их как строки, чтобы их можно было передать в PySpark. Для этого мы определяем следующие две вспомогательные функции (используя пакеты base64 и pickle):

Функция UDF, которую будет использовать PySpark, объявлена ​​следующим образом:

Обратите внимание, что мы просто повторно используем train_and_test (): все, о чем нам нужно позаботиться, это преобразование нашей матрицы функций в массив numpy (который мы делаем внутри UDF, чтобы избежать необходимости также кодировать его перед загрузкой в ​​PySpark) и кодирование результатов (показатель точности и обученный объект классификатора).

Затем PySpark UDF можно зарегистрировать, указав на ранее определенную функцию Python и объявив ее тип возвращаемого значения:

Начиная снова с фрейма данных Pandas «model_data_df», мы можем обучать и тестировать классификаторы с помощью PySpark UDF следующим образом:

Запуск измененного кода на нашем компьютере (локально на Spark, даже не в кластере!) Уменьшает время выполнения блока кода на 80% (12 минут вместо 60) - и это включает время для преобразования данных из Pandas в фрейм данных Spark и обратно. Поскольку необходимо построить около 2000 моделей, потенциал дальнейшего повышения производительности за счет большего распараллеливания в кластере намного больше. С другой стороны, логика внутри UDF может быть изменена для получения лучших результатов модели, то есть выполнения более дорогостоящего поиска параметров.

Существуют некоторые ограничения на использование пользовательских функций PySpark для перемещения алгоритма в Spark. Если между процессами Python и Spark необходимо обмениваться большим объемом данных, это создает накладные расходы, и объединенная среда выполнения для передачи данных может перевесить улучшения среды выполнения от распараллеливания. Как правило, это относится к проблемам, которые больше связаны с вводом-выводом, чем с процессором.

Этот подход работает особенно хорошо, когда есть набор автономных шагов, которые необходимо выполнить, и когда вычислительные затраты высоки по сравнению с необходимостью обмена данными. Например, пользовательские функции Spark могут быть отличным выбором для:

- Временные ряды: например, для прогнозирования затрат на детали или отдельные счета. Каждая строка должна содержать данные временного ряда для части или учетной записи.

- Применение обученной модели машинного обучения: например, для определения вероятности оттока. Каждая строка содержит полный набор функций клиента.

- Настройка функции: например, для определения надежности детали или машины (прогнозируемое качество). Каждая строка содержит информацию об истории отказов.

- Анализ рыночной корзины: например, для анализа товаров, покупаемых по каждой транзакции.

Распределенные вычисления с поддержкой кластера на секционированных наборах данных

В этом разделе мы покажем, как YARN и PySpark могут быть подходящими в качестве фреймворка для упаковки существующего алгоритма Python для распределенных вычислений в кластере Spark - без необходимости полностью переписывать Spark.

Многопроцессорность - это стандартный подход к ускорению кода Python. Идея состоит в том, чтобы разделить последовательный процесс на параллельные подпроцессы. Главный процесс управляет разделением и объединением подпроцессов. Локально этот подход ограничен количеством процессов, которые может обрабатывать ЦП (или ГП). Распределенные вычисления воплощают этот подход в кластере с доступом к нескольким серверам и их процессорам. Это обещает значительный выигрыш во времени выполнения, хотя при использовании нескольких серверов возникает дополнительная сложность: необходимо управлять ресурсами в кластере, передавать данные, отслеживать процессы и перезапускать вышедшие из строя процессы.

С помощью следующих шагов вы можете упаковать существующий алгоритм Python и отправить его для параллельного выполнения в кластер:

1. Определите часть вашего алгоритма, подходящую для параллельной обработки

Если в вашем алгоритме есть подзадачи, которым не нужен полный доступ к входным данным, эта часть является хорошим кандидатом для параллельной обработки. Представьте, что у вас есть данные о продажах и запасах для восьми рынков, и вы тренируете прогнозную модель для каждого рынка. Модельное обучение одного рынка не зависит от других. Вместо того, чтобы тренировать модели одну за другой, вы можете тренировать их одновременно. При распараллеливании всего обучения модели процесс занимает примерно столько же времени, сколько самый медленный процесс. Во многих случаях этапы предварительной обработки хорошо подходят для параллельной обработки.

2. Завершите процесс

Затем вам нужно указать текущую задачу как процесс с определенной сигнатурой и параметрами, которые описывают данные, необходимые для задачи, а также основную функцию, которая управляет разделением на подзадачи. Вы можете передать в процесс информацию о фильтре, чтобы процесс мог получить необходимое подмножество данных. Ввод и вывод процесса должны полагаться на HDFS (или на Hive, если на то пошло), чтобы обеспечить доступ через кластер.

3. Выполнять распределенно и параллельно.

Чтобы использовать подпроцессы, определенные выше, вам понадобится точка входа как часть вашего алгоритма Python, за которой следует первоначальная проверка данных. Например, алгоритм извлекает количество строк для каждого рынка, чтобы охарактеризовать разделы данных. Затем процесс (локальный) координатор порождает подпроцессы. Последнее выполняется с помощью операции spark-submit в кластерном режиме. Spark – submit позволяет отправить пакет кода на выполнение в кластер. Это ключевой аспект, делающий вычисления распределенными. В этом случае ресурсы управляются YARN, что позволяет масштабировать количество процессов настолько высоко, насколько позволяют ваш кластер Hadoop и данные. Процесс-координатор ожидает завершения подпроцессов и консолидирует результаты.

Теперь мы можем кратко представить шаги, описанные выше, в виде псевдокода. В конце концов, ваша конфигурация / реализация могут отличаться, а фактический код может стать слишком подробным.

Чтобы подготовить существующий алгоритм для распределенных вычислений, мы начинаем с определения единицы работы, которую должен выполнять каждый подпроцесс (1):

В рамках нашего потока данных нам нужен только один другой шаг - комбинация промежуточных результатов, созданных теперь параллельными выполнениями run_algo () (2):

Эти две настройки охватывают основную часть вашего алгоритма. Следующие дополнительные функции предназначены исключительно для их оркестровки для распределенного выполнения.

Чтобы получить оптимальное ускорение, мы не должны распределять единицы работы произвольно, а скорее, чтобы вспомогательная функция просматривала существующие группы в наших исходных данных (например, подсчетом строк) и возвращала список фильтров данных. Каждый фильтр данных позже будет отображаться в процессе и должен ориентироваться примерно на один и тот же объем данных, чтобы рабочая нагрузка была равномерной (3):

Чтобы запустить и обработать наши подпроцессы, нам нужно определить две дополнительные вспомогательные функции (мы рекомендуем реализовать их с помощью встроенного в Python модуля «subprocess») (4), ( 5):

Наконец, мы завершим процесс следующим образом:

· Определите сценарий оболочки, чтобы обернуть команду «spark-submit»: нам это нужно, чтобы отправить наш сценарий Python в либо client ( координатора) или кластера (подпроцесса), а также для обработки архивирования и прикрепления зависимостей. Это также полезно, если вам нужно установить некоторые другие настройки PySpark (память, ядра и т. Д.) Или переменные среды перед его запуском.

· Определите точку входа в нашем основном скрипте Python, чтобы различать логику координатора и подпроцесса.

В простейшей форме сценарий оболочки start_algo.sh выглядит так:

Точка входа в algo.py наконец связывает все воедино (E):

Многопроцессорность с поддержкой кластера позволяет вам ускорить работу вашего алгоритма настолько, насколько это может быть количество разделов, которые вы можете создать. Поскольку этот подход может охватывать большую часть алгоритма Python, можно повторно использовать большой объем существующего кода. Вам нужно только убедиться, что все необходимые библиотеки доступны (на каждом узле данных) в кластере, и что все данные читаются и записываются из / в HDFS. Поскольку вся тяжелая работа выполняется на узлах данных, а на граничном узле происходит только координация, использование распределенной вместо локальной многопроцессорной обработки в кластере позволяет избежать потенциальной перегрузки граничного узла.

Упаковывая код и используя spark-submit для отправки его в кластер для вычислений, YARN управляет параллельным выполнением ваших задач. Кластер уже позаботился о дополнительной сложности распределенных вычислений с минимальными усилиями со стороны пользователя.

Этот подход особенно хорошо работает, когда ваши данные имеют естественную группировку и алгоритм применяется к каждой группе индивидуально, например, когда у вас есть разные рынки для продукта или предложения, или у вас есть несколько производственных линий, сборочных линий или направлений бизнеса. Это также работает, когда вы применяете разные алгоритмы или алгоритмы с разными параметрами к одним и тем же данным.

Преимущества в реальных приложениях

В недавнем проекте BCG GAMMA мы применили оба метода. Мы изменили критически важные и трудоемкие части существующего алгоритма, чтобы воспользоваться преимуществами кластера, на котором он был развернут. Благодаря многократному повторному использованию существующего кода алгоритма, алгоритм остался практически таким же, что сделало результаты до и после модификации сопоставимыми и облегчило дальнейшее обслуживание алгоритма.

Модификации сократили этап проектирования функций с шести часов до менее одного и сократили время выполнения алгоритма с 30 до менее трех часов. В целом, он сократил время выполнения с 1,5 дней до того, что можно легко вычислить утром, тем самым значительно улучшив работу процессов внутреннего аудита.

Примечательно, что улучшение среды выполнения все еще ограничивалось внутренними ограничениями на то, сколько процессов может запускать пользователь. При необходимости администратор кластера может предоставить больше процессов, что еще больше снизит время выполнения.

В итоге: эффективное промежуточное решение.

В этой статье мы представили два метода, которые можно использовать для использования кластерных вычислений, сохраняя при этом большую часть существующей кодовой базы неизменной. Обычно полная повторная реализация во фреймворке, таком как PySpark, чище и, следовательно, предпочтительнее. Проблема заключается в том, что для достижения этого процесса требуется больше времени и усилий, а также увеличивается риск того, что миграция приведет к другому или ошибочному алгоритмическому поведению - поведению, которое затем необходимо будет контролировать с помощью обширной проверки. При рассмотрении их предпосылок и ограничений представленные нами методы могут служить мощным компромиссом для разблокировки вычислительных ресурсов кластера.