Сейчас большие данные являются частью нашей жизни, и большинству компаний, собирающих данные, приходится иметь дело с большими данными, чтобы получить от них значимую информацию. Хотя мы знаем, что сложные нейронные сети работают красиво и точно, когда у нас есть большой набор данных, иногда они не самые идеальные. Однако в ситуации, когда сложность предсказания высока, предсказание должно быть быстрым и эффективным. Следовательно, нам нужно масштабируемое решение для машинного обучения.

Apache Spark поставляется с SparkML. SparkML имеет отличные встроенные алгоритмы машинного обучения, которые оптимизированы для параллельной обработки и, следовательно, очень эффективны по времени для больших данных. В этой статье мы рассмотрим простой пример конвейера SparkML для очистки, обработки и создания прогнозов для больших данных.

Мы возьмем данные о погоде в аэропорту JFK и попробуем несколько встроенных классификаторов в SparkML. Набор данных содержит такие столбцы, как скорость ветра, влажность, давление на станции и т. Д., И мы попытаемся классифицировать направление ветра на основе других входных данных.

Давайте очистим набор данных с помощью Spark. Обратите внимание, я оставлю ссылку на мой репозиторий GitHub для этого кода, чтобы вам не приходилось копировать его отсюда. Однако я объясню код в этой статье.

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler, Normalizer, MinMaxScaler
from pyspark.ml.linalg import Vectors
from pyspark.ml import Pipeline
import random
from pyspark.sql.functions import translate, col
# spark context
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))
spark = SparkSession \
    .builder \
    .getOrCreate()
# create a dataframe out of it by using the first row as field names # and trying to infer a schema based on contents
df = spark.read.option("header", "true").option("inferSchema","true").csv('noaa-weather-data-jfk-airport/jfk_weather.csv')
# register a corresponding query table. we do this to save the data #in memory and run our operations on it. 
df.createOrReplaceTempView('df')
# cleaning the data as it contains trailing charcters. Double is a #data type like float
# columns with no trailing charecters were straight converrted to double type, rest were first cleaned
df_cleaned = df \
    .withColumn("HOURLYWindSpeed", df.HOURLYWindSpeed.cast('double')) \
    .withColumn("HOURLYWindDirection", df.HOURLYWindDirection.cast('double')) \
    .withColumn("HOURLYStationPressure", translate(col("HOURLYStationPressure"), "s,", "")) \
    .withColumn("HOURLYPrecip", translate(col("HOURLYPrecip"), "s,", "")) \
    .withColumn("HOURLYRelativeHumidity", translate(col("HOURLYRelativeHumidity"), "*", "")) \
    .withColumn("HOURLYDRYBULBTEMPC", translate(col("HOURLYDRYBULBTEMPC"), "*", "")) \
# the cleaned columsn were now changed to double types
df_cleaned =   df_cleaned \
                    .withColumn("HOURLYStationPressure", df_cleaned.HOURLYStationPressure.cast('double')) \
                    .withColumn("HOURLYPrecip", df_cleaned.HOURLYPrecip.cast('double')) \
                    .withColumn("HOURLYRelativeHumidity", df_cleaned.HOURLYRelativeHumidity.cast('double')) \
                    .withColumn("HOURLYDRYBULBTEMPC", df_cleaned.HOURLYDRYBULBTEMPC.cast('double')) \
# Filtering for clean data set with no nulls and wind speed not 0
df_filtered = df_cleaned.filter("""
    HOURLYWindSpeed <> 0
    and HOURLYWindSpeed IS NOT NULL
    and HOURLYWindDirection IS NOT NULL
    and HOURLYStationPressure IS NOT NULL
    and HOURLYPressureTendency IS NOT NULL
    and HOURLYPrecip IS NOT NULL
    and HOURLYRelativeHumidity IS NOT NULL
    and HOURLYDRYBULBTEMPC IS NOT NULL
""")
# saving the cleaned data set into CSV
df_filtered.write.csv('clean_df.csv')

Приведенный выше код - это мой сценарий предварительной обработки, который предоставит мне чистый фрейм данных для работы и опробования различных подходов к машинному обучению. Посмотрев на необработанные данные, я сделал следующие наблюдения, которые необходимо было учесть при предварительной обработке:

  1. Тип данных был в строке, и его нужно было изменить на что-то читаемое алгоритмами.
  2. В некоторых столбцах заканчивались символы, такие как "s" и "*".
  3. У нас было значительное количество нулевых значений и непригодных для использования значений, таких как 0, в целевом столбце.

Первая часть кода имеет дело с завершающими символами там, где это необходимо, и преобразует другие столбцы в тип double, который похож на значение с плавающей запятой. Вторая часть кода преобразует чистые столбцы в двойной тип. Третья часть кода отфильтровывает все нулевые значения из предикторов и нулевые значения из целевых переменных. Наконец, мы сохраняем файл в CSV как чистый df, который мы будем использовать для операций машинного обучения.

Я опробовал несколько классификаторов в своем основном коде, чтобы проверить, какой из них работает лучше всего, однако в этой статье я приведу только пример логистической регрессии. Опять же, нет необходимости копировать код отсюда, так как я предоставлю ссылку на репозиторий GitHub.

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler, Normalizer, MinMaxScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import Bucketizer
from pyspark.ml.stat import Correlation
from pyspark.ml.linalg import Vectors
from pyspark.ml import Pipeline
import random
# read our clean csv
df_filtered = spark.read.csv('clean_df.csv')
# vector assembler
vectorAssembler = VectorAssembler(inputCols=["HOURLYWindSpeed","","HOURLYStationPressure"],
                                  outputCol="features")
df_pipeline = vectorAssembler.transform(df_filtered)
# checking correlations
Correlation.corr(df_pipeline,"features").head()[0].toArray()
# train test split 
splits = df_filtered.randomSplit([0.8, 0.2])
df_train = splits[0]
df_test = splits[1]
# discretize the value using the Bucketizer, where we split the #column in buckets from above 0, 180 and then infinity
bucketizer = Bucketizer(splits=[ 0, 180, float('Inf') ],inputCol="HOURLYWindDirection", outputCol="HOURLYWindDirectionBucketized")
# after the bucketizer we do one hot enncoding 
encoder = OneHotEncoder(inputCol="HOURLYWindDirectionBucketized", outputCol="HOURLYWindDirectionOHE")
# funtion for ccuracy calculation
def classification_metrics(prediction):
    mcEval = MulticlassClassificationEvaluator().setMetricName("accuracy") .setPredictionCol("prediction").setLabelCol("HOURLYWindDirectionBucketized")
    accuracy = mcEval.evaluate(prediction)
    print("Accuracy on test data = %g" % accuracy)
# logistic regression
# defining the model
lr = LogisticRegression(labelCol="HOURLYWindDirectionBucketized", maxIter=10)
# new vector assembler
vectorAssembler = VectorAssembler(inputCols=["HOURLYWindSpeed","HOURLYDRYBULBTEMPC"],
                                  outputCol="features")
# bew piplineline for lr
pipeline = Pipeline(stages=[bucketizer,vectorAssembler,normalizer,lr])
# predictions
model = pipeline.fit(df_train)
prediction = model.transform(df_test)
classification_metrics(prediction)

В приведенном выше коде используются некоторые общие функции SparkMl, которые должен знать специалист по анализу данных. Эти:

  1. Векторный ассемблер: в основном используется для объединения всех функций в один вектор, который затем может быть передан в оценщик или алгоритм машинного обучения.
  2. Корреляция: Spark предоставляет удобный инструмент для проверки корреляций для лучшего проектирования функций.
  3. Bucketizer. Бакетирование - это подход к преобразованию непрерывных переменных в категориальную переменную. Мы предоставляем ряд сегментов в разделах, что позволяет нам категоризировать и использовать алгоритмы классификации.
  4. OneHotEncoder: горячее кодирование - это процесс, с помощью которого категориальные переменные преобразуются в форму, которая может быть предоставлена ​​алгоритмам машинного обучения для более эффективного прогнозирования.
  5. Конвейер. Функциональность конвейера в Spark позволяет вам определять определенный набор процессов и порядок, в котором они должны выполняться. Конвейеры также можно сохранять и использовать позже, что делает их отличным инструментом для масштабируемости и переносимости.

В моем репозитории кода я использовал несколько других моделей, таких как случайный лес, деревья с градиентным усилением. Я также попробовал решить проблему регрессии при прогнозировании скорости ветра на том же наборе данных. Не стесняйтесь обращаться, если у вас есть вопросы.

Спасибо!

Ссылка на мой репозиторий GitHub:

Https://github.com/manikmal/spark_ml