Этот пост - знак признательности замечательному сообществу специалистов по науке о данных с открытым исходным кодом, которому я многим обязан тем, чему научился.

Последние несколько месяцев я работал над своим побочным проектом по разработке приложения машинного обучения для потоковой передачи данных. Это был отличный учебный опыт с многочисленными проблемами и большим объемом знаний, некоторыми из которых я попытался поделиться здесь.

Этот пост посвящен развертыванию моделей машинного обучения для потоковой передачи данных и охватывает все 3 необходимые области успешного производственного приложения: инфраструктуру, технологии и мониторинг.

РАЗРАБОТКА МОДЕЛИ МАШИННОГО ОБУЧЕНИЯ С ПОМОЩЬЮ SPARK-ML И СТРУКТУРИРОВАННОМ ПОТОКУ

Первым шагом для любого успешного приложения является определение технологического стека, в который оно должно быть написано, на основе бизнес-требований. Как правило, при большом объеме данных используйте Spark.

Основными преимуществами использования Spark являются его доказанная способность обрабатывать большие данные и доступность встроенных распределенных библиотек машинного обучения. Первоначальными проблемами при использовании Spark было использование RDD, которые были интуитивно понятными и сильно отличались от Dataframes, с которыми специалисты по данным привыкли работать. Однако с введением Dataset API в Spark2.0 теперь стало проще, чем раньше, кодировать алгоритм машинного обучения.

Во время своего опыта я обнаружил, что работа с моделями машинного обучения становится чрезвычайно простой при правильном использовании структуры «Конвейер». Конвейер предоставляет структуру, включающую все шаги, необходимые для обработки и очистки данных, обучения модели, а затем ее записи в виде объекта.

Затем этот объект можно напрямую импортировать для обработки новых данных и получения результатов, освобождая разработчика от процесса перезаписи и сохранения точной копии шагов обработки для новых данных, которые использовались для построения модели с обучающими данными.

В приведенном ниже фрагменте я попытался объяснить, как использовать этот API для создания, сохранения и использования моделей для прогнозирования. Для создания и сохранения модели можно использовать следующую структуру кода.

// Create a sqlContext
 var sqlContext = new SQLContext(sparkContext)
// Read the training data from a source. In this case i am reading it from a s3 location
var data = sqlContext.read.format(“csv”).option(“header”, “true”).option(“inferSchema”, “true”).load(“pathToFile”)
// Select the needed messages 
 data = data.select(“field1”,”field2",”field3")
// Perform pre-processing on the data 
 val process1 = … some process … 
 val process2 = … some process …
// Define an evaluator
val evaluator = … evaluator of your choice …
// split the data into training and test 
 val Array(trainingData, testData) = data.randomSplit(Array(ratio1, ratio2))
// Define the algorithm to train. For example decision tree 
 val dt = new DecisionTree()
 .setFeaturesCol(featureColumn).setLabelCol(labelColumn)
// Define the linear pipeline. Methods specified in the pipeline are executed in a linear order. Sequence of steps is binding
 val pipelineDT = new Pipeline().setStages(Array(process1, process2, dt))
// Define the cross validator for executing the pipeline and performing cross validation. 
 val cvLR = new CrossValidator()
 .setEstimator(pipelineDT)
 .setEvaluator(evaluator)
 .setNumFolds(3) // Use 3+ in practice
// Fit the model on training data 
 val cvModelLR = cvLR.fit(trainingData)
// extract the best trained pipeline model from the cross validator. 
 val bestPipelineModel = cvModelLR.bestModel.asInstanceOf[PipelineModel]
// Save the model in a s3 bucket 
 cvModelLR.write.overwrite().save(mlModelPath)

После сохранения модели ее можно легко использовать для прогнозирования потоковых данных, выполнив следующие шаги.

1. Чтение данных из темы Kafka

// Create a spark session object 
 val ss = SparkSession.builder.getOrCreate()
// Define schema of the topic to be consumed 
 val schema= StructType( Seq(
 StructField(“Field1”,StringType,true),
 StructField(“Field2”,IntType,true)
 )
 )
// Start reading from a Kafka topic
 val records = ss.readStream
 .format(“kafka”)
 .option(“kafka.bootstrap.servers”, kafkaServer)
 .option(“subscribe”,kafkaTopic)
 .load()
 .selectExpr(“cast (value as string) as json”)
 .select(from_json($”json”,schema).as(“data”))
 .select(“data.*”)

2. Загрузите сохраненную модель машинного обучения и используйте ее для прогнозирования

// Load the classification model from saved location
 val classificationModel = CrossValidatorModel.read.load(mlModelPath)
// Use the model to perform predictions. By default best model is used
 val results = classificationModel.transform(records)

3. Сохраните результаты в s3 или другом месте

В формате csv

// Saving results to a location as csv 
 results.writeStream.format(“csv”).outputMode(“append”)
 .option(“path”, destination_path) .option(“checkpointLocation”, checkpointPath)
 .start()

В формате паркет

// Saving results to a location as parquet 
 results.writeStream.format(“parquet”).outputMode(“append”)
 .option(“path”, destination_path) .option(“checkpointLocation”, checkpointPath)
 .start()

Или если мы хотим отправить результаты в какую-то базу данных или любые другие расширения

val writer = new JDBCSink(url, user, password)
 results.writeStream
 .foreach(writer)
 .outputMode(“append”)
 .option(“checkpointLocation”, checkpointPath)
 .start()

Для этой цели необходимо реализовать отдельный модуль записи, расширив интерфейс ForeachWriter с помощью структурированного потока Spark g. Ниже показан пример кода для jdbc, взятого из https://docs.databricks.com/_static/notebooks/structured-streaming-etl-kafka.html.

import java.sql._
class JDBCSink(url:String, user:String, pwd:String) extends ForeachWriter[(String, String)] {
 val driver = “com.mysql.jdbc.Driver”
 var connection:Connection = _
 var statement:Statement = _
 
 def open(partitionId: Long,version: Long): Boolean = {
 Class.forName(driver)
 connection = DriverManager.getConnection(url, user, pwd)
 statement = connection.createStatement
 true
 }
def process(value: (String, String)): Unit = {
 statement.executeUpdate(“INSERT INTO zip_test “ + 
 “VALUES (“ + value._1 + “,” + value._2 + “)”)
 }
def close(errorOrNull: Throwable): Unit = {
 connection.close
 }
 }
 
 }
 }

МОНИТОРИНГ, РЕГИСТРАЦИЯ И ОПОВЕЩЕНИЯ

Следующим шагом является интеграция в приложение сервисов мониторинга, оповещения и регистрации, чтобы получать мгновенные оповещения и следить за тем, как приложение работает. В стеке AWS есть множество инструментов, которые могут ими воспользоваться. Пара из них, которые часто используются, - это CloudWatch for Monitoring и Elastic Search for Logging.

Примерная панель мониторинга будет выглядеть примерно так

ИНФРАСТРУКТУРА

Когда код готов к развертыванию, пора выбрать подходящую инфраструктуру для его развертывания. Лучшей инфраструктурой я считаю Kafka (в основном из-за его архитектуры с несколькими издателями и потребителями и возможности устанавливать периоды хранения по разным темам) и AWS EMR в качестве базовой инфраструктуры для запуска приложений.

AWS EMR стал очевидным выбором из-за наличия кластеров с предустановленной искрой и внутренним управлением ресурсами. Возможность развернуть новый кластер с полным развертыванием за короткое время также является большим плюсом.

Схема упрощенной архитектуры будет выглядеть так.

НАСТРОЙКА ЗАДАНИЯ SPARK

Наконец, как и в любой другой искровой задаче, ее настройка необходима и в случае потоковой работы для максимальной эффективности. Первым шагом в настройке искровой работы является выбор подходящих экземпляров для работы. Проведя несколько экспериментов с типами экземпляров M4 (общего назначения) и C4 (тяжелые вычисления), я обнаружил, что M4 работает лучше, в первую очередь из-за его способности предоставлять виртуальные ядра.

Свойство DynamicAllocation в Spark также было чрезвычайно полезным для стабильного максимального увеличения использования. Также есть ряд других параметров, которые я нашел полезными для настройки производительности:

a) - conf spark.yarn.executor.memoryOverhead = 1024: объем служебной памяти, определенный для задания.

б) - conf spark.yarn.maxAppAttempts = 4: это свойство определяет максимальное количество попыток, которое будет предпринято для отправки приложения. Это очень полезно для сценариев, когда несколько искровых заданий отправляются в один кластер, а иногда отправка заданий завершается неудачно из-за нехватки доступных ресурсов.

c) - conf spark.task.maxFailures = 8: это свойство устанавливает максимальное количество раз, когда задача может завершиться ошибкой, прежде чем искровое задание откажет само. Значение по умолчанию - 2. Всегда рекомендуется оставлять это число выше.

d) - conf spark.speculation = false: если для этого свойства установлено значение true, yarn автоматически убивает и переназначает задачи в зависимости от времени, которое они занимают (если yarn считает, что они застряли). В нашем случае мы не обнаружили, что это сильно влияет на производительность, но это хорошее свойство, на которое стоит обратить внимание при обработке искаженных наборов данных.

д) - conf spark.yarn.max.executor.failures = 15: максимальное количество сбоев исполнителя до сбоя приложения. Всегда устанавливайте большее число.

f) - conf spark.yarn.executor.failuresValidityInterval = 1h: Определяет временной интервал для достоверности ошибок исполнителя. В сочетании с вышеуказанным свойством в основном за час максимум 15 исполнителей могут выйти из строя до того, как задание умрет.

g) - driver-memory 10g: Обеспечивает достаточно высокий объем памяти для драйвера, чтобы не допустить сбоя в случае пакета сообщений, которые должны быть обработаны.

Я надеюсь, что этот материал окажется полезным для людей, которые только начинают заниматься структурированным стримингом. Мне будет приятно внести свой вклад в сообщество открытого исходного кода, благодаря которому я многому научился.

Для более подробного технического обзора посетите https://spark.apache.org/docs/2.0.0/structured-streaming-programming-guide.html