Apache Spark быстро набирает обороты как в заголовках, так и в мире, в основном благодаря своей способности обрабатывать потоковые данные. Поскольку так много данных обрабатывается ежедневно, для нас стало важным иметь возможность передавать и анализировать их в режиме реального времени. Кроме того, Apache Spark достаточно быстр, чтобы выполнять исследовательские запросы без выборки. Многие отраслевые эксперты привели все причины, почему вам следует использовать Spark для машинного обучения?
Итак, теперь мы используем Spark Machine Learning Library для решения задачи классификации текста на несколько классов, в частности, PySpark.
Если вы хотите увидеть реализацию с Scikit-Learn, прочтите предыдущую статью.
Данные
Наша задача - классифицировать описание преступлений в Сан-Франциско по 33 заранее определенным категориям. Данные можно скачать с Kaggle.
Учитывая новое описание преступления, мы хотим отнести его к одной из 33 категорий. Классификатор предполагает, что каждое новое описание преступления относится к одной и только одной категории. Это проблема классификации текста на несколько классов.
- Ввод: описание
Пример: «УКРАДЕННЫЙ АВТОМОБИЛЬ»
- Вывод: категория
Пример: УГОН АВТОМОБИЛЯ
Чтобы решить эту проблему, мы будем использовать различные методы извлечения признаков вместе с различными алгоритмами контролируемого машинного обучения в Spark. Давайте начнем!
Прием и извлечение данных
Загрузить CSV-файл просто с помощью Spark csv packages.
from pyspark.sql import SQLContext from pyspark import SparkContext sc =SparkContext() sqlContext = SQLContext(sc) data = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('train.csv')
Вот и все! Мы загрузили набор данных. Приступим к изучению.
Удалите ненужные нам столбцы и посмотрите первые пять строк:
drop_list = ['Dates', 'DayOfWeek', 'PdDistrict', 'Resolution', 'Address', 'X', 'Y'] data = data.select([column for column in data.columns if column not in drop_list]) data.show(5)
Примените printSchema () к данным, которые распечатают схему в древовидном формате:
data.printSchema()
20 основных категорий преступлений:
from pyspark.sql.functions import col data.groupBy("Category") \ .count() \ .orderBy(col("count").desc()) \ .show()
Топ-20 описаний преступлений:
data.groupBy("Descript") \ .count() \ .orderBy(col("count").desc()) \ .show()
Модель конвейера
Spark Machine Learning Pipelines API аналогичен Scikit-Learn. Наш конвейер состоит из трех этапов:
regexTokenizer
: Токенизация (с регулярным выражением)stopwordsRemover
: Удалить стоп-словаcountVectors
: Подсчет векторов («векторов документов»)
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover, CountVectorizer from pyspark.ml.classification import LogisticRegression # regular expression tokenizer regexTokenizer = RegexTokenizer(inputCol="Descript", outputCol="words", pattern="\\W") # stop words add_stopwords = ["http","https","amp","rt","t","c","the"] stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(add_stopwords) # bag of words count countVectors = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=10000, minDF=5)
StringIndexer
StringIndexer
кодирует строковый столбец меток в столбец индексов меток. Индексы находятся в [0, numLabels)
, отсортированы по частоте меток, поэтому наиболее часто встречающаяся метка получает индекс 0
.
В нашем случае столбец метки (Категория) будет закодирован для индексов меток от 0 до 32; самая частая метка (LARCENY / THEFT) будет проиндексирована как 0.
from pyspark.ml import Pipeline from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler label_stringIdx = StringIndexer(inputCol = "Category", outputCol = "label") pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx]) # Fit the pipeline to training documents. pipelineFit = pipeline.fit(data) dataset = pipelineFit.transform(data) dataset.show(5)
Разделение обучающих и тестовых наборов
# set seed for reproducibility (trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 100) print("Training Dataset Count: " + str(trainingData.count())) print("Test Dataset Count: " + str(testData.count()))
Количество обучающих наборов данных: 5185
Количество тестовых наборов данных: 2104
Модельное обучение и оценка
Логистическая регрессия с использованием функций вектора подсчета
Наша модель будет делать прогнозы и набирать баллы на тестовом наборе; Затем мы смотрим на 10 лучших прогнозов с наибольшей вероятностью.
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0) lrModel = lr.fit(trainingData) predictions = lrModel.transform(testData) predictions.filter(predictions['prediction'] == 0) \ .select("Descript","Category","probability","label","prediction") \ .orderBy("probability", ascending=False) \ .show(n = 10, truncate = 30)
from pyspark.ml.evaluation import MulticlassClassificationEvaluator evaluator = MulticlassClassificationEvaluator(predictionCol="prediction") evaluator.evaluate(predictions)
0.9610787444388802
Точность отличная!
Логистическая регрессия с использованием функций TF-IDF
from pyspark.ml.feature import HashingTF, IDF hashingTF = HashingTF(inputCol="filtered", outputCol="rawFeatures", numFeatures=10000) idf = IDF(inputCol="rawFeatures", outputCol="features", minDocFreq=5) #minDocFreq: remove sparse terms pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, hashingTF, idf, label_stringIdx]) pipelineFit = pipeline.fit(data) dataset = pipelineFit.transform(data) (trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 100) lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0) lrModel = lr.fit(trainingData) predictions = lrModel.transform(testData) predictions.filter(predictions['prediction'] == 0) \ .select("Descript","Category","probability","label","prediction") \ .orderBy("probability", ascending=False) \ .show(n = 10, truncate = 30)
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction") evaluator.evaluate(predictions)
0.9616202660247297
Результат тот же.
Перекрестная проверка
Давайте теперь попробуем перекрестную проверку, чтобы настроить наши гиперпараметры, и мы будем настраивать только векторы подсчета. Логистическая регрессия.
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx]) pipelineFit = pipeline.fit(data) dataset = pipelineFit.transform(data) (trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed = 100) lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0) from pyspark.ml.tuning import ParamGridBuilder, CrossValidator # Create ParamGrid for Cross Validation paramGrid = (ParamGridBuilder() .addGrid(lr.regParam, [0.1, 0.3, 0.5]) # regularization parameter .addGrid(lr.elasticNetParam, [0.0, 0.1, 0.2]) # Elastic Net Parameter (Ridge = 0) # .addGrid(model.maxIter, [10, 20, 50]) #Number of iterations # .addGrid(idf.numFeatures, [10, 100, 1000]) # Number of features .build()) # Create 5-fold CrossValidator cv = CrossValidator(estimator=lr, \ estimatorParamMaps=paramGrid, \ evaluator=evaluator, \ numFolds=5) cvModel = cv.fit(trainingData) predictions = cvModel.transform(testData) # Evaluate best model evaluator = MulticlassClassificationEvaluator(predictionCol="prediction") evaluator.evaluate(predictions)
0.9851796929217101
Производительность улучшилась.
Наивный байесовский
from pyspark.ml.classification import NaiveBayes nb = NaiveBayes(smoothing=1) model = nb.fit(trainingData) predictions = model.transform(testData) predictions.filter(predictions['prediction'] == 0) \ .select("Descript","Category","probability","label","prediction") \ .orderBy("probability", ascending=False) \ .show(n = 10, truncate = 30)
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction") evaluator.evaluate(predictions)
0.9625414629888848
Случайный лес
from pyspark.ml.classification import RandomForestClassifier rf = RandomForestClassifier(labelCol="label", \ featuresCol="features", \ numTrees = 100, \ maxDepth = 4, \ maxBins = 32) # Train model with Training Data rfModel = rf.fit(trainingData) predictions = rfModel.transform(testData) predictions.filter(predictions['prediction'] == 0) \ .select("Descript","Category","probability","label","prediction") \ .orderBy("probability", ascending=False) \ .show(n = 10, truncate = 30)
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction") evaluator.evaluate(predictions)
0.6600326922344301
Случайный лес - очень хороший, надежный и универсальный метод, однако нет ничего удивительного в том, что для разреженных данных большой размерности это не лучший выбор.
Очевидно, что нашей моделью в этом эксперименте будет логистическая регрессия с перекрестной проверкой.
На этом мы подошли к концу статьи. Исходный код, с помощью которого создается этот пост, можно найти на Github. Я с нетерпением жду любых отзывов или вопросов.