Вступление

Машинное обучение быстро становится частью нашей повседневной жизни: оно позволяет программному обеспечению и устройствам управлять процедурами без вмешательства человека и, кроме того, дает нам возможность автоматизировать, стандартизировать и упростить многие повседневные задачи. Одна интересная тема - это, например, домашняя автоматизация, где теперь можно использовать интеллектуальное освещение, интеллектуальное отопление и автономных роботов, которые убирают полы даже в сложных домашних ландшафтах, заполненных препятствиями.

Вообще говоря, информация, которую можно получить с подключенных устройств, почти бесконечна. Дешевая стоимость сбора данных и вычислительная мощность для управления большими данными сделали машинное обучение доступным для многих сценариев использования. Один из самых интересных - это получение и анализ подключенных к IoT устройств в режиме реального времени.

В этой статье мы хотели бы поделиться решением, которое использует преимущества AWS Managed Services для обработки больших объемов данных в режиме реального времени, поступающих с одного или нескольких устройств, подключенных к IoT. Мы подробно покажем, как настроить конвейер, чтобы предоставить потенциальным пользователям доступ к результатам прогнозирования почти в реальном времени на основе полученных данных IoT.

Решение также исследует некоторые ключевые концепции, связанные с машинным обучением, заданиями ETL, очисткой данных и подготовкой озера данных.

Но прежде чем перейти к проектированию кода и инфраструктуры, необходимо сделать небольшое резюме по ML, IoT и ETL. Давайте вместе погрузимся в это!

Интернет вещей, машинное обучение и преобразование данных: ключевые понятия

Интернет вещей

Интернет вещей (IoT) - это распространенный способ описания набора взаимосвязанных физических устройств - «вещей», оснащенных датчиками, которые обмениваются данными друг с другом и через Интернет.

Интернет вещей быстро развивался благодаря снижению стоимости интеллектуальных датчиков и конвергенции множества технологий, таких как аналитика в реальном времени, машинное обучение и встроенные системы.

Конечно, традиционные области встроенных систем, беспроводных сенсорных сетей, систем управления и автоматизации также вносят свой вклад в мир Интернета вещей.

Машинное обучение

Машинное обучение возникло как эволюция искусственного интеллекта. Традиционное машинное обучение требовало, чтобы программисты писали сложные эвристики, которые трудно поддерживать, чтобы выполнять традиционно человеческие задачи (например, распознавание текста на изображениях) с помощью компьютера.

С помощью машинного обучения взаимосвязи между данными изучает сама система.

Например, в шахматной игре больше нет алгоритма, который заставляет играть в шахматы, но, предоставляя набор данных функций, касающихся шахматных игр, модель учится играть сама.

Машинное обучение также имеет смысл в распределенном контексте, где прогноз должен масштабироваться.

Преобразование данных

В конвейере машинного обучения данные должны быть единообразными, то есть стандартизованными. Различия в данных могут возникать из-за разнородных источников, таких как разная схема таблицы БД или разные рабочие процессы приема данных.

Преобразование (ETL: извлечение, преобразование, загрузка) данных, таким образом, является важным шагом во всех конвейерах машинного обучения. Стандартизированные данные важны не только для обучения модели машинного обучения, но также их намного проще анализировать и визуализировать на предварительном этапе обнаружения данных.

В общем, для очистки и форматирования данных обычно используются Scipy Pandas и подобные библиотеки.

- NumPy: библиотека для управления многомерными массивами, она в основном используется на этапе импорта и чтения набора данных.

- Pandas Dataframe: библиотека для управления данными в табличном формате. Он берет точки данных из файлов CSV, JSON, Excel и pickle и преобразует их в таблицы.

- SciKit-Learn: библиотека для окончательной обработки данных и обучения.

Очистка и форматирование данных необходимы для обеспечения наилучших шансов, что модель хорошо сходится к желаемому решению.

Трубопровод

Для достижения результата мы будем широко использовать то, что AWS дает нам в плане управляемых сервисов. Вот простой набросок, показывающий основных участников нашего конвейера машинного обучения.

Давайте посмотрим на назначение каждого компонента, прежде чем вдаваться в подробности каждого из них.

Конвейер состоит из 5 основных этапов: прием, подготовка озера данных, преобразование, обучение, вывод.

На этапе приема данные с наших подключенных устройств будут получены с помощью AWS IoT Core, что позволит подключать их к сервисам AWS без управления серверами и сложностей связи. Данные с устройств будут отправляться с использованием протокола MQTT, чтобы минимизировать объем кода и пропускную способность сети. При необходимости IoT Core также может управлять аутентификацией устройства.

Для отправки информации в озеро данных S3 мы будем использовать Amazon Kinesis Data Firehose, в котором есть встроенное действие для чтения сообщений IoT Core.

Чтобы преобразовать данные и сделать их доступными для AWS SageMaker, мы будем использовать AWS Glue: сервис бессерверной интеграции данных, который упрощает поиск, подготовку и объединение данных для аналитики, машинного обучения и разработки приложений. AWS Glue предоставляет все возможности, необходимые для интеграции данных, чтобы начать анализ и использование данных за считанные минуты, а не месяцы.

Наконец, чтобы обучить и затем развернуть нашу модель для онлайн-вывода, мы покажем, как использовать встроенные алгоритмы SageMaker, в частности, DeepAR.

Прием: от ядра Интернета вещей до огненного шланга Kinesis

Для подключения нашего тестового устройства к AWS мы использовали возможности AWS IoT Core. Далее мы предполагаем, что у читателя уже есть готовая учетная запись AWS.

AWS IoT Core

Перейдите в свою учетную запись и найдите «IoT Core», затем на странице службы в меню боковой панели выберите «Начать», а затем выберите «На борту устройства».

Следуйте инструкциям мастера, чтобы подключить устройство, как мы. Целью является:

  1. Создайте AWS IoT Thing
  2. Загрузите запрошенный код прямо на свое устройство, чтобы разрешить подключение к AWS.

Это важно, потому что мы также подключаем Kinesis Firehose для чтения сообщений, отправленных из IoT Core. В качестве примечания помните, что вам нужен доступ к устройству, и это устройство должно иметь TCP-соединение с общедоступным Интернетом через порт 8883.

Следуя указаниям мастера, выберите Linux в качестве ОС и SDK (в нашем случае Node.js):

После этого мы дали название новой «штуке» и получили комплект для подключения, в который входят:

  • Выбран SDK
  • Пример программы
  • Сертификаты, необходимые для подключения устройства

После загрузки инициализируйте новый проект Node.js и установите AWS-IoT-device-SDK. Это установит необходимые модули узла; после этого можно запустить включенный сценарий start.sh, включив все сертификаты, загруженные вместе с комплектом, в тот же каталог проекта.

Мы разработали наш пример, используя device-example.js в качестве простой основы, чтобы понять, как подключить устройство к AWS IoT. Соответствующая часть скрипта основана на следующих строках, выделенных жирным шрифтом.

const deviceModule = require(‘aws-iot-device-sdk’).device;
const cmdLineProcess = require(‘aws-iot-device-sdk/examples/lib/cmdline’);
processPollutionData = (args) => {
// Device properties which are needed
const device = deviceModule({
keyPath: args.privateKey,
certPath: args.clientCert,
caPath: args.caCert,
clientId: args.clientId,
region: args.region,
baseReconnectTimeMs: args.baseReconnectTimeMs,
keepalive: args.keepAlive,
protocol: args.Protocol,
port: args.Port,
host: args.Host,
debug: args.Debug
});
const minimumDelay = 250; // ms
const interval = Math.max(args.delay, minimumDelay);
// Send device information
setInterval(function() {
// Prepare Data to be sent by the device
const payload = {
ozone: Math.round(Math.random() * 100),
particullate_matter: Math.round(Math.random() * 100),
carbon_monoxide: Math.round(Math.random() * 100),
sulfure_dioxide: Math.round(Math.random() * 100),
nitrogen_dioxide: Math.round(Math.random() * 100),
longitude: 10.250786139881143,
latitude: 56.20251117218925,
timestamp: new Date()
};
device.publish(‘<YOUR_TOPIC>’, JSON.stringify(payload));
}, interval);
// Device callbacks, for the purpose of this example we have put
// some simple console logs
device.on(‘connect’, () => { console.log(‘connect’); });
device.on(‘close’, () => { console.log(‘close’); });
device.on(‘reconnect’, () => { console.log(‘reconnect’); });
device.on(‘offline’, () => { console.log(‘offline’); });
device.on(‘error’, (error) => { console.log(‘error’, error); });
device.on(‘message’, (topic, payload) => {
console.log(‘message’, topic, payload.toString());
});
}
// this is a precooked module from aws to launch
// the script with arguments
module.exports = cmdLineProcess;
// Start App
if (require.main === module) {
cmdLineProcess(‘connect to the AWS IoT service using MQTT’,
process.argv.slice(2), processPollutionData);
}

Нам потребовались модули Node.js, необходимые для подключения устройства к AWS и публикации в соответствующей теме. Вы можете считывать данные со своего датчика любым способом, например, если устройство может записывать данные с датчика в определенном месте, просто прочтите и приведите эти данные в строку с помощью device.publish ('‹YOUR_TOPIC›', JSON. stringify (полезная нагрузка)).

Последняя часть кода просто вызывает основную функцию, чтобы начать отправку информации на консоль.

Для запуска сценария используйте сценарий start.sh, входящий в комплект разработчика, обязательно указывайте на свой код, а не на пример кода от AWS. Оставьте сертификаты и идентификатор клиента как есть, потому что они были сгенерированы из вашей предыдущей настройки.

Примечание. Для этой статьи код устройства слишком упрощен, не используйте его так в производственной среде.

Чтобы проверить, что все работает должным образом, войдите в консоль AWS IoT, перейдите в раздел Тест на левой боковой панели, при появлении запроса введите название своей темы и нажмите «Подписаться на тему». Если все настроено правильно, вы должны увидеть что-то вроде скриншота ниже:

Теперь нам нужно подключить Kinesis Firehose, чтобы начать отправку данных на S3.

Kinesis Firehose (Огненный шланг Kinesis)

Постоянное обновление озера данных для заполнения регистра данных данными, отправляемыми устройством, чрезвычайно важно, чтобы избежать проблемы, называемой дрейф концепций , которая возникает, когда происходит постепенное смещение развернутая модель применительно к миру реальных данных; это происходит потому, что исторические данные больше не могут отражать возникшую проблему.

Чтобы преодолеть эту проблему, мы должны обеспечить эффективную регистрацию и средства, позволяющие понять, когда вмешиваться в модель, например. путем повторного обучения или обновления версии и повторного развертывания обновленной версии. Чтобы решить эту «проблему», мы определяем действие Kinesis Firehose, в частности, для автоматической регистрации и передачи каждого сообщения MQTT, доставленного с устройства, непосредственно в Amazon S3, чтобы всегда обеспечивать наше озеро данных свежими данными.

Создайте поток Firehose

Чтобы создать поток Firehose, найдите «Kinesis firehose» в строке поиска службы, выберите его, затем «Создать поток доставки», как показано на рисунке:

Выберите допустимое имя в разделе «Имя потока доставки», «Direct PUT или другие источники» в разделе «Источники», а затем, на следующей странице, оставьте все как есть (мы преобразуем данные в S3 позже), наконец, в на последней странице выберите S3 в качестве места назначения и в конечном итоге добавьте префикс к данным, вставленным в корзину. Нажмите «Далее» и создайте поток.

Создайте правило Интернета вещей

Чтобы использовать поток, мы должны подключить его к AWS IoT с помощью правила IoT; это правило позволит Kinesis получать сообщения и записывать их в корзину S3. Чтобы настроить AWS IoT для отправки в Firehose, мы выполнили следующие действия:

  1. При создании правила в консоли AWS IoT на странице «Создание правила» в разделе «Установить одно или несколько действий» выберите «Добавить действие».
  2. Выберите «Отправить сообщение в поток Amazon Kinesis Firehose».

3. В качестве имени потока выберите только что созданный поток доставки Kinesis Data Firehose.

4. В качестве разделителя выберите символ-разделитель, который будет вставляться между записями, например запятая (,).

5. В качестве имени роли IAM выберите «Создать новую роль».

6. Выберите «Добавить действие».

Это пример того, как будет создано такое правило:

{
"topicRulePayload": {
"sql": "SELECT * FROM '<your_topic_name>'",
"ruleDisabled": false,
"awsIotSqlVersion": "2016-03-23",
"actions": [
{
"firehose": {
"deliveryStreamName": "<your_firehose_stream>",
"roleArn": "arn:aws:iam::<account_number>:role/<role_name>"
}
}
]
}
}

Если все в порядке, вы скоро начнете видеть данные, отображаемые в вашем сегменте, как в примере ниже:

И открытие одного из них покажет данные, сгенерированные с нашего устройства!

Datalake: S3

Amazon Simple Storage Service - это сервис объектного хранилища, идеально подходящий для создания базы данных. Обладая практически неограниченной масштабируемостью, база данных Amazon S3 предоставляет множество преимуществ при разработке аналитики для больших данных.

Централизованная архитектура данных S3 упрощает создание многопользовательской среды, в которой несколько пользователей могут использовать свой собственный инструмент анализа больших данных для анализа общего набора данных.

Более того, S3 легко интегрируется с другими веб-сервисами Amazon, такими как Amazon Athena, Amazon Redshift и, как в представленном случае, Amazon Glue.

S3 также позволяет отделить хранилище от вычислений и обработки данных, чтобы оптимизировать затраты и рабочие процессы обработки данных, а также сохранить решение сухим, масштабируемым и поддерживаемым.

Кроме того, S3 позволяет хранить любые типы структурированных, полуструктурированных или даже неструктурированных данных в их собственном формате. В нашем случае мы просто заинтересованы в сохранении фиктивных данных с тестового устройства, чтобы делать простые прогнозы.

Процесс ETL: AWS Glue

Даже если данные сохраняются на Amazon S3 практически в реальном времени, этого все равно недостаточно для обучения модели Amazon SageMaker. Как мы объяснили во введении, на самом деле данные должны быть подготовлены, и при работе с предопределенными алгоритмами AWS SageMaker следует помнить о некоторых значениях по умолчанию.

Например, SageMaker не принимает заголовки, и в случае, если мы хотим определить контролируемое обучение, нам также необходимо указать основную истину в качестве первого столбца набора данных.

В этом простом примере мы использовали Glue studio для преобразования необработанных данных во входном сегменте S3 в структурированные файлы паркета для сохранения в выделенном выходном сегменте. Выходная корзина будет использоваться Sagemaker в качестве источника данных.

Клей Crawler

Во-первых, нам нужен Crawler для чтения из исходного ведра для создания Glue Schema. Чтобы создать его, перейдите на страницу AWS Glue, выберите Сканеры в разделе «Консоль Glue», добавьте новый сканер, просто указав имя, выбрав исходную корзину S3 и корневую папку, созданную Kinesis Data Firehose. . На основе этой информации будет создана новая схема. Оставьте все остальные параметры по умолчанию.

После создания сканера активируйте его, нажав «Запустить поисковый робот».

Следующим шагом является настройка задания Glue Studio с использованием Каталога в качестве источника данных.

Работа ETL

Задание AWS Glue Studio состоит как минимум из 3 основных узлов: источник, преобразование и цель. Нам необходимо настроить все три узла для определения сканера, способного читать и преобразовывать данные на лету.

Для этого мы сделали следующий шаг:

  1. Выберите Создание заданий и управление ими на панели управления AWS Glue Studio.
  2. На странице «Управление заданиями» выберите источник и цель, добавленные в параметр графика. Затем выберите S3 для источника и S3 для цели.

3. Нажмите кнопку «Создать», чтобы начать процесс создания задания.

Теперь вы увидите график с тремя узлами, который представляет этапы процесса ETL. Когда AWS Glue получает указание на чтение из источника данных S3, он также создает внутреннюю схему, которая называется Каталог данных Glue.

Чтобы настроить исходный узел, щелкните его на графике:

  1. На вкладке «Свойства узла» в поле «Имя» введите имя, уникальное для этого задания. Введенное вами значение используется в качестве метки для узла источника данных на графике. Выберите Свойства источника данных - вкладка S3 на панели сведений об узле.
  2. Выберите свою базу данных сканера из списка доступных баз данных в каталоге данных AWS Glue.
  3. Выберите правильную таблицу из Каталога.

То же самое можно сделать и для узла преобразования: щелкнув по нему, можно определить, какое преобразование мы хотим применить к входным данным. Здесь вы также можете убедиться, что данные JSON импортированы правильно:

Наконец, мы можем выбрать целевой узел, снова указав S3 в качестве цели и используя .parquet в качестве выходного формата.

Теперь нам нужно установить параметры задания ETL для только что созданного рабочего процесса. Перейдите на вкладку «Сведения о задании» справа, дайте имя и выберите роль, способную управлять данными и повторное развертывание на S3.

Остальное оставьте по умолчанию.

Обратите внимание, что у вас должен быть этот фрагмент на вкладке «Доверительные отношения» роли, чтобы позволить Glue принять его на себя:

{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": { "Service": "glue.amazonaws.com" },
"Action": "sts:AssumeRole"
}
]
}

Если все определено правильно, то задание запустится и начнется преобразование ваших данных в формат паркета. Файлы будут помещены в ваш выходной каталог в выбранную вами корзину.

Оптимизация набора данных: почему паркет вместо CSV

Мы решили использовать паркет вместо формата данных CSV для целевого набора данных. Parquet - это столбчатый формат с высокой степенью сжатия, в котором используется алгоритм уничтожения и сборки записей, который значительно превосходит простое сглаживание вложенных пространств имен. Он имеет следующие преимущества:

  • Он обеспечивает эффективность по сравнению с строковыми файлами, такими как CSV. При запросе столбчатое хранилище с пропуском нерелевантных данных может быть выполнено очень быстро.
  • Запросы на агрегирование занимают меньше времени по сравнению со строковыми базами данных, что минимизирует задержку при доступе к данным.
  • Apache Parquet может поддерживать расширенные вложенные структуры данных.
  • Parquet поддерживает гибкие параметры сжатия и эффективные схемы кодирования.
  • Apache Parquet лучше всего работает с интерактивными и бессерверными технологиями, такими как AWS Athena, Amazon Redshift и AWS Glue.

Также по сравнению с файлом, хранящимся в формате .csv, у нас есть следующие преимущества с точки зрения экономии:

  • Amazon Athena и Redshift Spectrum будут взимать плату в зависимости от количества данных, сканируемых по запросу.
  • Amazon взимает плату в соответствии с объемом данных, хранящихся на S3.

Этап машинного обучения: прогнозирование с помощью Amazon SageMaker

Amazon SageMaker предлагает 17 готовых алгоритмов, которые охватывают множество тем, связанных с проблемами машинного обучения. В нашем случае мы хотели упростить разработку модели прогнозирования для данных, полученных с нашего устройства, поэтому вместо того, чтобы показывать, как создать свой собственный алгоритм, как в нашей предыдущей статье, на этот раз мы будем использовать предварительно приготовленный.

Как объяснялось ранее, помимо очистки данных, наш процесс ETL был выполнен для преобразования данных в совместимость с готовыми алгоритмами SageMaker.

SageMaker API и библиотека Sklearn предлагают методы для извлечения данных, вызова метода обучения, сохранения модели и развертывания ее в производственной среде для интерактивного или пакетного вывода.

Начните с перехода на страницу SageMaker и создайте новый экземпляр записной книжки, для этой статьи мы выбрали ml.t3.medium. Добавьте имя и создайте новую роль IAM.

Оставьте все остальное по умолчанию и нажмите «Создать записную книжку».

Доступ к нему можно получить либо с помощью Юпитера, либо от лаборатории Юпитера, мы выбрали второе. Нам удалось создать простую записную книжку, иллюстрирующую все этапы использования предварительно поддерживаемого алгоритма DeepAR от AWS Sagemaker.

Примечание: код создан исключительно для этой статьи и не предназначен для производства, так как предварительное исследование данных и проверка результатов не проводятся. Тем не менее, весь представленный код протестирован и может использоваться для случаев использования, аналогичных представленному.

Начнем с импорта всех необходимых библиотек:

import time
import io
import math
import random
import numpy as np
import pandas as pd
import JSON
import matplotlib.pyplot as plt
import boto3
import sagemaker
from sagemaker import get_execution_role
# set random seeds for reproducibility
np.random.seed(42)
random.seed(42)

Мы также устанавливаем начальное число для наших случайных методов, чтобы обеспечить воспроизводимость. После этого нам нужно восстановить наши файлы parquet из S3 и получить от них фрейм данных Pandas.

bucket = "<your_bucket_name>"
data = "output"
model = "model"
sagemaker_session = sagemaker.Session()
role = get_execution_role()
s3_data_path = f"{bucket}/{data}"
s3_output_path = f"{bucket}/{model}/"

Сначала мы подготавливаем все «пути» S3, которые будут использоваться в записной книжке, и создаем сеанс SageMaker и действительную IAM роль с помощью get_execution_role (). Как видите, SageMaker позаботится обо всех этих аспектах за нас.

from sagemaker.amazon.amazon_estimator import get_image_uri
image_uri = get_image_uri(boto3.Session().region_name, "forecasting-deepar")

На шаге выше мы восстанавливаем наш Оценщик прогнозов DeepAR. Оценщик - это класс в SageMaker, способный генерировать и тестировать модель, которая затем будет сохранена на S3.

Перед тем, как приступить к чтению файлов паркета, мы также добавим в наш эксперимент пару констант:

freq = "H"
prediction_length = 24
context_length = 24 # usually prediction and context are set equal or similar

Используя freq (частота), мы говорим, что хотим анализировать TimeSeries по почасовым показателям. Для прогноза и продолжительности контекста задано значение 1 день, и это, соответственно, количество часов, которые мы хотим спрогнозировать в будущем, и сколько часов в прошлом мы будем использовать для прогноза. Обычно эти значения определяются в днях, поскольку набор данных намного больше.

Мы создали два вспомогательных метода для чтения из паркетных файлов:

# Read single parquet file from S3
def pd_read_s3_parquet(key, bucket, s3_client=None, **args):
if not s3_client:
s3_client = boto3.client('s3')
obj = s3_client.get_object(Bucket=bucket, Key=key)
return pd.read_parquet(io.BytesIO(obj['Body'].read()), **args)
# Read multiple parquets from a folder on S3 generated by spark
def pd_read_s3_multiple_parquets(filepath, bucket, **args):
if not filepath.endswith('/'):
filepath = filepath + '/'  # Add '/' to the end
s3_client = boto3.client('s3')
s3 = boto3.resource('s3')
s3_keys = [item.key for item in s3.Bucket(bucket).objects.filter(Prefix=filepath)
if item.key.endswith('.parquet')]
if not s3_keys:
print('No parquet found in', bucket, filepath)
dfs = [pd_read_s3_parquet(key, bucket=bucket, s3_client=s3_client, **args)
for key in s3_keys]
return pd.concat(dfs, ignore_index=True)

Затем мы фактически читаем наборы данных:

# get all retrieved parquet in a single dataframe with helpers functions
df = pd_read_s3_multiple_parquets(data, bucket)
df = df.iloc[:, :8] # get only relevant columns
df['hour'] = pd.to_datetime(df['timestamp']).dt.hour #add hour column for the timeseries format
# split in test and training
msk = np.random.rand(len(df)) < 0.8 # 80% mask
# Dividing in test and training
training_df = df[msk]
test_df = df[~msk]

Здесь мы манипулируем набором данных, чтобы его можно было использовать с DeepAR, который имеет собственный формат ввода. Мы используем df.iloc [:,: 8], чтобы сохранить только исходные столбцы без столбцов, созданных Glue Schema. Мы создаем новый столбец час, чтобы ускорить процесс. Наконец, мы разделили набор данных в пропорции 80/20 для обучения и тестирования.

Затем мы временно записываем данные обратно в S3, как того требует DeepAR, создавая файлы JSON с сериями в них.

# We need to resave our data in JSON because this is how DeepAR works
# Note: we know this is redundant but is for the article to show how many ways
# there are to transform dataset back and forth from when data is acquired
train_key = 'deepar_training.json'
test_key  = 'deepar_test.json'
# Write data in DeepAR format
def writeDataset(filename, data):
file=open(filename,'w')
previous_hour = -1
for hour in data['hour']:
if not math.isnan(hour):
if hour != previous_hour:
previous_hour = hour
# One JSON sample per line
line = f"\"start\":\"2021-02-05 {int(hour)}:00:00\",\"target\":{data[data['hour'] == hour]['ozone'].values.tolist()}"
file.write('{'+line+'}\n')

Сгенерированные документы JSON имеют следующий формат:

{"start":"2021-02-05 13:00:00","target":[69.0, 56.0, 2.0, …]}

После этого мы можем записывать наши файлы JSON в S3.

writeDataset(train_key, training_df)
writeDataset(test_key, test_df)
train_prefix   = 'model/train'
test_prefix    = 'model/test'
train_path = sagemaker_session.upload_data(train_key, bucket=bucket, key_prefix=train_prefix)
test_path  = sagemaker_session.upload_data(test_key,  bucket=bucket, key_prefix=test_prefix)

Для этого мы используем sagemaker_session.upload_data (), передавая местоположение вывода. Теперь мы можем, наконец, определить оценщик:

estimator = sagemaker.estimator.Estimator(
sagemaker_session=sagemaker_session,
image_uri=image_uri,
role=role,
instance_count=1,
instance_type="ml.c4.xlarge",
base_job_name="pollution-deepar",
output_path=f"s3://{s3_output_path}",
)

Мы передадим оценщику сеанс SageMaker, изображение алгоритма, тип экземпляра и выходной путь модели. Нам также необходимо настроить некоторые гиперпараметры:

hyperparameters = {
"time_freq": freq,
"context_length": str(context_length),
"prediction_length": str(prediction_length),
"num_cells": "40",
"num_layers": "3",
"likelihood": "gaussian",
"epochs": "20",
"mini_batch_size": "32",
"learning_rate": "0.001",
"dropout_rate": "0.05",
"early_stopping_patience": "10",
}
estimator.set_hyperparameters(**hyperparameters)

Эти значения взяты непосредственно из официальных примеров AWS на DeepAR. Нам также необходимо передать оценщику два канала, обучение и тест, чтобы начать процесс настройки.

data_channels = {"train": train_path, "test": test_path}
estimator.fit(inputs=data_channels)

После обучения и тестирования модели мы можем развернуть ее с помощью Предиктора в реальном времени.

# Deploy for real time prediction
job_name = estimator.latest_training_job.name
endpoint_name = sagemaker_session.endpoint_from_job(
job_name=job_name,
initial_instance_count=1,
instance_type='ml.m4.xlarge',
role=role
)
predictor = sagemaker.predictor.RealTimePredictor(
endpoint_name,
sagemaker_session=sagemaker_session,
content_type="application/json")

Предиктор создает конечную точку, которая видна из консоли AWS.

Конечная точка может быть вызвана любым приложением с включенным REST, передающим запрос в формате, подобном приведенному ниже:

{
"instances": [
{
"start": "2021-02-05 00:00:00",
"target": [88.3, 85.4, ...]
}
],
"configuration": {
"output_types": ["mean", "quantiles", "samples"],
"quantiles": ["0.1", "0.9"],
"num_samples": 100
}
}

«Цели» - это некоторые выборочные значения, начинающиеся с периода, установленного в «start», по которому мы хотим сгенерировать прогноз.

Наконец, если нам больше не нужна конечная точка, мы можем удалить ее с помощью:

sagemaker_session.delete_endpoint(endpoint_name)

Вывод в реальном времени: от концепции до производства

Вывод в реальном времени относится к предсказанию, которое некоторые модели дают в реальном времени. Это типичный вариант использования многих рекомендательных систем или, как правило, когда прогноз касается одноразового использования. Применяется при:

  • Мы имеем дело с динамическими данными.
  • У нас требуется низкая задержка.
  • Нам нужны прогнозы в реальном времени.
  • Для него характерно одно предсказание.

Как правило, им немного сложнее управлять по сравнению с тем, что мы делали в ноутбуке, и обычно он определяется в отдельном конвейере из-за его природы высокой доступности и быстрого времени отклика.

При развертывании с использованием SageMaker API можно создать процесс развертывания, очень похожий на то, как развертывается или обновляется веб-приложение, с учетом таких вещей, как перенаправление трафика и методы развертывания, такие как Blue / Green или Canary. Мы хотим поделиться с вами кратким руководством по обоим методам, чтобы вы могли попробовать их самостоятельно!

Как развернуть

  1. Создайте модель с помощью CreateModelApi.
  2. Создайте конечную точку HTTPS, используя CreateEndpointConfigApi, введя в качестве свойств:
  • Модель
  • Варианты производства
  • Тип экземпляра
  • Количество экземпляров
  • Масса

3. Завершите создание конечной точки с помощью CreateEndpointApi. Передайте данные двух предыдущих конфигураций и любые теги этой последней команде.

Примечание. В производственных вариантах мы можем реализовать различные стратегии развертывания, такие как A / B и СИНИЙ / ЗЕЛЕНЫЙ.

Развернуть синий / зеленый

  1. Создайте новую версию модели.
  2. Создайте конфигурацию конечной точки, скопировав данные из старой.
  3. Обновите производственные варианты, добавив новую конфигурацию.
  4. Вызовите UpdateEndpointApi с новой конфигурацией. Добавлена ​​инфраструктура Зеленая, здесь мы можем провести синтетическое тестирование.
  5. Перенаправить трафик на зеленый. Если зеленый работает хорошо, с помощью другого UpdateEndpointApi удалите старую модель.

Развернуть A / B

Используется, если мы хотим измерить производительность различных моделей по отношению к метрике высокого уровня.

  1. Создайте несколько моделей, используя одну и ту же конфигурацию.
  2. Обновите или создайте конфигурацию, изменив или создав производственные варианты.
  3. Установите балансировочные грузы на 50/50.
  4. Проверить работоспособность и производительность.
  5. Постепенно меняю% трафика.

В конце концов, исключите одну или несколько моделей.

Примечание: свойство multi-model for endpoint позволяет управлять несколькими моделями одновременно, память машины управляется автоматически в зависимости от трафика. Такой подход позволяет сэкономить деньги за счет оптимального использования ресурсов.

использованная литература

Https://docs.aws.amazon.com/iot/latest/developerguide/iot-quick-start.html

Https://docs.aws.amazon.com/iot/latest/developerguide/kinesis-firehose-rule-action.html

Https://docs.aws.amazon.com/glue/latest/ug/tutorial-create-job.html

Https://docs.aws.amazon.com/iot/latest/developerguide/topics.html

Https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Synthetics_Canaries.html

Https://mqtt.org/

Https://machinelearningmaster.com/gentle-introduction-concept-drift-machine-learning/

Https://parquet.apache.org/

Выводы

В этой статье мы увидели, как разработать конвейер с использованием ресурсов AWS для приема данных с устройства, подключенного к экосистеме AWS через IoT Core.

Мы также увидели, как эффективно читать и сохранять данные по мере их отправки устройствами с помощью Kinesis Data Firehose, действующего как поток почти в реальном времени, для заполнения и обновления нашего озера данных на AWS S3.

Для выполнения необходимого преобразования данных ETL мы выбрали AWS Glue Studio, показав, насколько легко его можно настроить для создания поискового робота для чтения, преобразования и возврата данных в S3, готовых к использованию для обучения модели.

Затем мы объяснили, почему использование формата паркета лучше, чем простой формат CSV. В частности, мы сосредоточились на повышении производительности по отношению к CSV для импорта / экспорта, операциям запросов Athena и тому, насколько удобнее ценообразование на AWS S3 из-за значительно меньшего размера файла.

SageMaker можно использовать прямо из коробки с его набором предварительно созданных алгоритмов, и, в частности, мы увидели, как реализовать модель прогнозирования на нашем имитируемом наборе данных, состоящем из данных об окружающей среде и загрязнении.

Наконец, мы запускаем модель в производство, используя API-интерфейсы SageMaker для создания конвейера развертывания, который учитывает проблему смещения концепций, что позволяет часто обновлять модель на основе эволюции набора данных во времени. Это особенно верно для моделей временных рядов и прогнозирования, которые становятся все лучше и лучше по мере увеличения набора данных.

Мы подошли к концу путешествия на этой неделе !. Как всегда, не стесняйтесь комментировать и делиться с нами своими мнениями и идеями. А как насчет ваших сценариев использования? Какие устройства вы используете? Свяжитесь с нами и расскажите об этом!

Следите за обновлениями и до встречи через 14 дней на # Proud2BeCloud!