СОЗДАНИЕ МАШИННОГО ОБУЧЕНИЯ В SPARK С ПОМОЩЬЮ PYTHON

Реальные данные обычно очень неструктурированы. Конвейеры машинного обучения заполняют этот пробел с помощью многоступенчатой ​​структуры, которая автоматически очищает и организует данные, преобразует их в машиночитаемую форму, обучает модели и генерирует прогнозы на постоянной основе.

Конвейеры машинного обучения полезны для автоматизации рабочих процессов, так что вы можете легко комбинировать алгоритмы и беспрепятственно запускать/настраивать свои модели.

Мы создадим конвейер, который будет очищать и преобразовывать наши входные данные с помощью таких методов разработки функций, как:
- Индексирование
- Кодирование
- Векторизация
- Нормализация

1. Клонирование входных данных из репозитория GitHub

!git clone https://github.com/arieljumba/HMP_Dataset.git

Входные данные состоят из общедоступного набора данных акселерометра для обнаружения примитивов человеческого движения.

Это общедоступная коллекция помеченных записей данных акселерометра, которые будут использоваться для создания и проверки моделей ускорения примитивов человеческого движения.

Подробная документация по набору данных представлена ​​в файле MANUAL.TXT на https://github.com/arieljumba/HMP_Dataset.git.

2. Чтение данных

import os
files_list = os.listdir('HMP_Dataset')
filtered_files_list = [x for x in files_list if '_' in x]
print(filtered_files_list)

Полученные результаты

['Descend_stairs','Eat_soup', 'Drink_glass', 'Brush_teeth',
 'Standup_chair','Getup_bed', 'Use_telephone','Pour_water',
 'Comb_hair','Eat_meat','Liedown_bed', 'Sitdown_chair',
 'Climb_stairs']

2. Очистить и организовать наши данные

Данные хранятся в текстовых файлах, содержащих 3 числовых поля, которым мы присвоим имена [‘x’, ‘y’, ‘z’].

Затем мы создадим фрейм данных spark для хранения наших данных в более организованном формате и добавим набор данных, как показано ниже:

from pyspark.sql.types import StructType, StructField, IntegerType
schemer = StructType([
    StructField('x',IntegerType(), True),
    StructField('y',IntegerType(), True),
    StructField('z',IntegerType(), True)
])
df = None
from pyspark.sql.functions import lit
from pyspark.shell import spark
for category in filtered_files_list:
  data_files = os.listdir('HMP_Dataset/'+category)
for data_file in data_files:
    temporary_df = spark.read.option('header','false').option('delimiter',' ').csv('HMP_Dataset/'+category+'/'+data_file,schema = schemer)
    temporary_df = temporary_df.withColumn('class',lit(category))
    temporary_df = temporary_df.withColumn('source',lit(data_file))
if df is None:
      df = temporary_df
    else:
      df = df.union(temporary_df)
df.show()

Наш новый фрейм данных будет выглядеть так:

+---+---+---+--------------+--------------------+
|  x|  y|  z|         class|              source|
+---+---+---+--------------+--------------------+
| 35| 37| 49|Descend_stairs|Accelerometer-201...|
| 57| 35| 37|Descend_stairs|Accelerometer-201...|
| 36| 39| 46|Descend_stairs|Accelerometer-201...|
| 36| 33| 46|Descend_stairs|Accelerometer-201...|
| 37| 38| 41|Descend_stairs|Accelerometer-201...|

3. Индексация строк/меток

Индексатор меток сопоставляет строковый столбец меток со столбцом индексов меток ML. В этом случае то же самое будет применено к столбцу класса

from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol='class', outputCol = 'class_indexed')
indexed = indexer.fit(df).transform(df)
indexed.show()

Результаты индексации ярлыков

+---+---+---+--------------+--------------------+-------------+
|  x|  y|  z|         class|              source|class_indexed|
+---+---+---+--------------+--------------------+-------------+
| 35| 37| 49|Descend_stairs|Accelerometer-201...|          9.0|
| 57| 35| 37|Descend_stairs|Accelerometer-201...|          9.0|
| 36| 39| 46|Descend_stairs|Accelerometer-201...|          9.0|
| 36| 33| 46|Descend_stairs|Accelerometer-201...|          9.0|
| 37| 38| 41|Descend_stairs|Accelerometer-201...|          9.0|
| 36| 36| 41|Descend_stairs|Accelerometer-201...|          9.0|
| 32| 43| 44|Descend_stairs|Accelerometer-201...|          9.0

4. Горячее кодирование

Горячий кодировщик, который отображает столбец индексов категорий в столбец двоичных векторов с не более чем одним единичным значением в строке, указывающим индекс входной категории. Нажмите на эту ссылку для получения дополнительной информации.

from pyspark.ml.feature import OneHotEncoder
encoder = OneHotEncoder(inputCol = 'class_indexed',outputCol = 'class_encoded')
encoded = encoder.fit(indexed).transform(indexed)
encoded.show()

Результаты кодирования

--------------+--------------------+-------------+--------------+
         class|              source|class_indexed| class_encoded|
--------------+--------------------+-------------+--------------+
Descend_stairs|Accelerometer-201...|          9.0|(12,[9],[1.0])|
Descend_stairs|Accelerometer-201...|          9.0|(12,[9],[1.0])|
Descend_stairs|Accelerometer-201...|          9.0|(12,[9],[1.0])|
Descend_stairs|Accelerometer-201...|          9.0|(12,[9],[1.0])

5. Векторный ассемблер

Преобразователь признаков, который объединяет несколько столбцов в векторный столбец и полезен для объединения необработанных признаков и признаков, сгенерированных разными преобразователями признаков, в один вектор признаков. Нажмите на эту ссылку для получения дополнительной информации.

from pyspark.ml.feature import VectorAssembler
vectorizer = VectorAssembler(inputCols = ['x','y','z'],outputCol = 'features'
vectorized = vectorizer.transform(encoded)
vectorized.show()

Результаты векторного ассемблера

+---+---+---+--------------+--------------------+-------------+--------------+----------------+--------------------+
|  x|  y|  z|         class|              source|class_indexed| class_encoded|        features|       features_norm|
+---+---+---+--------------+--------------------+-------------+--------------+----------------+--------------------+
| 35| 37| 49|Descend_stairs|Accelerometer-201...|          9.0|(12,[9],[1.0])|[35.0,37.0,49.0]|[0.49522241997434...|
| 57| 35| 37|Descend_stairs|Accelerometer-201...|          9.0|(12,[9],[1.0])|[57.0,35.0,37.0]|[0.74568758493638...|
| 36| 39| 46|Descend_stairs|Accelerometer-201...|          9.0|(12,[9],[1.0])|[36.0,39.0,46.0]|[0.51256263430407...|

6. Нормализация

from pyspark.ml.feature import Normalizer
normalizer = Normalizer(inputCol = 'features' ,outputCol = 'features_norm'
normalized = normalizer.transform(vectorized)
normalized.show()

Результаты нормализации

+--------------------+-------------+--------------+-------+---------
|source|class_indexed| class_encoded|features|     features_norm|
|Accelerometer-201...|          9.0|(12,[9],[1.0])|[35.0,37.0,49.0]|[0.49522241997434...|
|Accelerometer-201...|          9.0|(12,[9],[1.0])|[57.0,35.0,37.0]|[0.74568758493638...|
|Accelerometer-201...|          9.0|(12,[9],[1.0])|[36.0,39.0,46.0]|[0.51256263430407...|

7. Трубопровод

Все вышеперечисленные процессы могут быть связаны в один процесс (конвейер), как показано ниже:

from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import Normalizer
from pyspark.ml import Pipeline
indexer = StringIndexer(inputCol='class', outputCol = 'class_indexed')
encoder = OneHotEncoder(inputCol = 'class_indexed',outputCol = 'class_encoded')
vectorizer = VectorAssembler(inputCols = ['x','y','z'],outputCol = 'features')
normalizer = Normalizer(inputCol = 'features' ,outputCol = 'features_norm')
pipeline = Pipeline(stages =[indexer,encoder,vectorizer,normalizer])
model = pipeline.fit(df)
prediction = model.transform(df)
prediction.show()

Полученные результаты

+---+---+---+--------------+--------------------+-------------+--------------+----------------+--------------------+
|  x|  y|  z|         class|              source|class_indexed| class_encoded|        features|       features_norm|
+---+---+---+--------------+--------------------+-------------+--------------+----------------+--------------------+
| 35| 37| 49|Descend_stairs|Accelerometer-201...|          9.0|(12,[9],[1.0])|[35.0,37.0,49.0]|[0.49522241997434...|
| 57| 35| 37|Descend_stairs|Accelerometer-201...|          9.0|(12,[9],[1.0])|[57.0,35.0,37.0]|[0.74568758493638...|
| 36| 39| 46|Descend_stairs|Accelerometer-201...|          9.0|(12,[9],[1.0])|[36.0,39.0,46.0]|[0.51256263430407...|
| 36| 33| 46|Descend_stairs|Accelerometer-201...|          9.0|(12,[9],[1.0])|[36.0,33.0,46.0]|[0.53659669605678...|

Полная тетрадь