СОЗДАНИЕ МАШИННОГО ОБУЧЕНИЯ В SPARK С ПОМОЩЬЮ PYTHON
Реальные данные обычно очень неструктурированы. Конвейеры машинного обучения заполняют этот пробел с помощью многоступенчатой структуры, которая автоматически очищает и организует данные, преобразует их в машиночитаемую форму, обучает модели и генерирует прогнозы на постоянной основе.
Конвейеры машинного обучения полезны для автоматизации рабочих процессов, так что вы можете легко комбинировать алгоритмы и беспрепятственно запускать/настраивать свои модели.
Мы создадим конвейер, который будет очищать и преобразовывать наши входные данные с помощью таких методов разработки функций, как:
- Индексирование
- Кодирование
- Векторизация
- Нормализация
1. Клонирование входных данных из репозитория GitHub
!git clone https://github.com/arieljumba/HMP_Dataset.git
Входные данные состоят из общедоступного набора данных акселерометра для обнаружения примитивов человеческого движения.
Это общедоступная коллекция помеченных записей данных акселерометра, которые будут использоваться для создания и проверки моделей ускорения примитивов человеческого движения.
Подробная документация по набору данных представлена в файле MANUAL.TXT на https://github.com/arieljumba/HMP_Dataset.git.
2. Чтение данных
import os files_list = os.listdir('HMP_Dataset') filtered_files_list = [x for x in files_list if '_' in x] print(filtered_files_list)
Полученные результаты
['Descend_stairs','Eat_soup', 'Drink_glass', 'Brush_teeth', 'Standup_chair','Getup_bed', 'Use_telephone','Pour_water', 'Comb_hair','Eat_meat','Liedown_bed', 'Sitdown_chair', 'Climb_stairs']
2. Очистить и организовать наши данные
Данные хранятся в текстовых файлах, содержащих 3 числовых поля, которым мы присвоим имена [‘x’, ‘y’, ‘z’].
Затем мы создадим фрейм данных spark для хранения наших данных в более организованном формате и добавим набор данных, как показано ниже:
from pyspark.sql.types import StructType, StructField, IntegerType schemer = StructType([ StructField('x',IntegerType(), True), StructField('y',IntegerType(), True), StructField('z',IntegerType(), True) ]) df = None from pyspark.sql.functions import lit from pyspark.shell import spark for category in filtered_files_list: data_files = os.listdir('HMP_Dataset/'+category) for data_file in data_files: temporary_df = spark.read.option('header','false').option('delimiter',' ').csv('HMP_Dataset/'+category+'/'+data_file,schema = schemer) temporary_df = temporary_df.withColumn('class',lit(category)) temporary_df = temporary_df.withColumn('source',lit(data_file)) if df is None: df = temporary_df else: df = df.union(temporary_df) df.show()
Наш новый фрейм данных будет выглядеть так:
+---+---+---+--------------+--------------------+ | x| y| z| class| source| +---+---+---+--------------+--------------------+ | 35| 37| 49|Descend_stairs|Accelerometer-201...| | 57| 35| 37|Descend_stairs|Accelerometer-201...| | 36| 39| 46|Descend_stairs|Accelerometer-201...| | 36| 33| 46|Descend_stairs|Accelerometer-201...| | 37| 38| 41|Descend_stairs|Accelerometer-201...|
3. Индексация строк/меток
Индексатор меток сопоставляет строковый столбец меток со столбцом индексов меток ML. В этом случае то же самое будет применено к столбцу класса
from pyspark.ml.feature import StringIndexer indexer = StringIndexer(inputCol='class', outputCol = 'class_indexed') indexed = indexer.fit(df).transform(df) indexed.show()
Результаты индексации ярлыков
+---+---+---+--------------+--------------------+-------------+ | x| y| z| class| source|class_indexed| +---+---+---+--------------+--------------------+-------------+ | 35| 37| 49|Descend_stairs|Accelerometer-201...| 9.0| | 57| 35| 37|Descend_stairs|Accelerometer-201...| 9.0| | 36| 39| 46|Descend_stairs|Accelerometer-201...| 9.0| | 36| 33| 46|Descend_stairs|Accelerometer-201...| 9.0| | 37| 38| 41|Descend_stairs|Accelerometer-201...| 9.0| | 36| 36| 41|Descend_stairs|Accelerometer-201...| 9.0| | 32| 43| 44|Descend_stairs|Accelerometer-201...| 9.0
4. Горячее кодирование
Горячий кодировщик, который отображает столбец индексов категорий в столбец двоичных векторов с не более чем одним единичным значением в строке, указывающим индекс входной категории. Нажмите на эту ссылку для получения дополнительной информации.
from pyspark.ml.feature import OneHotEncoder encoder = OneHotEncoder(inputCol = 'class_indexed',outputCol = 'class_encoded') encoded = encoder.fit(indexed).transform(indexed) encoded.show()
Результаты кодирования
--------------+--------------------+-------------+--------------+ class| source|class_indexed| class_encoded| --------------+--------------------+-------------+--------------+ Descend_stairs|Accelerometer-201...| 9.0|(12,[9],[1.0])| Descend_stairs|Accelerometer-201...| 9.0|(12,[9],[1.0])| Descend_stairs|Accelerometer-201...| 9.0|(12,[9],[1.0])| Descend_stairs|Accelerometer-201...| 9.0|(12,[9],[1.0])
5. Векторный ассемблер
Преобразователь признаков, который объединяет несколько столбцов в векторный столбец и полезен для объединения необработанных признаков и признаков, сгенерированных разными преобразователями признаков, в один вектор признаков. Нажмите на эту ссылку для получения дополнительной информации.
from pyspark.ml.feature import VectorAssembler vectorizer = VectorAssembler(inputCols = ['x','y','z'],outputCol = 'features' vectorized = vectorizer.transform(encoded) vectorized.show()
Результаты векторного ассемблера
+---+---+---+--------------+--------------------+-------------+--------------+----------------+--------------------+ | x| y| z| class| source|class_indexed| class_encoded| features| features_norm| +---+---+---+--------------+--------------------+-------------+--------------+----------------+--------------------+ | 35| 37| 49|Descend_stairs|Accelerometer-201...| 9.0|(12,[9],[1.0])|[35.0,37.0,49.0]|[0.49522241997434...| | 57| 35| 37|Descend_stairs|Accelerometer-201...| 9.0|(12,[9],[1.0])|[57.0,35.0,37.0]|[0.74568758493638...| | 36| 39| 46|Descend_stairs|Accelerometer-201...| 9.0|(12,[9],[1.0])|[36.0,39.0,46.0]|[0.51256263430407...|
6. Нормализация
from pyspark.ml.feature import Normalizer normalizer = Normalizer(inputCol = 'features' ,outputCol = 'features_norm' normalized = normalizer.transform(vectorized) normalized.show()
Результаты нормализации
+--------------------+-------------+--------------+-------+--------- |source|class_indexed| class_encoded|features| features_norm| |Accelerometer-201...| 9.0|(12,[9],[1.0])|[35.0,37.0,49.0]|[0.49522241997434...| |Accelerometer-201...| 9.0|(12,[9],[1.0])|[57.0,35.0,37.0]|[0.74568758493638...| |Accelerometer-201...| 9.0|(12,[9],[1.0])|[36.0,39.0,46.0]|[0.51256263430407...|
7. Трубопровод
Все вышеперечисленные процессы могут быть связаны в один процесс (конвейер), как показано ниже:
from pyspark.ml.feature import StringIndexer from pyspark.ml.feature import OneHotEncoder from pyspark.ml.feature import VectorAssembler from pyspark.ml.feature import Normalizer from pyspark.ml import Pipeline indexer = StringIndexer(inputCol='class', outputCol = 'class_indexed') encoder = OneHotEncoder(inputCol = 'class_indexed',outputCol = 'class_encoded') vectorizer = VectorAssembler(inputCols = ['x','y','z'],outputCol = 'features') normalizer = Normalizer(inputCol = 'features' ,outputCol = 'features_norm') pipeline = Pipeline(stages =[indexer,encoder,vectorizer,normalizer]) model = pipeline.fit(df) prediction = model.transform(df) prediction.show()
Полученные результаты
+---+---+---+--------------+--------------------+-------------+--------------+----------------+--------------------+ | x| y| z| class| source|class_indexed| class_encoded| features| features_norm| +---+---+---+--------------+--------------------+-------------+--------------+----------------+--------------------+ | 35| 37| 49|Descend_stairs|Accelerometer-201...| 9.0|(12,[9],[1.0])|[35.0,37.0,49.0]|[0.49522241997434...| | 57| 35| 37|Descend_stairs|Accelerometer-201...| 9.0|(12,[9],[1.0])|[57.0,35.0,37.0]|[0.74568758493638...| | 36| 39| 46|Descend_stairs|Accelerometer-201...| 9.0|(12,[9],[1.0])|[36.0,39.0,46.0]|[0.51256263430407...| | 36| 33| 46|Descend_stairs|Accelerometer-201...| 9.0|(12,[9],[1.0])|[36.0,33.0,46.0]|[0.53659669605678...|