Запуск алгоритмов машинного обучения с использованием Spark Streaming

MLlib - это масштабируемая библиотека машинного обучения Apache Spark, состоящая из общих обучающих алгоритмов и утилит.

Чтобы продемонстрировать, как мы можем запускать алгоритмы машинного обучения с помощью Spark, я взял простой пример использования, в котором наше приложение Spark Streaming считывает данные из Kafka и сохраняет копию в виде паркетного файла в HDFS. В этом конвейере приема данных мы запускаем ML для данных, поступающих от Kafka.

В этом примере я буду использовать набор данных о цветах.

Я протестировал четыре классификатора:

 1. Decision Tree
 2. LogisticRegression
 3. NaiveBayes
 4. RandomForest

Обучение

Предполагая, что все ваши данные были загружены в HDFS, я обучил модели на этих загруженных данных. Это дает нам возможность обучать нашу модель на больших наборах данных. После завершения обучения я сохранил модели в HDFS, чтобы их можно было импортировать при необходимости.

Создайте Dataframe, прочитав данные из HDFS:

Приведение входных объектов к двойным типам данных и сборка их как векторов.

Индексирование этикеток и функций

// Indesing Label
val labelIndexer = new StringIndexer()
        .setInputCol("flower")
        .setOutputCol("indexedFlower")
        .fit(transformed_data)
// Automatically identify categorical features, and index them.
val featureIndexer = new VectorIndexer()
        .setInputCol("features")
        .setOutputCol("indexedFeatures")
        .setMaxCategories(4)
        .fit(transformed_data)
// Convert indexed labels back to original labels.
val labelConverter = new IndexToString()
        .setInputCol("prediction")
        .setOutputCol("predictedLabel")
        .setLabels(labelIndexer.labels)
// Declaring ML Pipeline 
val pipeline = new Pipeline()

Я обучил модель с каждым классификатором и сохранил их отдельно, так как мы можем загрузить любую модель по нашему выбору.

Объявление об использовании классификатора Дерева решений

Объявление об использовании классификатора логистической регрессии

Объявление об использовании наивного байесовского классификатора

Объявление об использовании классификатора Случайный лес

Обучите и сохраните модель

Выполнение прогнозов с помощью приложения Spark Streaming

После обучения модели я запустил приложение Spark Streaming, которое считывает данные из Kafka и запускает назначенный ML-классификатор для входящих данных.

Создайте Spark DStream с Kafka в качестве источника

// Creating Spark Conf and Streaming context 
val sparkConf = new SparkConf()
              .setAppName(SPARK_APP_NAME)
              .setMaster(SPARK_MASTER)
val sparkStreamingContext = new StreamingContext(sparkConf,    Seconds(SPARK_BATCH_DURATION))
// Creating Spark Sessionval spark = SparkSession
                                    .builder
                                    .config(sparkConf) 
                                    .getOrCreate()
import spark.implicits._
// Setting Kafka Params
val kafkaParams = Map[String, Object]("bootstrap.servers" -> KAFKA_BROKERS,
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> KAFKA_GROUP_ID,
    "auto.offset.reset" -> KAFKA_OFFSET_RESET,
    "enable.auto.commit" -> (false: java.lang.Boolean))
val topicsSet = KAFKA_TOPICS.split(",").toSet
// Creating DStream 
val kafkaStream = KafkaUtils
       .createDirectStream[String, String](sparkStreamingContext,        PreferConsistent, ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))

Загрузите модель из HDFS

Обработайте поток Kafka и запустите модель для данных. Как и в случае с обучением, здесь мы также приводим функции к типам данных Double и собираем их как векторы. Мы также запустим оценщик, чтобы узнать точность.

kafkaStream.map(message => {
    message.value().toString
}).foreachRDD(rdd => {
    if (!rdd.isEmpty()) {
// creating Dataframe
val data = spark.read.json(rdd.toDS())
// Casting Features to Double data types
val df = data.withColumn("PetalLength",               $"PetalLength".cast(sql.types.DoubleType))
                .withColumn("PetalWidth", $"PetalWidth".cast(sql.types.DoubleType))
                .withColumn("SepalLength", $"SepalLength".cast(sql.types.DoubleType))
                .withColumn("SepalWidth", $"SepalWidth".cast(sql.types.DoubleType))
// Assembling Features as Vectors
val assembler = new VectorAssembler()
                .setInputCols(Array("PetalLength", "PetalWidth",  "SepalLength", "SepalWidth"))
                .setOutputCol("features")
// Dropping Unwanted Columns
val transformed_data = assembler.transform(df).drop("PetalLength", "PetalWidth", "SepalLength", "SepalWidth")
// Making predictions
val predictions = model.transform(transformed_data)
// Running the Evaluator
val evaluator = new MulticlassClassificationEvaluator()
                .setLabelCol("indexedFlower")
                .setPredictionCol("prediction")
                .setMetricName("accuracy")
       
// Calcuating Accuracy
val accuracy = evaluator.evaluate(predictions)
println("Accuracy: " + accuracy)
println("Test Error = " + (1.0 - accuracy))
}
})
sparkStreamingContext.start()
sparkStreamingContext.awaitTermination()

В среднем за 30 секунд приходило ~ 700 тыс. Записей. Я запустил приложение Spark Streaming с 4 исполнителями и 2 гигабайтами памяти на каждый исполнитель. Вот текущая статистика:

Заключение

Приведенная выше статистика показывает, что все 4 классификатора были близки по времени обработки. Но с точки зрения точности логистическая регрессия дала более низкую точность. Остальные три были почти такими же.

Мы можем очень успешно запускать модели машинного обучения на разных этапах конвейера больших данных, используя Spark ML. По мере выпуска новых версий Spark улучшается поддержка настраиваемых компонентов конвейера.

Ссылки

MLlib: Основное руководство - документация Spark 2.4.3
https://spark.apache.org/docs/latest/ml-guide.html

Юронг Фан, Кушал Чандра, Нитья Л., Адитья Аги. (январь 2017 г.): Введение в инструмент машинного обучения для больших данных -SparkML
https://yurongfan.wordpress.com/ 2017/01/10 / Introduction-of-a-big-data-machine-learning-tool-sparkml /

Виктория МакРитчи. (сентябрь 2018 г.): Проект 1 - Классификация цветов ириса.

Http: // rstudio-pubs static.s3.amazonaws.com/420656_c17c8444d32548eba6f894bcbdffcaab.html