"Машинное обучение"

Прогнозирование временных рядов с использованием Spark

Прогнозирование пешеходного движения на следующий час с помощью Spark

Введение в цель

В наши дни в высокотехнологичных или умных городах подсчет пешеходов можно отслеживать путем развертывания датчиков в определенных местах, которые могут подсчитывать количество пешеходов каждый час (согласно данным, используемым для этого блога) или по мере необходимости. Из названия самого сообщения можно понять, что здесь мы попытаемся спрогнозировать количество пешеходов или пешеходного движения в определенных местах на следующий час на основе данных за предыдущий час (ы). Этот метод также называется одношаговым прогнозированием временного ряда, когда мы прогнозируем следующее значение с помощью предыдущих значений. Следовательно, это проблема типа регрессии временных рядов, поскольку данные для прогнозирования имеют непрерывный характер. Используя эти прогнозы, мы можем выбрать места с наибольшим трафиком, которые затем могут быть использованы определенными компаниями для продвижения своих продуктов, исполнителей в музыкальной и развлекательной индустрии, чтобы убедиться, что их слышит наибольшее количество людей и т. Д.

Приведенное выше утверждение проясняет цель, но в то же время дает возможность ответить на некоторые основные вопросы, которые могут прийти в голову человеку. Давайте сначала ответим на эти вопросы:

Какие инструменты будут использоваться для прогнозирования временных рядов?

Согласно этому блогу, используемый инструмент - Apache Spark. Используемый язык программирования - Python. Вы можете перейти на мою страницу github, чтобы получить полный код.

Какой тип данных будет использоваться в этом блоге?

В этом блоге будут использоваться два набора данных

  • Данные о количестве пешеходов

  • Данные сенсора

К сожалению, фактические используемые данные не могут быть загружены из-за проблем с авторским правом, но в то же время я поделился структурой данных, поэтому можно использовать любые данные аналогичной структуры или данные, которые могут быть преобразованы в аналогичную структуру.

Теперь, когда мы рассмотрели основные вопросы и поставили цель, давайте перейдем к ее достижению.

Поток, используемый для достижения цели

Шаги

  1. Загрузка данных
  2. Исследование данных
  3. Извлечение и преобразование признаков
  4. Построение и сохранение модели

Загрузка данных

Чтобы сначала загрузить данные, нам нужно создать схему данных для чтения. Как показано ниже.

Функцию StructType можно использовать для определения структуры, а функцию StructField можно использовать для добавления каждого атрибута с соответствующим типом данных.

После создания схемы мы можем легко загрузить данные с помощью функции Spark spark.read.format('csv'), поскольку исходные данные представлены в формате, разделенном запятыми, а также функцию schema() можно использовать для предоставления схемы (созданной выше) в качестве параметра для чтения данных. Можно использовать другие параметры, например, для указания формата отметки времени и т. Д. Взгляните на приведенный ниже код.

Если вы хотите взглянуть на полученную структуру данных, чтобы убедиться, что данные показывают правильную схему, ее можно проверить с помощью функции printSchema(). См. Приведенный ниже код и вывод.

Исследование данных

Теперь наступает часть, в которой нам нужно изучить данные, чтобы увидеть определенные тенденции / закономерности, которые помогут выбрать функции, которые будут использоваться для построения модели машинного обучения.

Поскольку данные были загружены в фреймы данных Spark, которые не так просто построить напрямую, необходимые данные были преобразованы в фрейм данных Python pandas для построения.

Сначала мы построим гистограмму, чтобы увидеть распределение столбца Hourly_Counts, используя приведенный ниже код.

Вот как будет выглядеть график гистограммы, очевидно, в зависимости от распределения ваших данных гистограмма может меняться. Обратите внимание, что для этого графика масштаб был изменен на логарифмический масштаб, чтобы лучше его визуализировать.

Многие другие визуализации могут быть созданы после преобразования необходимых данных в фреймворк pandas. Обратитесь к коду, чтобы узнать, какие еще визуализации можно создать.

Извлечение и преобразование функций

Нам нужно будет создать столбец Previous_Hour_Counts из столбца Hourly_Counts, используя оконную функцию под названием lag(). Мы создали задержку в столбце Hourly_Counts, разбили столбец Sensord_ID и упорядочили столбец Date_Time, чтобы убедиться, что значение счетчика одного датчика не передается другому датчику при создании задержки. Каждое значение ячейки столбца Hourly_Counts будет сдвинуто на одну строку вверх, и столбец получит имя Previous_Hour_Counts. Ниже приведен код и анимированный пример того, как будут выглядеть данные в столбце Previous_Hour_Counts.

Обязательно удалите строки, где Previous_Hour_Counts равно NULL, поскольку эти строки не смогут участвовать в прогнозировании.

Нам также необходимо удалить ненужные данные для повышения эффективности. Мы можем фильтровать данные только в период, скажем, с 2016 по 2018 год и только для часов с 9:00 до 23:00 включительно, что имело бы смысл, потому что до этого времени общее мнение гласит, что количество пешеходов будет низким, поскольку часы с 23:00 до 8:00 являются `` необоснованными ''. 'для большого количества людей, которые гуляют по улицам.

Глядя на выбор функций

ID: этот столбец является уникальным идентификатором для каждой строки и поэтому не имеет значения с точки зрения прогноза и, следовательно, не должен использоваться для прогнозирования.

Год, Месяц, Дата, Время: Эти столбцы могут оказаться очень полезными для прогнозирования согласно исследованию (пожалуйста, проверьте страницу github для кода). Они выглядят очень важными, поскольку демонстрируют закономерности, которые можно изучить с помощью алгоритма машинного обучения, и, следовательно, помогают в прогнозировании.

Sensor_ID: эта функция также предоставит много информации, потому что некоторые датчики / местоположения могут всегда получать большое количество пешеходов, а некоторые другие могут получать низкие значения. Следовательно, эта функция может отображать шаблоны.

Hourly_Counts: Этот столбец предназначен для обучения моделей регрессии.

Previous_Hour_Counts: Как видно из визуализаций, столбец Hourly_Counts может быть очень хорошей функцией для прогнозирования следующего значения на основе исторического значения, и поэтому мы можем использовать это как функцию для прогнозирования.

Столбцы «Date_Time», «MonthNum», «DayofWeek», «Mdate», «Sensor_Name» необходимо удалить, поскольку они не добавляют особой ценности модели или их категориальные аналоги уже добавлены.

Нам нужно преобразовать типы данных определенных столбцов в нужные нам типы данных, Time в StringType (), Previous_Hour_Counts в IntegerType (). , Hourly_Counts до IntegerType (), Sensor_ID до StringType ().

Имя столбца Hourly_Counts можно изменить на target (это необязательно), используя приведенный ниже код.

Затем нам нужно использовать технику String Indexing для кодирования строковых / категориальных столбцов для маркировки индексов. Теперь для этих индексированных значений (из StringIndexer) можно использовать технику One-Hot Encoding, чтобы модель могла использовать эти индексированные значения в качестве функций. При использовании One-Hot Encoding эти значения кодируются таким образом, поскольку их порядковые значения не будут иметь никакого значения и могут отрицательно повлиять на модель. Чтобы использовать созданные выше функции, нам нужно собрать их в векторы с помощью Vector Assembler, чтобы они могли использоваться моделями. Давайте создадим этапы / шаги для этих процессов, чтобы можно было использовать их позже на этапе построения модели.

Построение и сохранение модели

В этой части мы создадим модель машинного обучения с обучающими данными, а затем вычислим метрики для этой модели, используя тестовые данные для оценки модели. Мы будем использовать базовую модель Дерево решений. Теперь приступим к созданию этапа построения модели. Для других моделей смотрите код.

Теперь, используя API ML Pipeline Spark, мы можем использовать эти этапы для создания конвейера. Одним из основных преимуществ использования API конвейера является то, что он делает код более управляемым и читаемым.

Следующая часть состоит в том, чтобы разделить данные на обучение и тестирование, а затем использовать данные обучения для построения модели и получения прогнозов для тестовых данных.

Теперь можно рассчитать метрики для оценки модели, в этом посте использовались значения RMSE и R-квадрат. Есть еще много других показателей, которые можно использовать в зависимости от вашего выбора и варианта использования.

Чтобы сохранить / сохранить модель, мы можем использовать приведенный ниже код.

Приведенный выше код используется, потому что эта модель является моделью конвейера, и сначала нам нужно извлечь модель, а затем сохранить ее, и stages[-1] помогает нам получить объект модели, который затем можно сохранить с помощью функции save(). Мы также можем сохранить модель конвейера напрямую, используя функцию save() без stages[-1].

Не пропустите следующий пост, который является продолжением этого, где вы можете фактически применить модель к потоковой передаче данных, а также визуализировать ее.

Загляните на страницу github, чтобы узнать о многом другом, что я сделал в записной книжке Python, но для краткости пришлось сократить в посте.

Большое спасибо за то, что нашли время, чтобы прочитать сообщение, любезно оставляйте комментарии, если вы хотите обсудить что-либо, связанное с этим сообщением, я сделаю все возможное, чтобы помочь вам.