Этот пост - знак признательности замечательному сообществу специалистов по науке о данных с открытым исходным кодом, которому я многим обязан тем, чему научился.
Последние несколько месяцев я работал над своим побочным проектом по разработке приложения машинного обучения для потоковой передачи данных. Это был отличный учебный опыт с многочисленными проблемами и большим объемом знаний, некоторыми из которых я попытался поделиться здесь.
Этот пост посвящен развертыванию моделей машинного обучения для потоковой передачи данных и охватывает все 3 необходимые области успешного производственного приложения: инфраструктуру, технологии и мониторинг.
РАЗРАБОТКА МОДЕЛИ МАШИННОГО ОБУЧЕНИЯ С ПОМОЩЬЮ SPARK-ML И СТРУКТУРИРОВАННОМ ПОТОКУ
Первым шагом для любого успешного приложения является определение технологического стека, в который оно должно быть написано, на основе бизнес-требований. Как правило, при большом объеме данных используйте Spark.
Основными преимуществами использования Spark являются его доказанная способность обрабатывать большие данные и доступность встроенных распределенных библиотек машинного обучения. Первоначальными проблемами при использовании Spark было использование RDD, которые были интуитивно понятными и сильно отличались от Dataframes, с которыми специалисты по данным привыкли работать. Однако с введением Dataset API в Spark2.0 теперь стало проще, чем раньше, кодировать алгоритм машинного обучения.
Во время своего опыта я обнаружил, что работа с моделями машинного обучения становится чрезвычайно простой при правильном использовании структуры «Конвейер». Конвейер предоставляет структуру, включающую все шаги, необходимые для обработки и очистки данных, обучения модели, а затем ее записи в виде объекта.
Затем этот объект можно напрямую импортировать для обработки новых данных и получения результатов, освобождая разработчика от процесса перезаписи и сохранения точной копии шагов обработки для новых данных, которые использовались для построения модели с обучающими данными.
В приведенном ниже фрагменте я попытался объяснить, как использовать этот API для создания, сохранения и использования моделей для прогнозирования. Для создания и сохранения модели можно использовать следующую структуру кода.
// Create a sqlContext var sqlContext = new SQLContext(sparkContext) // Read the training data from a source. In this case i am reading it from a s3 location var data = sqlContext.read.format(“csv”).option(“header”, “true”).option(“inferSchema”, “true”).load(“pathToFile”) // Select the needed messages data = data.select(“field1”,”field2",”field3") // Perform pre-processing on the data val process1 = … some process … val process2 = … some process … // Define an evaluator val evaluator = … evaluator of your choice … // split the data into training and test val Array(trainingData, testData) = data.randomSplit(Array(ratio1, ratio2)) // Define the algorithm to train. For example decision tree val dt = new DecisionTree() .setFeaturesCol(featureColumn).setLabelCol(labelColumn) // Define the linear pipeline. Methods specified in the pipeline are executed in a linear order. Sequence of steps is binding val pipelineDT = new Pipeline().setStages(Array(process1, process2, dt)) // Define the cross validator for executing the pipeline and performing cross validation. val cvLR = new CrossValidator() .setEstimator(pipelineDT) .setEvaluator(evaluator) .setNumFolds(3) // Use 3+ in practice // Fit the model on training data val cvModelLR = cvLR.fit(trainingData) // extract the best trained pipeline model from the cross validator. val bestPipelineModel = cvModelLR.bestModel.asInstanceOf[PipelineModel] // Save the model in a s3 bucket cvModelLR.write.overwrite().save(mlModelPath)
После сохранения модели ее можно легко использовать для прогнозирования потоковых данных, выполнив следующие шаги.
1. Чтение данных из темы Kafka
// Create a spark session object val ss = SparkSession.builder.getOrCreate() // Define schema of the topic to be consumed val schema= StructType( Seq( StructField(“Field1”,StringType,true), StructField(“Field2”,IntType,true) ) ) // Start reading from a Kafka topic val records = ss.readStream .format(“kafka”) .option(“kafka.bootstrap.servers”, kafkaServer) .option(“subscribe”,kafkaTopic) .load() .selectExpr(“cast (value as string) as json”) .select(from_json($”json”,schema).as(“data”)) .select(“data.*”)
2. Загрузите сохраненную модель машинного обучения и используйте ее для прогнозирования
// Load the classification model from saved location val classificationModel = CrossValidatorModel.read.load(mlModelPath) // Use the model to perform predictions. By default best model is used val results = classificationModel.transform(records)
3. Сохраните результаты в s3 или другом месте
В формате csv
// Saving results to a location as csv results.writeStream.format(“csv”).outputMode(“append”) .option(“path”, destination_path) .option(“checkpointLocation”, checkpointPath) .start()
В формате паркет
// Saving results to a location as parquet results.writeStream.format(“parquet”).outputMode(“append”) .option(“path”, destination_path) .option(“checkpointLocation”, checkpointPath) .start()
Или если мы хотим отправить результаты в какую-то базу данных или любые другие расширения
val writer = new JDBCSink(url, user, password) results.writeStream .foreach(writer) .outputMode(“append”) .option(“checkpointLocation”, checkpointPath) .start()
Для этой цели необходимо реализовать отдельный модуль записи, расширив интерфейс ForeachWriter с помощью структурированного потока Spark g. Ниже показан пример кода для jdbc, взятого из https://docs.databricks.com/_static/notebooks/structured-streaming-etl-kafka.html.
import java.sql._ class JDBCSink(url:String, user:String, pwd:String) extends ForeachWriter[(String, String)] { val driver = “com.mysql.jdbc.Driver” var connection:Connection = _ var statement:Statement = _ def open(partitionId: Long,version: Long): Boolean = { Class.forName(driver) connection = DriverManager.getConnection(url, user, pwd) statement = connection.createStatement true } def process(value: (String, String)): Unit = { statement.executeUpdate(“INSERT INTO zip_test “ + “VALUES (“ + value._1 + “,” + value._2 + “)”) } def close(errorOrNull: Throwable): Unit = { connection.close } } } }
МОНИТОРИНГ, РЕГИСТРАЦИЯ И ОПОВЕЩЕНИЯ
Следующим шагом является интеграция в приложение сервисов мониторинга, оповещения и регистрации, чтобы получать мгновенные оповещения и следить за тем, как приложение работает. В стеке AWS есть множество инструментов, которые могут ими воспользоваться. Пара из них, которые часто используются, - это CloudWatch for Monitoring и Elastic Search for Logging.
Примерная панель мониторинга будет выглядеть примерно так
ИНФРАСТРУКТУРА
Когда код готов к развертыванию, пора выбрать подходящую инфраструктуру для его развертывания. Лучшей инфраструктурой я считаю Kafka (в основном из-за его архитектуры с несколькими издателями и потребителями и возможности устанавливать периоды хранения по разным темам) и AWS EMR в качестве базовой инфраструктуры для запуска приложений.
AWS EMR стал очевидным выбором из-за наличия кластеров с предустановленной искрой и внутренним управлением ресурсами. Возможность развернуть новый кластер с полным развертыванием за короткое время также является большим плюсом.
Схема упрощенной архитектуры будет выглядеть так.
НАСТРОЙКА ЗАДАНИЯ SPARK
Наконец, как и в любой другой искровой задаче, ее настройка необходима и в случае потоковой работы для максимальной эффективности. Первым шагом в настройке искровой работы является выбор подходящих экземпляров для работы. Проведя несколько экспериментов с типами экземпляров M4 (общего назначения) и C4 (тяжелые вычисления), я обнаружил, что M4 работает лучше, в первую очередь из-за его способности предоставлять виртуальные ядра.
Свойство DynamicAllocation в Spark также было чрезвычайно полезным для стабильного максимального увеличения использования. Также есть ряд других параметров, которые я нашел полезными для настройки производительности:
a) - conf spark.yarn.executor.memoryOverhead = 1024: объем служебной памяти, определенный для задания.
б) - conf spark.yarn.maxAppAttempts = 4: это свойство определяет максимальное количество попыток, которое будет предпринято для отправки приложения. Это очень полезно для сценариев, когда несколько искровых заданий отправляются в один кластер, а иногда отправка заданий завершается неудачно из-за нехватки доступных ресурсов.
c) - conf spark.task.maxFailures = 8: это свойство устанавливает максимальное количество раз, когда задача может завершиться ошибкой, прежде чем искровое задание откажет само. Значение по умолчанию - 2. Всегда рекомендуется оставлять это число выше.
d) - conf spark.speculation = false: если для этого свойства установлено значение true, yarn автоматически убивает и переназначает задачи в зависимости от времени, которое они занимают (если yarn считает, что они застряли). В нашем случае мы не обнаружили, что это сильно влияет на производительность, но это хорошее свойство, на которое стоит обратить внимание при обработке искаженных наборов данных.
д) - conf spark.yarn.max.executor.failures = 15: максимальное количество сбоев исполнителя до сбоя приложения. Всегда устанавливайте большее число.
f) - conf spark.yarn.executor.failuresValidityInterval = 1h: Определяет временной интервал для достоверности ошибок исполнителя. В сочетании с вышеуказанным свойством в основном за час максимум 15 исполнителей могут выйти из строя до того, как задание умрет.
g) - driver-memory 10g: Обеспечивает достаточно высокий объем памяти для драйвера, чтобы не допустить сбоя в случае пакета сообщений, которые должны быть обработаны.
Я надеюсь, что этот материал окажется полезным для людей, которые только начинают заниматься структурированным стримингом. Мне будет приятно внести свой вклад в сообщество открытого исходного кода, благодаря которому я многому научился.
Для более подробного технического обзора посетите https://spark.apache.org/docs/2.0.0/structured-streaming-programming-guide.html