PySpark — это платформа с открытым исходным кодом, разработанная Apache для распределенных вычислений с большими данными. Он предоставляет удобный интерфейс для работы с массивными наборами данных в распределенной среде, что делает его популярным выбором для приложений машинного обучения (в моей предыдущей статье я рассказывал о производительности pandas vs PySpark — PyPark Vs Pandas). Библиотека машинного обучения PySpark под названием MLlib предлагает несколько алгоритмов для построения моделей регрессии и классификации. В этой статье мы рассмотрим, как использовать PySpark для построения моделей регрессии и классификации с примерами.

Мы видим, как Pyspark помогает создавать модели ML с двумя вариантами использования: регрессия и классификация.

Регрессионные модели с PySpark

Регрессия — это контролируемый метод обучения, который используется для прогнозирования непрерывной выходной переменной. В PySpark мы можем использовать линейную регрессию, регрессию дерева решений и регрессию случайного леса для построения регрессионных моделей.

Давайте возьмем набор данных о цене дома и применим регрессионную модель и модель DecisionTreeRegressor для прогнозирования цены дома с учетом его характеристик.

Начните с импорта необходимых библиотек и загрузки набора данных.

from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression

spark = SparkSession.builder.appName('LinearRegression').getOrCreate()

#load the data
df = spark.read.csv('house_data.csv',header=True)
df = df.select(df["bedrooms"],df["sqft_lot"],df['condition'],df['price'])
df.describe().toPandas().transpose()

Затем мы можем определить и собрать функции и целевые столбцы и разделить данные на обучающие и тестовые наборы.

assembler = VectorAssembler(inputCols=["bedrooms", "sqft_lot", "condition"],
                            outputCol="features")
data = assembler.transform(df).select("features", "price")

# Split the data into training and testing sets
(trainingData, testData) = data.randomSplit([0.7, 0.3])

После этого мы создаем модель линейной регрессии и модель регрессии дерева решений и подгоняем ее к обучающим данным:

lr = LinearRegression(labelCol="price", featuresCol="features")
lr_model = lr.fit(trainingData

dt = DecisionTreeRegressor(featuresCol='features', labelCol="price")
dt_model = dt.fit(trainingData)

Мы можем использовать обученную модель для прогнозирования тестовых данных и, наконец, мы можем оценить производительность модели, используя среднеквадратичную ошибку, чтобы сравнить две модели.

#valuating the model
lr_model.evaluate(testData)
dt_model.evaluate(testData)

#Predicting and measuring MSE
lr_predictions = lr_model.transform(testData)
evaluator = RegressionEvaluator(labelCol=target_col, predictionCol='prediction', metricName='mse')
lr_mse = evaluator.evaluate(lr_predictions)
print('LR Mean Squared Error:', lr_mse)

#Predicting and measure MSE
dt_predictions = dt_model.transform(testData)
evaluator = RegressionEvaluator(labelCol=target_col, predictionCol='prediction', metricName='mse')
dr_mse = evaluator.evaluate(dt_predictions)
print('LR Mean Squared Error:', dr_mse)

Модели классификации с PySpark

Классификация — это метод машинного обучения, который включает прогнозирование категориальной переменной или метки класса. Бинарная классификация — это тип классификации, который включает прогнозирование одного из двух возможных результатов. В PySpark мы можем использовать логистическую регрессию и классификацию дерева решений для построения моделей бинарной классификации.

Давайте возьмем набор данных ириса и применим модель RandomForest и дерева решений, чтобы предсказать вид с учетом его характеристик.

Начните с импорта необходимых библиотек и загрузки набора данных.

from pyspark.sql import SparkSession
from pyspark.ml.regression import LinearRegression

spark = SparkSession.builder.appName('ClassificationModel').getOrCreate()

iris = spark.read.csv("iris.csv", header=True, inferSchema=True)

iris.describe().toPandas().transpose()

Затем мы можем определить и собрать функции и целевые столбцы и разделить данные на обучающие и тестовые наборы.

species_indexer = StringIndexer(inputCol="species", outputCol="speciesIndex")

iris = species_indexer.fit(iris).transform(iris)

assembler = VectorAssembler(inputCols=["sepal_length", "sepal_width", "petal_length", "petal_width"],
                            outputCol="features")
data = assembler.transform(iris).select("features", "speciesIndex")

(trainingData, testData) = data.randomSplit([0.7, 0.3])

После этого мы создаем модель RandomForest и модель регрессии дерева решений и подгоняем ее к обучающим данным:

rf = RandomForestClassifier(featuresCol='features', labelCol='speciesIndex', numTrees=10)
rf_model = rf.fit(trainingData)

dt = DecisionTreeClassifier(featuresCol='features', labelCol='speciesIndex')
dt_model = dt.fit(trainingData)

Мы можем использовать обученную модель для прогнозирования тестовых данных. Наконец, мы можем оценить производительность модели, используя точность для сравнения двух моделей.

# Evaluate the model's performance
rf_model.evaluate(testData)
dt_model.evaluate(testData)


#Measuring accuracy
rf_predictions = rf_model.transform(testData)
evaluator = MulticlassClassificationEvaluator(labelCol='speciesIndex', predictionCol='prediction', metricName='accuracy')
accuracy = evaluator.evaluate(rf_predictions)
print('Accuracy:', accuracy*100)

dt_predictions = dt_model.transform(testData)
evaluator = MulticlassClassificationEvaluator(labelCol='speciesIndex', predictionCol='prediction', metricName='accuracy')
accuracy = evaluator.evaluate(dt_predictions)
print('Accuracy:', accuracy*100)

Весь код, использованный для написания этой статьи, можно найти в моем блокноте Colab: https://colab.research.google.com/drive/1_L7i9jmYqxd-KF-PfU193Bn0Rlhay1QC?usp=sharing

давайте рассмотрим несколько примеров того, как запускать код PySpark в AWS. Для приведенных ниже примеров/инструкций вам потребуется учетная запись AWS. Настройте учетную запись AWS. Если у вас еще нет учетной записи AWS, перейдите на веб-сайт AWS и создайте ее. Получив учетную запись AWS, перейдите в Консоль управления AWS.

Использование Amazon EMR:

Шаг 1. Запустите кластер Amazon EMR. В консоли управления AWS перейдите к консоли Amazon EMR. Нажмите «Создать кластер», чтобы начать процесс запуска кластера Amazon EMR.

Шаг 2. Настройка кластера В мастере «Создать кластер» вам необходимо настроить кластер. Вот некоторые ключевые настройки, которые следует учитывать:

  • Выберите подходящие типы инстансов EC2 для вашего кластера.
  • Выберите последнюю версию Amazon EMR и последнюю версию Hadoop.
  • В разделе «Приложения» выберите «Spark».
  • Настройте другие параметры в соответствии с вашим вариантом использования.

Шаг 3. Загрузите свой код PySpark После запуска и работы вашего кластера вы можете загрузить свой код PySpark. Есть несколько способов сделать это, в том числе:

  • Загрузка вашего кода в корзину Amazon S3, а затем загрузка его в ваш кластер.
  • Загрузка кода непосредственно в кластер с помощью Консоли управления AWS или интерфейса командной строки.

Шаг 4. Запустите код PySpark После загрузки кода PySpark вы сможете запустить его в своем кластере Amazon EMR. Есть несколько способов сделать это, в том числе:

  • Использование оболочки PySpark: подключитесь к главному узлу по SSH и запустите оболочку PySpark, введя pyspark в командной строке. Затем вы можете ввести свой код PySpark непосредственно в оболочку.
  • Использование команды Spark-submit: используйте команду spark-submit, чтобы отправить код PySpark в качестве задания в свой кластер.

Использование Amazon SageMaker:

  1. Запустите экземпляр блокнота Amazon SageMaker. Запустите экземпляр блокнота Amazon SageMaker с помощью консоли AWS. На экземпляре должен быть установлен PySpark и все необходимые библиотеки.
  2. Загрузите свой код PySpark: загрузите свой код PySpark в экземпляр блокнота. Убедитесь, что ваш код имеет правильную конфигурацию искры. Например
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate() 
spark

Запустите код PySpark. Запустите код PySpark в экземпляре блокнота.

Использование Amazon EC2: запуск кода PySpark в Amazon EC2 Amazon EC2 (Elastic Compute Cloud) – популярный сервис для запуска виртуальных машин в облаке. Вот шаги, которые необходимо выполнить для запуска кода PySpark на Amazon EC2:

  1. Выберите тип экземпляра Далее вам нужно выбрать тип экземпляра. Выберите тип экземпляра, подходящий для вашего варианта использования. Для запуска кода PySpark мы рекомендуем использовать экземпляр как минимум с 4 виртуальными ЦП и 16 ГБ ОЗУ.
  2. Выберите образ машины Amazon (AMI) Вам необходимо выбрать образ AMI для вашего экземпляра EC2. AMI — это предварительно настроенный образ виртуальной машины, который предоставляет операционную систему и любое необходимое дополнительное программное обеспечение. Для запуска кода PySpark можно использовать один из предварительно настроенных AMI, включающих PySpark и другие библиотеки обработки данных, например Amazon Machine Learning AMI.
  3. Настройка сведений об экземпляре На следующем шаге необходимо настроить сведения об экземпляре. Сюда входят количество экземпляров, сетевые настройки и настройки хранилища. Вы можете использовать настройки по умолчанию или настроить их в соответствии с вашими требованиями.
  4. Добавление хранилища Вам необходимо добавить хранилище для вашего экземпляра. Вы можете выбирать между различными типами хранилища, такими как тома EBS (Elastic Block Store) или тома хранилища экземпляров. Мы рекомендуем использовать тома EBS для большей надежности и возможностей резервного копирования.
  5. Настройка групп безопасности Вам необходимо настроить группы безопасности для управления трафиком к вашему экземпляру. Вы можете создать новую группу безопасности или использовать существующую. Убедитесь, что вы разрешаете входящий трафик через порты, которые вы будете использовать для PySpark (например, 8080 для веб-интерфейса).
  6. Подключение к экземпляру После запуска экземпляра вам необходимо подключиться к нему с помощью SSH (Secure Shell). Вы можете использовать терминальную программу или SSH-клиент для подключения к экземпляру. После подключения вы можете запускать код PySpark с помощью оболочки PySpark или блокнота Jupyter.