Это третья часть серии Apache spark, в которой мы увидим практическую реализацию одной задачи машинного обучения. если вы не читали мои предыдущие посты о искровых сериях, пожалуйста, посетите их, прежде чем читать эту статью.

Введение

Spark ML и Spark MLlib — это библиотеки машинного обучения, созданные на основе Apache Spark, платформы обработки больших данных с открытым исходным кодом. Эти библиотеки предоставляют масштабируемые, распределенные реализации многих распространенных алгоритмов и инструментов машинного обучения для предварительной обработки данных, проектирования функций, выбора и оценки моделей.

Основное различие между Spark ML и Spark MLlib заключается в предоставляемом ими API. Spark ML — это более новая библиотека, которая предоставляет API более высокого уровня на основе DataFrames, что упрощает ее использование и лучше интегрируется с остальной частью экосистемы Spark. Spark MLlib, с другой стороны, предоставляет API более низкого уровня, основанный на RDD (Resilient Distributed Datasets), который может быть более гибким, но требует дополнительных усилий по программированию.

Некоторые из алгоритмов и инструментов машинного обучения, доступных в обеих библиотеках, включают:

  • Классификация: логистическая регрессия, деревья решений, случайные леса, наивный байесовский алгоритм и т. д.
  • Регрессия: линейная регрессия, обобщенная линейная регрессия и т. д.
  • Кластеризация: k-средние, смешанные модели Гаусса и т. д.
  • Совместная фильтрация: альтернативный метод наименьших квадратов и т. д.
  • Разработка признаков: векторный ассемблер, однократное кодирование и т. д.
  • Выбор и оценка модели: перекрестная проверка, настройка гиперпараметров и т. д.

И Spark ML, и Spark MLlib предназначены для масштабирования и могут обрабатывать большие наборы данных, распределяя вычисления по кластеру машин. Это делает их идеальными для приложений с большими данными, где традиционные инструменты машинного обучения могут с трудом справляться с размером данных.

Разработка функций

Это самый первый шаг в любом подходе к решению задач машинного обучения. В этом разделе мы изучим 3 различных метода разработки функций, как показано ниже.

  • Индексатор строк
  • OneHotEncoder (ОНЕ)
  • ВекторАссемблер

Индексатор строк

В библиотеке PySpark ML (машинное обучение) StringIndexer — это преобразователь признаков, который используется для преобразования столбца категориальной строки в числовой столбец. Он сопоставляет категориальные значения числовым значениям, присваивая уникальный индекс каждому уникальному значению во входном столбце. Это преобразование необходимо, потому что многие алгоритмы машинного обучения требуют, чтобы входные признаки были числовыми значениями.

StringIndexer присваивает индекс каждому уникальному значению во входном столбце, начиная с 0, в порядке частоты. Например, если входной столбец имеет значения ["красный", "зеленый", "красный", "синий", "зеленый", "зеленый"], StringIndexer присвоит значения ["красный"=0, " зеленый»=1, «синий»=2].

давайте посмотрим на код практической реализации концепции.

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer
#%%
spark = SparkSession.builder.master("local[*]").appName("sparkdev-stringindexer-demo").getOrCreate()
# define the structure to the data frame
schema = StructType([
    StructField(name="id", dataType=IntegerType(), nullable=False),
    StructField(name="Technology", dataType=StringType(), nullable=False)
])

df = spark.createDataFrame([(0,"Java"),(1,"NodeJS"),(2,"SpringBoot"),(3,"Java"),(4,"MongoDB"),(5,"SpringBoot")],schema=schema,verifySchema=True)
#%%
df.printSchema()
df.show(truncate=False)
#%%
indexer = StringIndexer(inputCol="Technology", outputCol="TechnologyIndexed", stringOrderType="frequencyAsc")
indexed = indexer.fit(df).transform(df)
indexed.show()

OneHotEncoder

В Spark ML OneHotEncoder — это преобразователь, который преобразует категориальные переменные в векторный формат, который можно использовать в алгоритмах машинного обучения. Он берет столбец категориальных признаков и генерирует новый столбец бинарных векторов, где каждый вектор представляет уникальную категорию.

давайте посмотрим на код практической реализации концепции.

from pyspark.ml.feature import OneHotEncoder
#%%
encoder = OneHotEncoder(inputCols=["TechnologyIndexed"], outputCols=["TechnologyVec"], dropLast=False)
encoded = encoder.fit(indexed).transform(indexed)
encoded.show(truncate=False)

encoder1 = OneHotEncoder(inputCols=["TechnologyIndexed"], outputCols=["TechnologyVec"])
encoded = encoder1.fit(indexed).transform(indexed)
encoded.show(truncate=False)

Векторный ассемблер

В PySpark ML VectorAssembler — это преобразователь, который собирает несколько столбцов функций в один векторный столбец. Он берет список имен входных столбцов и создает новый столбец, содержащий вектор, состоящий из значений входных столбцов.

from pyspark.ml.feature import VectorAssembler
#%%
# Sample data
data = [(0, 1.2, 4.3, 5.4), (1, 2.2, 3.1, 6.5), (2, 0.9, 2.3, 5.6), (3, 1.5, 4.1, 7.2)]

# Create a dataframe
df = spark.createDataFrame(data, ["id", "feat1", "feat2", "feat3"])
df.show()
# Create a VectorAssembler
assembler = VectorAssembler(inputCols=["feat1", "feat2", "feat3"], outputCol="features")

# Transform the data
output = assembler.transform(df)

# Select the relevant columns and show the output
output.select(col("id"), col("features")).show()

PySpark ML

С приведенным выше введением и примерами стратегий реализации разработки функций давайте начнем с машинного обучения и его реализации с помощью pyspark. в этом мы возьмем самый известный игрушечный набор данных iris .

Без дальнейших задержек давайте начнем с кода

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
#%%
spark = SparkSession.builder.appName("sparkdev-ML-tutorial").master("local[*]").getOrCreate()
#%%
df = spark.read.csv(path='iris_dataset.csv',header=True)
df.printSchema()
df.show(10, truncate=False)

давайте преобразуем строковый тип данных в двойной для каждого столбца функций, поскольку мы будем иметь дело только с числовыми типами данных.

input_columns = ['sepal_length(cm)','sepal_width(cm)','petal_length(cm)','petal_width(cm)']

# Assuming your dataframe is named "df" and the columns you want to convert are "col1", "col2", "col3"
df = df.select([col(c).cast("double").alias(c) if c in input_columns else col(c) for c in df.columns])
df.printSchema()
df.show(5)

использование векторного ассемблера, чтобы получить все функции в один столбец

from pyspark.ml.feature import VectorAssembler
#%%
vector_assembler = VectorAssembler(inputCols=input_columns, outputCol="features", handleInvalid="skip")
transformed_df = vector_assembler.transform(df)
transformed_df.show(5, truncate=False)

required_df = transformed_df.drop('sepal_length(cm)','sepal_width(cm)','petal_length(cm)','petal_width(cm)')
required_df.show(5, truncate=False)
required_df.select(col("target")).distinct().show(truncate=False)

используйте индексатор строк для преобразования целевого столбца

from pyspark.ml.feature import StringIndexer
#%%
string_indexer = StringIndexer(inputCol='target', outputCol='targetIndexed')
project_df = string_indexer.fit(required_df).transform(required_df)
project_df.show(5, truncate=False)

Давайте теперь создадим поезд и тестовый набор с правилом 80–20.

(train_set, test_set) = project_df.randomSplit([0.8,0.2])

в этом примере мы будем использовать 2 известных алгоритма классификации DecisionTreeClassifier и RandomForestClassifier.

from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
#%%
dt = DecisionTreeClassifier(featuresCol='features', labelCol='targetIndexed')
model = dt.fit(train_set)
#%%
predictions = model.transform(test_set)
predictions.show(10, truncate=False)

evaluator = MulticlassClassificationEvaluator(labelCol='targetIndexed', predictionCol='prediction')
accuracy = evaluator.evaluate(predictions)
print('Accuracy=', accuracy)
print('Test Accuracy=', 1.0-accuracy)

rfc = RandomForestClassifier(featuresCol='features',labelCol='targetIndexed', numTrees=10)
model_rfc = rfc.fit(train_set)
#%%
predictions_rfc = model_rfc.transform(test_set)
predictions_rfc.show(5, truncate=False)

evaluator = MulticlassClassificationEvaluator(labelCol='targetIndexed', predictionCol='prediction')
accuracy = evaluator.evaluate(predictions_rfc)
print('Accuracy=', accuracy)
print('Test Error=', 1.0-accuracy)

заключение

В заключение, PySpark ML — это мощная библиотека машинного обучения, которая позволяет разработчикам и специалистам по данным создавать масштабируемые и эффективные конвейеры машинного обучения на крупномасштабных наборах данных. Благодаря обширному набору алгоритмов и инструментов машинного обучения PySpark ML упрощает процесс создания и развертывания моделей машинного обучения в любом масштабе.

Некоторые из ключевых преимуществ использования PySpark ML включают в себя:

  1. Масштабируемость: PySpark ML может обрабатывать большие наборы данных и может распределять вычисления по кластеру машин, что делает его подходящим для приложений с большими данными.
  2. Универсальность: PySpark ML предоставляет широкий спектр алгоритмов машинного обучения, инструментов предварительной обработки и преобразования данных, а также показателей оценки на выбор, позволяя разработчикам выбирать наиболее подходящий подход для своего конкретного случая использования.
  3. Интеграция: PySpark ML легко интегрируется с другими библиотеками PySpark и инструментами обработки данных, что упрощает создание сквозных конвейеров данных.
  4. Простота использования: PySpark ML предоставляет простой и интуитивно понятный API, позволяющий разработчикам и специалистам по обработке и анализу данных быстро освоиться и приступить к созданию моделей машинного обучения.

В целом, PySpark ML — это мощный инструмент для машинного обучения на больших данных. Благодаря своей масштабируемости, универсальности, интеграции и простоте использования он позволяет разработчикам и специалистам по данным создавать и развертывать модели машинного обучения в любом масштабе, раскрывая потенциал больших данных для широкого спектра приложений.

весь код для всех трех частей доступен в моем репозитории github, чтобы вы могли скачать и поиграть. Счастливого обучения