Инжиниринг признаков является важной частью любого рабочего процесса машинного обучения (ML), поскольку он позволяет создавать более сложные модели, чем с использованием только необработанных данных, но он также является одним из самых сложных в управлении. Он страдает от языкового барьера — разницы в языках, используемых для кодирования логики обработки. Проще говоря, специалисты по данным определяют свои вычисления функций на одном языке (например, Python или SQL), и инженерам данных часто приходится переписывать эту логику на другом языке (например, Scala или Java).

Мой коллега Майк затронул причины этого в предыдущей статье «Преодоление разрыва между учеными данных и инженерами в рабочих процессах машинного обучения», но я хочу подробно остановиться на том, что именно влечет за собой этот процесс, и изучить некоторые идеи о том, как устранить какое-то трение.

Когда команды сталкиваются с языковыми трениями?

Эта проблема начинает возникать по мере того, как компании взрослеют в своем уровне сложности данных. Внутреннее машинное обучение не стоит даже рассматривать, пока у компании не будет надежного конвейера данных для предоставления моделей обучающими данными.

Однако по мере того, как доступность и качество данных постепенно улучшаются, группы обработки данных начинают создавать более сложные конвейеры пакетной обработки, включающие машинное обучение. Модели машинного обучения обучаются в автономном режиме, и выходные данные могут начинаться как артефакты, такие как файлы CSV, которые оцениваются людьми, прежде чем переходить к другим типам, таким как метки классов в случае моделей классификации.

Преобразования функций, а также конвейеры обучения и логического вывода, написанные специалистами по данным, обычно не оптимизированы для скорости, поэтому инженеры машинного обучения часто переписывают их, чтобы они работали быстрее. Переписывание логики для разработки функций — это первое, на чем следует искать прирост производительности.

Как только автономный конвейер машинного обучения достигнет стабильного состояния, многие компании будут стремиться использовать эти данные для более непосредственного улучшения своего продукта. Это часто приводит к тому, что модели машинного обучения интегрируются в архитектуры приложений, чтобы, например, веб-приложения могли адаптироваться к требованиям клиентов в режиме реального времени.

Таким образом, машинное обучение как дисциплина должно превратиться из экспериментального, спорадического, автономного процесса в повторяемый процесс доставки программного обеспечения. Файлы модели должны быть развернуты онлайн и своевременно давать результаты. Точно так же код вычисления признаков от специалистов по данным необходимо адаптировать для производственной среды, чтобы вычисления могли выполняться в режиме онлайн. Это позволяет моделям делать прогнозы на основе новых функций.

Именно на этом этапе влияние языковых трений становится более серьезной проблемой:

Вместо того, чтобы объяснять теорию и лучшие практики разработки функций, я хотел бы проиллюстрировать языковой барьер на примере сценария.

Пример сценария: разработка функций для прогнозирования рынка с помощью ИИ.

Одно из наиболее изученных, но загадочных применений машинного обучения — его использование для прогнозирования движения определенных финансовых рынков. Поскольку прогнозы могут влиять на движение цены, большинство организаций держат свои модели прогнозов в тайне. Однако некоторые торговые приложения экспериментируют с той или иной формой предсказания на основе ИИ. Это особенно распространено в торговле криптовалютой, где все торговые данные общедоступны в блокчейне.

Например, в торговом приложении SwissBorg есть «CyBorg Predictor» на основе машинного обучения, который прогнозирует движение цен на определенные активы.

Это канонический пример, когда прогнозы машинного обучения в реальном времени могут принести ощутимую ценность для бизнеса (при условии, что прогнозы точны!), поэтому он прекрасно подходит для анализа онлайн-разработки функций.

Итак, предположим, что вы работаете в многообещающем приложении для торговли криптовалютой, которое хочет представить аналогичную функциональность.

Ключевыми функциями, необходимыми для обучения модели машинного обучения, являются точки данных OHLC: цены открытия, максимума, минимума и закрытия для заданного временного окна. Эти данные обычно визуализируются в виде свечного графика, который трейдеры используют для технического анализа.

Очевидно, что существуют сервисы, предоставляющие предварительно вычисленные данные OHLC, но ради аргумента предположим, что вы хотите обучить модель на функциях, которые вычислили самостоятельно. Я хочу пройтись по процессу переноса этой функции из автономного исследовательского сценария в производственный сценарий в реальном времени.

Следовательно, этот сценарий имеет два раздела: прототип и производство. Обратите внимание, что это упрощение: на самом деле здесь задействовано больше фаз (для более подробной информации я настоятельно рекомендую статью Чипа Хьюена Машинное обучение в реальном времени: проблемы и решения). Однако в целях объяснения языкового барьера я хочу, чтобы все было просто.

Прототипирование в автономном режиме с помощью Python

В первой итерации вашей модели машинного обучения вы можете сосредоточиться на одной или двух валютах, таких как ETH или биткойн. При создании прототипа модели вы можете обучить модель в автономном режиме на исторических торговых данных и протестировать ее на предмет точности прогноза.

Допустим, у вашего специалиста по данным есть файл JSON с некоторыми образцами исторических данных тикера (в идеале он имеет ту же структуру JSON, что и данные, которые будут поступать из прямой торговой ленты).

Предположим, что они используют Python для прототипирования, они могут сначала рассчитать 1-часовые данные OHLC ETH следующим образом:

import pandas as pd
import json

# Load raw ticker data from the JSON file
with open('ticker_data.json', 'r') as file:
    ticker_data = json.load(file)

# Convert ticker data to a pandas DataFrame
ticker_df = pd.DataFrame(ticker_data)

# Only keep rows with "product\_id" equals "ETH-USD"
eth_usd_ticker_df = ticker_df[ticker_df["product_id"] == "ETH-USD"]

# Convert the time column to pandas datetime
eth_usd_ticker_df['time'] = pd.to_datetime(eth_usd_ticker_df['time'])

# Set the time column as the DataFrame index
eth_usd_ticker_df = eth_usd_ticker_df.set_index('time')

# Calculate the OHLC data based on a 1-minute interval
ohlc_df = eth_usd_ticker_df['price'].astype(float).resample('1H', origin='start').agg(
    {
        "open": "first",
        "high": "max",
        "low": "min",
        "close": "last",
    }
)

# Calculate the volume data based on a 1-minute interval
volume_df = eth_usd_ticker_df['last_size'].astype(float).resample('1H', origin='start').sum()

# Combine OHLC and volume data
ohlc_volume_df = pd.concat([ohlc_df, volume_df], axis=1)

print(ohlc_volume_df)

Этот скрипт разделит торговые данные на фиксированные интервалы в 1 час, как показано ниже.

Эти данные подходят для прототипирования модели машинного обучения или предоставления пакетных долгосрочных прогнозов, но не подходят для детальных прогнозов в реальном времени. Цены могут сильно колебаться даже в течение 1 часа, поэтому вам нужно отслеживать их по мере их возникновения. Это означает размещение модели машинного обучения в сети и объединение исторических данных с потоком торговых данных в реальном времени.

Вычисление функций онлайн с помощью Java

Теперь предположим, что вы адаптировали модель для использования функций, которые представляют собой комбинацию:

  • 1-часовые интервалы за последние 30 дней.
  • 1-минутные интервалы для текущего дня.
  • Скользящее окно последних 60 секунд, обновляющееся каждую секунду.

Вы хотите разместить эту модель в сети, чтобы она предоставляла поток прогнозов, которые обновляются в режиме реального времени. Эти прогнозы можно использовать для заполнения панели мониторинга в реальном времени или для включения автоматических торговых ботов.

Это требует значительного рефакторинга расчетов OHLC. На этот рефакторинг влияет ряд факторов, которые способствуют возникновению так называемого языкового барьера, замедляющего рабочие процессы машинного обучения.

Эти факторы следующие:

Задержка и пропускная способность

Теперь запрос должен выполняться на непрерывном неограниченном потоке данных, а не на таблице. Кроме того, необходимо поддерживать определенный уровень пропускной способности, чтобы прогнозы не устаревали. Для этого требуется специально созданный механизм потоковой обработки, способный поддерживать пропускную способность при больших объемах торговых данных.

Apache Flink — один из самых популярных вариантов для таких случаев использования, и, хотя он поддерживает SQL, многие разработчики предпочитают писать логику обработки с использованием API-интерфейсов более низкого уровня Flink. Вычисления выполняются быстрее при прямом доступе к этим API (вместо использования уровня абстракции, такого как PyFlink или SQL).

@Override
public Tuple5<Double, Double, Double, Double, Integer> merge(Tuple5<Double, Double, Double, Double, Integer> a, Tuple5<Double, Double, Double, Double, Integer> b) {
    return new Tuple5<>(
        a.f0,                                   // Open (min)
        Math.max(a.f1, b.f1),                   // High
        Math.min(a.f2, b.f2),                   // Low
        b.f3,                                   // Close (latest value)
        a.f4 + b.f4                             // Volume
    );
}

Отрывок математических операций после рефакторинга во Flink.

Различные зависимости

Если вы собираетесь переводить с SQL или Python на Java для Flink, вам также потребуется импортировать различные зависимости, которые должны быть доступны в среде выполнения. Если вы создали пользовательскую функцию в форме UDF, вам необходимо убедиться, что она также упакована с заданием и развернута в кластере Flink.

import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.java.tuple.*;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import java.util.Properties;

Выдержка из всех дополнительных зависимостей, необходимых после рефакторинга кода в Java.

Источники и приемники данных в реальном времени

Теперь для вычисления данных OHLC в скользящем окне запрос должен использовать другой источник данных. Вместо того, чтобы подключаться к базе данных и запрашивать таблицу, процесс должен работать с какой-то очередью сообщений, которая обычно является темой Kafka.

Таким образом, необходимо добавить много «кода коннектора», чтобы процесс:

  • Подключается к брокеру сообщений Kafka.
  • Считывает необработанные данные из одной темы и записывает результаты во вторую тему.
  • Эффективно сериализует и десериализует данные.

Также требуется дополнительный код соединителя для записи самих значений функций в онлайн-хранилище функций, например Redis.

// Create Kafka consumer properties
Properties consumerProps = new Properties();
consumerProps.setProperty("bootstrap.servers", "myserver:9092");
consumerProps.setProperty("group.id", "flink-ohlc-group");

// Create Kafka producer properties
Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "myserver:9092");

Небольшой отрывок из расширенной конфигурации Kafka, необходимой для Flink.

Оконные агрегации и управление состоянием

На этапе прототипирования вы, возможно, уже приступили к тестированию вычислений скользящего окна, но вы, вероятно, использовали бы словарь в памяти для хранения состояния. Это прекрасно работает на одном компьютере. Однако при переходе к производственной среде вам потребуется использовать механизм обработки, который поддерживает общее состояние отказоустойчивым образом. Вот почему многие компании выбирают Apache Flink, известный своей надежной обработкой с отслеживанием состояния в распределенной вычислительной среде.

Если реплика процесса каким-то образом завершается, когда она находится в середине вычисления данных OHLC для скользящего окна, может прийти другая реплика и продолжить с того места, где остановился предыдущий процесс, потому что этапы вычислений постоянно записываются в общее хранилище.

// Calculate the OHLC data for each ticker over a 30-second sliding window
DataStream<Tuple5<String, Double, Double, Double, Double>> ohlcStream = tickStream
    .keyBy(tick -> tick.ticker)  // Group by ticker
    .timeWindow(Time.seconds(30), Time.seconds(1))  // Sliding window of 30 seconds with 1 second slide
    .aggregate(new OhlcAggregator());

Отрывок из расчета скользящего окна с использованием Flink DataStream API в Java.

Как видите, рефакторинга много. И я даже не коснулся других изменений процесса, таких как добавление функции в каталог функций, взаимодействие с онлайн-магазином функций, тестирование, развертывание и мониторинг расчета онлайн-функций.

Но переписывание кода только сверху вниз может замедлить путь функции от прототипа до производства.

Решения языкового барьера

Если эта проблема настолько повсеместна, как ее решают крупные игроки? Оказывается, Netflix, Uber, DoorDash создали свои собственные сложные платформы функций, которые управляют функциями, а также потоковой и пакетной обработкой. У них по-прежнему есть проблема с переводом объектов, но они могут автоматизировать процесс перевода для обычных вычислений.

Унифицированные функциональные платформы

Следующая таблица взята из другой блестящей работы Чипа Хьюена, на этот раз Платформы функций самообслуживания: архитектуры и API. Это показывает, сколько проприетарных специально созданных функциональных возможностей платформы уже существует в дикой природе. Обратите внимание, что функции обычно по-прежнему определяются на нескольких языках.

Сравнение функциональных платформ

Однако не у каждой компании есть время или ресурсы для создания собственной функциональной платформы. Теперь, когда все больше компаний переходят на более поздние этапы модели зрелости машинного обучения, растет спрос на более простые комплексные решения, которые помогают преодолеть языковой барьер и устранить сложность инфраструктуры.

В настоящее время существуют общие функциональные платформы, такие как Tecton (проприетарная) и Feahr (с открытым исходным кодом), цель которых — обеспечить тесную синхронизацию пакетного и потокового кода при выполнении самой фактической обработки. Этого само по себе достаточно, чтобы сократить время производства. Когда в апреле 2022 года LinkedIn объявила, что открывает исходный код Feathr, они сообщили, что сократили время разработки, необходимое для добавления новых функций и экспериментов с ними, с недель до дней.

Tecton идет дальше и устраняет головную боль, связанную с необходимостью предоставления дополнительной инфраструктуры (при условии, что у вас есть Databricks, Amazon EMR или Snowflake, настроенные как автономное хранилище функций). Они обеспечивают комплексную платформу для управления, хранения и вычисления онлайн- и офлайн-функций.

Следующий снимок экрана от Tecton должен дать вам общее представление о том, как работают эти функциональные платформы.

По сути, вы храните варианты одного и того же преобразования функции в одной «записи» вместе с некоторыми переменными конфигурации, которые влияют на оценку преобразования. Соединения с внешними источниками, такими как Kafka, определяются в других местах конфигурации Tecton, поэтому между кодом преобразования и кодом потокового транспорта существует четкое разделение задач.

Предостережения

Такие системы по-прежнему предназначены для компаний, которые достаточно продвинулись в своей зрелости в области машинного обучения. В некотором смысле они предназначены для предотвращения повторного создания крупными предприятиями своих собственных специализированных функциональных платформ (хотя многие все еще делают это). По этой причине эти платформы все еще довольно сложны, вероятно, потому, что они должны удовлетворять очень специфические требования многих предприятий со зрелыми командами MLOps. Если вы начинаете с ограниченным набором функций, существует риск того, что дополнительная сложность может компенсировать экономию времени, которую вы получаете за счет более структурированного конвейера управления функциями.

Другая проблема заключается в том, что они все еще используют Spark или Flink для выполнения потоковой обработки, а это означает, что код все еще транслируется или «транспилируется» на каком-то уровне. Tecton, например, использует Spark Structured Streaming для потоковой обработки. Собственный API Spark написан на Scala, поэтому, как и в случае с Flink, Python API — это просто оболочка вокруг собственного API, поэтому его использование может привести к дополнительной задержке. Кроме того, Spark Structured Streaming использует модель микропакетной обработки, которая обычно имеет более высокую задержку по сравнению с системами потоковой передачи, управляемыми событиями, такими как Apache Flink или Kafka Streams. В нем также отсутствуют встроенные функции обработки сложных событий (CEP), которые предлагают другие платформы, такие как Apache Flink.

Однако не для каждого приложения требуется CEP или обработка с очень малой задержкой (доли секунды или миллисекунды), поэтому в большинстве случаев с этой задачей справятся потоковые процессоры, встроенные в эти функциональные платформы.

Но что, если вам нужно более простое решение, которое дает вам более прямой контроль над логикой обработки потоков и при этом не требует от специалистов по обработке и анализу данных разбираться с Java или Scala? Вот где в игру вступает другой тип решения — чистые фреймворки потоковой обработки Python.

Фреймворки потоковой обработки Pure Python

Фреймворк потоковой обработки на чистом Python может позволить специалистам по обработке и анализу данных создавать прототипы с потоковыми данными на самых ранних этапах процесса. Они делают это, упрощая подключение к Kafka и выполнение типичных операций, которые вы выполняете с неограниченным потоком (например, агрегирование скользящих окон). Исследователь данных может по-прежнему сначала строить свою логику на пакетном наборе данных, но становится очень просто адаптировать ту же логику для потоковых данных. Это уменьшает языковой барьер, потому что тот же код прототипа можно использовать в рабочей среде с минимальным рефакторингом. В идеальном сценарии специалисты по данным могут также использовать Python для определения рабочих процессов обработки. Многие функции необходимо вычислять в несколько этапов, поэтому специалисты по обработке и анализу данных получают больше автономии при определении рабочих процессов, а также самой логики преобразования.

Например, Faust и Bytwax — это чистые среды обработки потоков Python, которые можно использовать в сложных конвейерах обработки.

Фауст

Исходный код Faust был открыт Robinhood в 2018 году, и с тех пор он был передан сообществу открытого исходного кода.

Когда он был впервые выпущен, Фауст выглядел очень многообещающе. Например, команда инженеров Robinhood опубликовала убедительный пост в блоге о том, как они использовали Faust в сочетании с Apache Airflow для создания лучшей системы новостей. Они использовали команды Faust через Airflow для непрерывного извлечения данных из различных источников (таких как RSS-каналы и агрегаторы), а также использовали Kafka для хранения результатов каждого шага обработки. Faust также поддерживает масштабируемую обработку с отслеживанием состояния с помощью так называемых таблиц с отслеживанием состояния и может быть настроен только на однократную обработку с помощью параметра processing_guarantee.

Однако похоже, что Робингуд бросил Фауста. Непонятно, почему именно, но на Reddit было множество спекуляций. Теперь существует форк оригинального репозитория Faust от Robinhood, который более активно поддерживается сообществом открытого исходного кода. Тем не менее, в нем все еще есть много открытых ошибок, которые мешают некоторым командам (см. этот обзор фреймворков потоковой обработки для получения более подробной информации об этих ошибках).

байтвакс

Bytwax намного новее, запущен в начале 2021 года и с открытым исходным кодом в феврале 2022 года, но быстро набирает обороты благодаря тому, что он имеет открытый исходный код и очень удобен для специалистов по данным. В отличие от Faust, Bytewax стремится стать полной платформой обработки потоков и включает в себя функциональные возможности, позволяющие специалистам по данным создавать свои собственные потоки данных — другими словами, конвейеры обработки, которые включают несколько шагов, которые могут быть представлены в виде узлов на графике.

Фактически, пример сценария OHLC, который я представил ранее, был вдохновлен учебным пособием, в котором используется простой поток данных Bytewax для чтения данных из веб-сокета Coinbase и записи значений функций OHLC в хранилище функций (Hopsworks).

Предостережения

Учитывая, что официальный репозиторий, похоже, заброшен, оговорки с Фаустом, надеюсь, должны быть ясны. Хотя форк Faust более активен, все еще неясно, когда будут исправлены некоторые из наиболее серьезных ошибок. Стоит отметить, что мы также столкнулись с этими ошибками при попытке провести бенчмаркинг с Faust (для нашей собственной библиотеки Python).

Bytewax все еще довольно новый, поэтому потребуется некоторое время, чтобы больше отчетов о том, как он работает в производстве, просочилось через экосистему. Однако когда дело доходит до его развертывания, вам все равно придется иметь дело с некоторой инфраструктурной сложностью — по крайней мере, на данный момент (у них есть управляемая платформа в разработке). Глядя на их документацию по развертыванию, становится ясно, что они ожидают от читателей определенных знаний об инфраструктуре, в которой будет размещаться логика потоковой обработки. Вы можете запускать потоки данных в локальных контейнерах Docker, в Kubernetes, экземплярах AWS EC2 или экземплярах виртуальной машины GCP. Все это требует работы по настройке и настройке, которая, вероятно, будет неинтересна специалисту по данным и, вероятно, лучше справится с ней дружелюбный (ML) инженер. Мы надеемся, что большая часть этой сложности исчезнет, ​​как только их платформа станет общедоступной.

Заключение

К настоящему времени должно быть ясно, что индустрия данных и машинного обучения хорошо осведомлена о языковом барьере, влияющем на разработку функций в рабочих процессах машинного обучения в реальном времени. Она существовала всегда, но исторически решалась с помощью внутренних решений, скрытых от общественности. Вывод в реальном времени на основе функций в реальном времени практиковался немногими избранными с очень специфическими требованиями, поэтому для них имело смысл создавать свои собственные решения. Теперь, когда все больше внимания уделяется ИИ, мы наблюдаем демократизацию многих аспектов рабочих процессов MLOps, и теперь существуют более стандартизированные подходы к преодолению языкового барьера, такие как функциональные платформы «все в одном» и чистые фреймворки потоковой обработки Python.

Хотя я сосредоточился на Faust и Bytewax, было бы упущением с моей стороны не упомянуть нашу собственную платформу Quix, на которой работает Quix Streams — наша библиотека обработки потоков с открытым исходным кодом. Модель обработки мало чем отличается от модели Bytwax, но вместо определения конвейеров данных в Python вы используете пользовательский интерфейс Quix Portal для объединения шагов преобразования (чтобы посмотреть, как это работает в производственной среде, см. этот пример телеметрии) . Платформа Quix также представляет собой полностью размещенное и управляемое решение, в котором используются Kafka и Kubernetes, что делает его почти бесконечно масштабируемым. Мы стремимся решить языковой барьер так же, как Faust и Bytwax, но мы также хотим устранить головную боль инфраструктуры. Однако инфраструктура — это совсем другая тема, которую я надеюсь затронуть в следующем посте. На данный момент я надеюсь, что мой простой пример сценария помог вам более подробно понять языковой барьер и вдохновил вас спланировать его, когда вы будете готовы погрузиться в обработку функций в реальном времени.