Apache Spark быстро набирает обороты как в заголовках, так и в мире, в основном благодаря своей способности обрабатывать потоковые данные. Поскольку так много данных обрабатывается ежедневно, для нас стало важным иметь возможность передавать и анализировать их в режиме реального времени. Кроме того, Apache Spark достаточно быстр, чтобы выполнять исследовательские запросы без выборки. Многие отраслевые эксперты привели все причины, почему вам следует использовать Spark для машинного обучения?

Итак, теперь мы используем Spark Machine Learning Library для решения задачи классификации текста на несколько классов, в частности, PySpark.

Если вы хотите увидеть реализацию с Scikit-Learn, прочтите предыдущую статью.

Данные

Наша задача - классифицировать описание преступлений в Сан-Франциско по 33 заранее определенным категориям. Данные можно скачать с Kaggle.

Учитывая новое описание преступления, мы хотим отнести его к одной из 33 категорий. Классификатор предполагает, что каждое новое описание преступления относится к одной и только одной категории. Это проблема классификации текста на несколько классов.

  • Ввод: описание

Пример: «УКРАДЕННЫЙ АВТОМОБИЛЬ»

  • Вывод: категория

Пример: УГОН АВТОМОБИЛЯ

Чтобы решить эту проблему, мы будем использовать различные методы извлечения признаков вместе с различными алгоритмами контролируемого машинного обучения в Spark. Давайте начнем!

Прием и извлечение данных

Загрузить CSV-файл просто с помощью Spark csv packages.

from pyspark.sql import SQLContext
from pyspark import SparkContext
sc =SparkContext()
sqlContext = SQLContext(sc)
data = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('train.csv')

Вот и все! Мы загрузили набор данных. Приступим к изучению.

Удалите ненужные нам столбцы и посмотрите первые пять строк:

drop_list = ['Dates', 'DayOfWeek', 'PdDistrict', 'Resolution', 'Address', 'X', 'Y']
data = data.select([column for column in data.columns if column not in drop_list])
data.show(5)

Примените printSchema () к данным, которые распечатают схему в древовидном формате:

data.printSchema()

20 основных категорий преступлений:

from pyspark.sql.functions import col
data.groupBy("Category") \
    .count() \
    .orderBy(col("count").desc()) \
    .show()

Топ-20 описаний преступлений:

data.groupBy("Descript") \
    .count() \
    .orderBy(col("count").desc()) \
    .show()

Модель конвейера

Spark Machine Learning Pipelines API аналогичен Scikit-Learn. Наш конвейер состоит из трех этапов:

  1. regexTokenizer: Токенизация (с регулярным выражением)
  2. stopwordsRemover: Удалить стоп-слова
  3. countVectors: Подсчет векторов («векторов документов»)
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer
from pyspark.ml.classification import LogisticRegression
# regular expression tokenizer
regexTokenizer = RegexTokenizer(inputCol="Descript", outputCol="words", pattern="\\W")
# stop words
add_stopwords = ["http","https","amp","rt","t","c","the"] 
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(add_stopwords)
# bag of words count
countVectors = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=10000, minDF=5)

StringIndexer

StringIndexer кодирует строковый столбец меток в столбец индексов меток. Индексы находятся в [0, numLabels), отсортированы по частоте меток, поэтому наиболее часто встречающаяся метка получает индекс 0.

В нашем случае столбец метки (Категория) будет закодирован для индексов меток от 0 до 32; самая частая метка (LARCENY / THEFT) будет проиндексирована как 0.

from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
label_stringIdx = StringIndexer(inputCol = "Category", outputCol = "label")
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx])
# Fit the pipeline to training documents.
pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)
dataset.show(5)

Разделение обучающих и тестовых наборов

# set seed for reproducibility
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 100)
print("Training Dataset Count: " + str(trainingData.count()))
print("Test Dataset Count: " + str(testData.count()))

Количество обучающих наборов данных: 5185
Количество тестовых наборов данных: 2104

Модельное обучение и оценка

Логистическая регрессия с использованием функций вектора подсчета

Наша модель будет делать прогнозы и набирать баллы на тестовом наборе; Затем мы смотрим на 10 лучших прогнозов с наибольшей вероятностью.

lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(trainingData)
predictions = lrModel.transform(testData)
predictions.filter(predictions['prediction'] == 0) \
    .select("Descript","Category","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

0.9610787444388802

Точность отличная!

Логистическая регрессия с использованием функций TF-IDF

from pyspark.ml.feature import HashingTF, IDF
hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=10000)
idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5) #minDocFreq: remove sparse terms
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, hashingTF, idf, label_stringIdx])
pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 100)
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(trainingData)
predictions = lrModel.transform(testData)
predictions.filter(predictions['prediction'] == 0) \
    .select("Descript","Category","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

0.9616202660247297

Результат тот же.

Перекрестная проверка

Давайте теперь попробуем перекрестную проверку, чтобы настроить наши гиперпараметры, и мы будем настраивать только векторы подсчета. Логистическая регрессия.

pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx])
pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 100)
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0)
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
# Create ParamGrid for Cross Validation
paramGrid = (ParamGridBuilder()
             .addGrid(lr.regParam, [0.1, 0.3, 0.5]) # regularization parameter
             .addGrid(lr.elasticNetParam, [0.0, 0.1, 0.2]) # Elastic Net Parameter (Ridge = 0)
#            .addGrid(model.maxIter, [10, 20, 50]) #Number of iterations
#            .addGrid(idf.numFeatures, [10, 100, 1000]) # Number of features
             .build())
# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr, \
                    estimatorParamMaps=paramGrid, \
                    evaluator=evaluator, \
                    numFolds=5)
cvModel = cv.fit(trainingData)

predictions = cvModel.transform(testData)
# Evaluate best model
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

0.9851796929217101

Производительность улучшилась.

Наивный байесовский

from pyspark.ml.classification import NaiveBayes
nb = NaiveBayes(smoothing=1)
model = nb.fit(trainingData)
predictions = model.transform(testData)
predictions.filter(predictions['prediction'] == 0) \
    .select("Descript","Category","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

0.9625414629888848

Случайный лес

from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(labelCol="label", \
                            featuresCol="features", \
                            numTrees = 100, \
                            maxDepth = 4, \
                            maxBins = 32)
# Train model with Training Data
rfModel = rf.fit(trainingData)
predictions = rfModel.transform(testData)
predictions.filter(predictions['prediction'] == 0) \
    .select("Descript","Category","probability","label","prediction") \
    .orderBy("probability", ascending=False) \
    .show(n = 10, truncate = 30)

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator.evaluate(predictions)

0.6600326922344301

Случайный лес - очень хороший, надежный и универсальный метод, однако нет ничего удивительного в том, что для разреженных данных большой размерности это не лучший выбор.

Очевидно, что нашей моделью в этом эксперименте будет логистическая регрессия с перекрестной проверкой.

На этом мы подошли к концу статьи. Исходный код, с помощью которого создается этот пост, можно найти на Github. Я с нетерпением жду любых отзывов или вопросов.