Обучайте и оценивайте тысячи независимых моделей прогнозирования продаж для каждого магазина за считанные секунды.

В этой статье будет показано, как специалисты по машинному обучению могут комбинировать знакомые инструменты и методы, такие как Python и инструменты моделирования с открытым исходным кодом, а также гибкость и масштабируемость Teradata Vantage для создания, хранения, мониторинга и выполнения моделей машинного обучения в масштабе.

Машинное обучение произвело революцию в том, как устроен мир; предприятия, люди, наша окружающая среда и наши машины были изменены силой этих новых инструментов и методов. Teradata Vantage предлагает еще одну революцию в области машинного обучения. возможность перехода от моделей, построенных и развернутых как единые обобщенные настраиваемые выходные данные, к процессу, в котором настраиваемые гиперсегментированные модели могут создаваться, управляться и выполняться параллельно в масштабе от тысяч до миллионов.

Как правило, модели машинного обучения строятся по одной или несколько за раз путем применения процесса обучения модели к набору данных, а затем практикующий специалист оценивает «лучшую» модель, корректирует входные данные и гиперпараметры, а затем пытается снова. Окончательная модель затем развертывается для новых поступающих данных и ответов; это Мошенничество? Когда мой инвентарь закончится? Что я должен предложить клиенту?

Что, если бы мы могли строить и оценивать модели не по одной за раз и не по небольшим группам типов пользователей, сегментов или продуктовых линеек; но для КАЖДОГО пользователя, КАЖДОГО артикула продукта в КАЖДОМ магазине или КАЖДОГО автомобиля в дороге? Если бы у каждого объекта была собственная модель, которая обучалась бы для своего определенного набора функций или диапазона значений. Что, если бы мы рассматривали все артефакты машинного обучения (входные данные, гиперпараметры, метрики оценки и выходные данные) как «наборы» данных, а не как единичные экземпляры?

В обычном способе ведения дел есть сила; многие методы машинного обучения и искусственного интеллекта действительно подходят только для отдельных входов и выходов. Например, может не иметь смысла пытаться обучить модель прогнозирования сердечных заболеваний на основе ЭКГ одного пациента. Тем не менее, для таких сценариев использования, как прогнозирование для каждого магазина по продукту, микросегментация или неконтролируемые задачи обучения, такие как распознавание изображений, возможность использовать машинное обучение и оценку в массовых масштабах представляет собой революционно новую ценность. параллельный масштаб.

Собираем вместе с Teradata Vantage

Для этой реализации мы будем использовать несколько мощных функций аналитической базы данных Teradata Vantage:

  • Массовая параллельная обработка. Teradata — одна из оригинальных и самых мощных аналитических баз данных MPP без совместного использования в мире. Более сорока лет разработок и инноваций привели к постоянным улучшениям в планировании рабочих нагрузок, поддержке параллелизма и оптимизации обработки, чтобы обеспечить самый масштабируемый и мощный аналитический механизм в мире.
  • СКРИПТ Оператор таблицы. Оператор таблицы SQL SCRIPT с умопомрачительно-скучным названием скрывает в себе невероятно мощную возможность, встроенную в архитектуру Teradata. Вкратце, эта функция позволяет пользователю выполнять Python (и другие исполняемые файлы) на каждом параллельном рабочем потоке в кластере Teradata (называемом AMP; в системах могут быть тысячи AMP). Данные передаются в сценарий с использованием стандартного ввода (stdin) и передаются из сценария с помощью стандартного вывода (stdout). Это позволяет нам взять порцию данных (разделенную по значению, диапазону строк или всем строкам), выполнить произвольный код для набора данных, а затем вывести практически что угодно. Эта возможность является сутью нашего параллельного обучения модели, позволяя каждому работнику брать свой собственный набор данных, подгонять модель/прогнозировать серию и выводить результирующий набор данных обратно в функцию.
  • Teradataml метод map_partition. Если вы читали документацию выше, оператор таблицы SCRIPT — это функция SQL; тот, который (среди прочего) требует, чтобы пользователь загружал скрипт Python в узлы базы данных, используя другие функции SQL. Однако это стало намного проще с появлением клиентской библиотеки Python для Teradata; терадатамл. map_partition выполнит тяжелую работу для функции SCRIPT — в одном вызове метода она сериализует клиентскую функцию в кластер и выполняет ее с данными, поскольку они находятся на узлах базы данных. Это похоже на методы Pandas map() или map_apply(), которые передают функцию в строки или столбцы; но в нашем случае он будет проецировать эту функцию на данные во всех AMP, тем самым выполняя более тысячи параллельных сред выполнения Python.

Дизайн решения

Далее следует сокращенный пример того, как выполнять ключевые шаги в процессе микропрогнозирования, и к нему применимы некоторые серьезные оговорки. Этот код не охватывает критически важные элементы прогнозирования временных рядов, такие как периодичность, стационарность и т. д. Я использую прогнозирование как простую иллюстрацию возможностей комбинированной архитектуры и клиентских библиотек Python.

Исходные данные

Данные, которые я использую для этой демонстрации, — это набор данных Rossmann Sales, который доступен в Kaggle и предоставляет удобный набор данных временных рядов с 1115 магазинами и 942 ежедневными наблюдениями за количеством продаж, наряду с другими функциями, такими как продвижение, день недели и др.

Модель прогнозирования

В этом упражнении я буду использовать класс statsmodels ARIMA (авторегрессивная интегрированная скользящая средняя), так как он довольно популярен и может быстро генерировать некоторые результаты, хотя, возможно, и не самые точные. Существует множество других библиотек прогнозирования временных рядов, и с любой из них можно использовать тот же подход.

Контур блокнота

Целью этого упражнения будет создание демонстрационной записной книжки, которая позволит пользователю выполнять прогноз временных рядов по множеству различных рядов параллельно. Шаги высокого уровня следующие:

  1. Подключиться к нашей целевой системе
  2. Проверить набор данных
  3. Построить базовую модель ARIMA на клиенте, оценить точность модели
  4. Создайте простую модель параллельного выполнения на подмножестве данных
  5. Расширьте простую модель — используйте полный набор данных и добавьте показатели оценки.

Реализация ноутбука Jupyter

Шаг 1 — Импорт и подключение

Импортируйте клиентские библиотеки и создайте экземпляр соединения с нашей целевой системой. Функция модуля create_context() установит соединение SQLAlchemy с целевой системой Teradata Vantage:

import warnings
warnings.filterwarnings('ignore')

import os, json, getpass
from datetime import datetime, timedelta

import pandas as pd
import numpy as np
import seaborn as sns

# Teradata python client libraries
from teradataml import *
from teradatasqlalchemy.types import *

from statsmodels.tsa.arima.model import ARIMA
from statsmodels.tsa.arima_model import ARIMAResults

%matplotlib inline
sns.set()

eng = create_context(host = 'host', user = 'user', password = getpass.getpass())

Шаг 2. Проверка набора данных

Одной из наиболее ценных функций клиентской библиотеки teradataml является возможность работать с данными в том виде, в каком они находятся на сервере, а не копировать большие объемы данных между клиентом и сервером. Кроме того, SQLAlchemy ORM преобразует синтаксис Pandas в базовый SQL, поэтому пользователи могут писать знакомый код, открывающий возможности масштабируемой обработки на стороне сервера. Здесь мы проверим набор данных при удалении — обратите внимание, что мы ничего не копируем, кроме небольшого количества данных результатов, которые мы запрашиваем:

# Create a pointer to the dataset in Vantage

tdf_sales = DataFrame('"TRNG_DataScienceExploration"."ROSSMANN_SALES"')
tdf_sales.head(5)

Шаг 3. Проверка простого прогноза ARIMA

Прежде всего, я хотел бы немного упростить свой код. Я буду использовать небольшое количество SQL для предварительного форматирования моего фрейма данных и выбирать только те столбцы, которые мне нужны — это легко сделать с помощью метода .from_query() в моем конструкторе teradataml DataFrame. Затем я скопирую клиенту данные о продажах только одного Магазина и прогоню их через процесс подбора модели и прогнозирования.

# Construct my "Virtual DataFrame" from a SQL query:
qry = '''
SELECT "Date" as recording_date, Store, Sales
FROM "TRNG_DataScienceExploration"."ROSSMANN_SALES"
'''

tdf_analytic_set = DataFrame.from_query(qry)

# Copy a single store's data to the client
df = tdf_analytic_set[tdf_analytic_set['Store'] == 1].to_pandas(all_rows = True)
df = df.reset_index(level = 0)

# Create my model and prediction
model = ARIMA(df['Sales'].sort_index(), order=(8,0,6), missing = 'drop')
model_fit = model.fit()

df['prediction'] = model_fit.predict(start=0, end=100, dynamic = False)
df = df[['Sales','prediction']].sort_index().iloc[0:100]

# plot the results - visualize the model accuracy
df.plot(figsize=(12,8));

На первый взгляд эта модель выглядит «нормально»; давайте посчитаем некоторые дополнительные метрики точности:

  • SMAPE (симметричная средняя абсолютная ошибка в процентах)
  • MAE (средняя абсолютная ошибка)
  • RMSE (среднеквадратичная ошибка)
print('smape: ' , str((100/100) * np.sum(2 * np.abs(df['prediction'] - df['Sales'])/(np.abs(df['Sales']) + np.abs(df['prediction'])))))  # MAPE
print('mae: ', str(np.mean(np.abs(df['prediction'] - df['Sales']))))
print('rmse', str(np.mean((df['prediction'] - df['Sales'])**2)**.5))

Эхх — не очень; но мы здесь не для точности модели. Кажется, это работает несколько на локальном клиенте.

Шаг 4. Создайте простую функцию прогнозирования на стороне сервера

Подобно методам Pandas map() или map_apply(), метод teradataml map_partition() принимает функцию в качестве аргумента. Эта функция будет принимать итерацию, представляющую строки входных данных, а также любые дополнительные параметры, которые мы хотим определить; и вернуть массив, серию или кадр данных, представляющий результаты. Здесь мы добавим начальный и конечный диапазон для процесса прогнозирования и вернем результирующий кадр данных:

def arima_predict(rows, start, end):
    # Read the rows of data from standard input
    df = rows.read()
    
    # some AMPs may not receive data
    if df.shape[0] > 0:
        # format the incoming dataframe with a datetime index
        df['recording_date'] = pd.to_datetime(df['recording_date'], format = '%Y/%m/%d').dt.date
        df = df.set_index('recording_date')
        
        # perform the modeling and prediction
        model = ARIMA(df['Sales'].sort_index(), order=(8,0,6), missing = 'drop')
        model_fit = model.fit()
        df['prediction'] = model_fit.predict(start = start, end = end, dynamic = False)
        
        # format the date for output to stoud
        df = df.reset_index()
        df['recording_date'] = df['recording_date'].astype(str)
        df.columns = ['recording_date', 'Store', 'Sales', 'prediction']
        
    return df

Чтобы правильно транслировать эту функцию в данные на стороне сервера, мы создаем новый «Виртуальный DataFrame» (опять же — ссылка на набор данных на сервере), который является результатом вызова метода map_partition. Этот метод принимает несколько ключевых параметров;

  • Лямбда-выражение, которое будет транслировать функцию через итерируемый объект. Итерируемый — это группировка каждого Store, определяемая значением data_partition_column. В этом случае мы ограничили количество Магазинов, используя только любой идентификатор Магазина, меньший или равный 10.
  • Вызов функции и любые дополнительные аргументы ключевого слова.
  • «Столбец раздела» — это значение столбца для разделения данных. На сервере среда выполнения python будет выполняться для каждого значения раздела.
  • Словарь, определяющий поля и формат возвращаемых данных. Обратите внимание, что это не возвращает данные клиенту, это определяет формат результирующего набора данных на сервере — в данном случае виртуальный фрейм данных «tdf_predict».
# Push the function to Vantage and return the ordered dataframe columns including the prediction

tdf_predict = tdf_analytic_set[tdf_analytic_set['Store'] <= 10].map_partition(lambda rows: arima_predict(rows, start = 0, end = 100), 
                  data_partition_column = 'Store', 
                  returns = {'recording_date':DATE(), 'Store':INTEGER(), 'Sales':INTEGER(), 'prediction':INTEGER()})

Итак, теперь у нас есть структура данных, которая находится на сервере (которую мы назвали tdf_predict), которая представляет исходные столбцы; «recording_date», «Магазин» и «Продажи», к которым добавлен прогноз для определенного диапазона индексов, для которого мы направили функцию для прогнозирования. Модель была подобрана и спрогнозирована на основе уникального набора данных, представленного объемом продаж одного магазина. Если мы хотим, мы можем скопировать прогноз обратно клиенту и посмотреть:

# Grab one store's predictions and plot them
new_df = tdf_predict[tdf_predict['Store'] == 9].to_pandas(all_rows = True)
new_df.sort_index()[['Sales', 'prediction']].iloc[1:100].plot(figsize=(12,8));

Шаг 5. Расширьте это на все данные и внедрите показатели оценки

На последнем этапе процесса мы объединим этапы подбора модели, прогнозирования и оценки в серверной функции. Эта новая функция будет принимать значения p, d, q, начальные/конечные срезы и количество шагов для прогнозирования в качестве параметров. Затем функция будет возвращать только прогнозируемые значения, а также показатели оценки модели и дату выполнения:

# Perform a one-shot fit, predict, analyze, and forecast per partition, pass the start and end slices

def arima_predict_forecast(rows, p, d, q, start, end, steps):
    df = rows.read()
    if df.shape[0] > 0:
        df['recording_date'] = pd.to_datetime(df['recording_date'], format = '%Y/%m/%d').dt.date
        df = df.set_index('recording_date')
        df = df.sort_index()
        model = ARIMA(df['Sales'], order=(p,d,q), missing = 'drop')
        model_fit = model.fit()
        df['prediction'] = model_fit.predict(start = start, end = end, dynamic = False)
        
        #calculate some accuracy scores based on the prediction slice
        smape = (100/(end-start)) * np.sum(2 * np.abs(df['prediction'].iloc[start:end] - df['Sales'].iloc[start:end])/(np.abs(df['Sales'].iloc[start:end]) + np.abs(df['prediction'].iloc[start:end]))) # smape
        rmse = np.mean((df['prediction'].iloc[start:end] - df['Sales'].iloc[start:end])**2)**.5  # RMSE
        mae = np.mean(np.abs(df['prediction'].iloc[start:end] - df['Sales'].iloc[start:end]))    # MAE
        
        # when the model was trained
        training_date = str(datetime.datetime.now().date())
        
        # now - run the forecast, and prepare the data for stdout
        results = model_fit.forecast(steps = steps)
        df_return = pd.DataFrame(results, index = results.index, columns = ['predicted_mean']).reset_index().rename(columns = {'index':'FC_Date'})

        # Add the metadata - Store ID, training data, and accuracy scores
        df_return['Store'] = df['Store'].iloc[0]
        df_return['run_date'] = training_date
        df_return['SMAPE'] = smape
        df_return['RMSE'] = rmse
        df_return['MAE'] = mae
        
        return df_return

Теперь вызовите метод со ВСЕМИ данными:

%%time
# map the function to the per-store model dimension, and forecast a defined number of periods

# Create the dictionary to define the format of my return data

returns_dict = {'FC_Step':DATE(), 'forecast':INTEGER(), 'Store':INTEGER(), 'run_date':DATE(), 'SMAPE':FLOAT(), 'RMSE':FLOAT(), 'MAE':FLOAT()}

all_stores_forecast = tdf_analytic_set.map_partition(lambda rows: arima_predict_forecast(rows, 
                                                                                           p = 8, 
                                                                                           d = 0, 
                                                                                           q = 6, 
                                                                                           start = 0, 
                                                                                           end = 100, 
                                                                                           steps = 20), 
                                                        data_partition_column = 'Store', 
                                                        returns = returns_dict)

Магия %%time предназначена для демонстрации производительности вызова этого метода. В настоящее время мы параллельно выполняем 1115 прогнозов временных рядов, включая подгонку моделей, оценку и прогнозирование:

Чтобы выполнить все эти шаги на относительно небольшой демонстрационной системе, потребовалось 1:17 — представьте, что вы пытаетесь запустить 1115 прогнозов итеративно или выполнить трансляцию на одно ядро ​​​​ЦП с использованием собственного Python.

Простая проверка — давайте возьмем 25 магазинов и свернем данные, чтобы увидеть, как выглядят прогнозные значения:

df_forecast = all_stores_forecast[all_stores_forecast['Store'] <= 25].to_pandas(all_rows = True)
df_forecast = df_forecast.reset_index()
df_forecast = df_forecast.loc[df_forecast['FC_Step'].apply(lambda x: x != datetime.date(1970, 1, 1))]
df_forecast.pivot(index = 'FC_Step', columns = 'Store', values = 'forecast')

Заключение

Теперь у нас есть структура данных на сервере, которая представляет 20-дневный прогноз объема продаж для каждого отдельного магазина в компании. Было бы тривиально распространить это на разбиение по магазинам и SKU — создание многих тысяч или десятков тысяч прогнозов одновременно.

Теперь у нас также есть метаданные, отражающие точность более 1000 моделей — мы можем делать действительно интересные вещи, такие как анализ распределения этих показателей, тепловая карта всей совокупности магазинов (или SKU, или комбинаций store-sku), анализировать, как точность меняется с течением времени. время или любой другой аналитический процесс, который имеет дело с большими объемами данных.

Расширяя взгляды на то, как мы можем создавать, оценивать и хранить модели и данные моделей, мы можем начать рассматривать продукты машинного обучения как сами наборы данных — может быть, большие или сложные, но тем не менее наборы данных, которыми можно управлять. классифицировать и анализировать, как и любой другой актив данных.