Возможность сохранять исторические данные временных рядов в Azure Data Explorer для обработки временных рядов. Также возможность хранить ADLS gen 2 для долгосрочного хранения, а также для машинного обучения или аналитики глубокого обучения.

Azure IoT Central хранит данные только за 30 дней, а для анализа временных рядов на множестве датчиков нам требуется более 30 дней, а обозреватель данных Azure делает это как облачный архиватор. Также имеет возможность масштабирования при необходимости, а производительность потрясающая. Очень подходит для промышленных случаев использования, и забавная часть заключается в том, что агрегация может выполняться при чтении вместо того, чтобы выполнять ETL для агрегации. Экономит массу времени и значительно ускоряет понимание. Также имеет возможность машинного обучения для данных временных рядов и многое другое.

Я запускал это из:

с 18.03.2020 по 31.07.2020 и собрал данные для приведенного ниже руководства.

Сначала настройте mxchips и подключитесь к Azure Iot Central по этой ссылке: https://docs.microsoft.com/en-us/samples/azure-samples/mxchip-iot-devkit-pnp/sample/

Репозиторий Github: https://github.com/Azure-Samples/mxchip-iot-devkit-pnp

После того, как вы передадите данные в Azure IoT Central и проверите их с помощью панели мониторинга.

Пришло время настроить

  • Учетная запись хранения Azure
  • Концентратор событий Azure
  • Обозреватель данных Azure

Создайте учетную запись хранения BLOB-объектов и создайте контейнер с именем iotcentralTelemetry. На момент написания этого большого двоичного объекта доступны параметры Azure IoT Central для экспорта телеметрии, устройств и шаблона устройства.

При экспорте должно быть 3 папки

iotcentraltelemetry\GUID\telemetry iotcentraltelemetry\GUID\devices iotcentraltelemetry\GUID\deviceTemplates

Создайте пространство имен концентратора событий.

Теперь создайте 3 концентратора событий: один для телеметрии, один для устройств и один для телеметрии устройств.

Я назвал это как

  • йоцентральная телеметрия
  • iotcentraldevices
  • iotcentraldevicetemplates

Оставьте раздел по умолчанию 2 и выберите все значения по умолчанию для тестирования.

Теперь создайте новую базу данных, если это необходимо, или используйте существующую базу данных. Наша цель — ввести только данные телеметрии в ADX.

Теперь пришло время создать таблицу, сопоставление таблиц

// Create table command //////////////////////////////////////////////////////////// .create table ['iotcentraltelemetry'] (['humidity']:real, ['temperature']:real, ['pressure']:real, ['magnetometer_x']:int, ['magnetometer_y']:int, ['magnetometer_z']:int, ['gyroscope_x']:int, ['gyroscope_y']:int, ['gyroscope_z']:int, ['accelerometer_x']:int, ['accelerometer_y']:int, ['accelerometer_z']:int, ['EventProcessedUtcTime']:datetime, ['PartitionId']:int, ['EventEnqueuedUtcTime']:datetime) 
// Create mapping command //////////////////////////////////////////////////////////// .create table ['iotcentraltelemetry'] ingestion json mapping 'iotcentraltelemetry_mapping' '[{"column":"humidity","path":"$.humidity","datatype":"real"},{"column":"temperature","path":"$.temperature","datatype":"real"},{"column":"pressure","path":"$.pressure","datatype":"real"},{"column":"magnetometer_x","path":"$.magnetometer.x","datatype":"int"},{"column":"magnetometer_y","path":"$.magnetometer.y","datatype":"int"},{"column":"magnetometer_z","path":"$.magnetometer.z","datatype":"int"},{"column":"gyroscope_x","path":"$.gyroscope.x","datatype":"int"},{"column":"gyroscope_y","path":"$.gyroscope.y","datatype":"int"},{"column":"gyroscope_z","path":"$.gyroscope.z","datatype":"int"},{"column":"accelerometer_x","path":"$.accelerometer.x","datatype":"int"},{"column":"accelerometer_y","path":"$.accelerometer.y","datatype":"int"},{"column":"accelerometer_z","path":"$.accelerometer.z","datatype":"int"},{"column":"EventProcessedUtcTime","path":"$.EventProcessedUtcTime","datatype":"datetime"},{"column":"PartitionId","path":"$.PartitionId","datatype":"int"},{"column":"EventEnqueuedUtcTime","path":"$.EventEnqueuedUtcTime","datatype":"datetime"}]'

Имя таблицы: iotcentraltelemetry

Имя сопоставления таблиц: iotcentraltelemetry_mapping

Тип сопоставления JSON

Войдите в центральную панель управления IoT

Перейдите в раздел «Экспорт», а затем настройте 2 параметра.

  • Экспорт в хранилище BLOB-объектов
  • Экспорт в концентратор событий

Для хранилища BLOB-объектов выберите указанный выше контейнер iotcentraltelemetry в качестве выходных данных. Вы можете выбрать все 3 телеметрии, устройства и шаблоны устройств для вывода или выбор за вами.

Данные сохраняются каждую минуту в виде файла формата JSON.

Для концентратора событий Выберите только телеметрию и выберите iotcentraltelemetry в качестве имени концентратора событий. Пространство имен концентратора событий отображается в раскрывающемся списке.

Нажмите «Создать» и подождите, пока он запустится.

Теперь перейдите в обозреватель данных Azure.

Выберите базу данных, которую мы будем использовать. В левом меню у нас есть опция для приема данных.

Выберите прием данных, создайте новое подключение для приема и выберите концентратор событий в качестве iotcentraltelemetry.

Для имени таблицы ADX используйте: iotcentraltelemetry

Для имени сопоставления: iotcentraltelemetry_mapping

Для использования типа данных: JSON

Подтвердите и создайте соединение и дождитесь загрузки данных.

теперь вы можете запустить запрос для проверки и посмотреть, загружены ли данные

iotcentraltelemetry | limit 200

Давайте сделаем некоторые графики

iotcentraltelemetry 
| extend ingesttime = ingestion_time() 
| project ingesttime,humidity,temperature,pressure, accelerometer_x,accelerometer_y,accelerometer_z 
| summarize avgHumidity=avg(humidity) ,avgPressure=avg(pressure), avgTemperature=avg(temperature) by bin(ingesttime, 1m) 
| render timechart

Теперь давайте нанесем на карту только 2 точки датчика

iotcentraltelemetry 
| extend ingesttime = ingestion_time() 
| project ingesttime,humidity,temperature,pressure, accelerometer_x,accelerometer_y,accelerometer_z 
| summarize avgHumidity=avg(humidity) ,avgTemperature=avg(temperature) by bin(ingesttime, 1m) 
| render timechart

вышеуказанный запрос агрегируется каждые 1 минуту

Теперь давайте агрегировать данные каждые 15 минут.

iotcentraltelemetry 
| extend ingesttime = ingestion_time() 
| project ingesttime,humidity,temperature,pressure, accelerometer_x,accelerometer_y,accelerometer_z 
| summarize avgHumidity=avg(humidity) ,avgTemperature=avg(temperature) by bin(ingesttime, 15m) 
| render timechart

Эти запросы являются образцом для выполнения ETL при чтении, а не для агрегирования и хранения.

Почасовой агрегат

iotcentraltelemetry 
| extend ingesttime = ingestion_time() 
| project ingesttime,humidity,temperature,pressure, accelerometer_x,accelerometer_y,accelerometer_z 
| summarize avgHumidity=avg(humidity) ,avgTemperature=avg(temperature) by bin(ingesttime, 1h) 
| render timechart

Среднесуточные значения

iotcentraltelemetry 
| extend ingesttime = ingestion_time() 
| project ingesttime,humidity,temperature,pressure, accelerometer_x,accelerometer_y,accelerometer_z 
| summarize avgHumidity=avg(humidity) ,avgTemperature=avg(temperature) by bin(ingesttime, 1d) 
| render timechart

Обнаружение анамолии

let min_t = datetime(2020-03-18); 
let max_t = datetime(2020-03-19 22:00); 
let dt = 1h; 
iotcentraltelemetry 
| extend ingesttime = ingestion_time() 
| make-series temperature=avg(temperature) on ingesttime from min_t to max_t step dt 
| extend (anomalies, score, baseline) = series_decompose_anomalies(temperature, 1.5, -1, 'linefit') 
| render anomalychart with(anomalycolumns=anomalies, title='Temp, anomalies')

Прогнозирование временных рядов

let min_t = datetime(2020-03-18); 
let max_t = datetime(2020-03-19 22:00); 
let dt = 1h; 
let horizon=7d; 
iotcentraltelemetry 
| extend ingesttime = ingestion_time() 
| make-series temperature=avg(temperature) on ingesttime from min_t to max_t step dt 
| extend forecast = series_decompose_forecast(temperature, toint(horizon/dt)) 
| render timechart with(title='Temp, forecasting the next week by Time Series Decomposition')

Машинное обучение

Образец кластеризации

let min_t = toscalar(iotcentraltelemetry | extend ingesttime = ingestion_time()  | summarize min(ingesttime));  
let max_t = toscalar(iotcentraltelemetry  | extend ingesttime = ingestion_time() | summarize max(ingesttime));  
iotcentraltelemetry
| extend ingesttime = ingestion_time() 
| make-series num=count() on ingesttime from min_t to max_t step 10m
| render timechart with(title="Temperature over a week, 10 minutes resolution")
let min_t=datetime(2020-03-18);
iotcentraltelemetry
| extend ingesttime = ingestion_time() 
| make-series num=count() on ingesttime from min_t to min_t+24h step 1m
| render timechart with(title="Zoom on the 2nd spike, 1 minute resolution")

Автокластер

let min_peak_t=datetime(2020-03-18);
let max_peak_t=datetime(2020-03-19 22:00);
iotcentraltelemetry
| extend ingesttime = ingestion_time() 
| where ingesttime between(min_peak_t..max_peak_t)
| evaluate autocluster()
let min_peak_t=datetime(2020-03-18);
let max_peak_t=datetime(2020-03-19 22:00);
iotcentraltelemetry
| extend ingesttime = ingestion_time() 
| where ingesttime between(min_peak_t..max_peak_t)
| evaluate basket()

Конец запросов машинного обучения

Запуск прогнозирования по различным датчикам, таким как температура, влажность и давление

iotcentraltelemetry
| extend ingestionTime = ingestion_time()
| where ingestionTime > ago(5h)
| make-series AvgTempWithDefault=avg(temperature) default=real(null) on ingestionTime from ago(5h) to now() step 1m
| extend NoGapsTemp=series_fill_linear(AvgTempWithDefault)
| project ingestionTime, NoGapsTemp
| render timechart
// What will be the temperature for next 15 minutes?
iotcentraltelemetry
| extend ingestionTime = ingestion_time()
| where ingestionTime > ago(5h)
| make-series AvgTemp=avg(temperature) default=real(null) on ingestionTime from ago(5h) to now()+120m step 1m
| extend NoGapsTemp=series_fill_linear(AvgTemp)
| project ingestionTime, NoGapsTemp
| extend forecast = series_decompose_forecast(NoGapsTemp, 15)
| render timechart with(title='Forecasting for next 15 min by Time Series Decomposition')
// What will be the temperature for next 15 minutes?
iotcentraltelemetry
| extend ingestionTime = ingestion_time()
| where ingestionTime > ago(5h)
| make-series AvgTemp=avg(humidity) default=real(null) on ingestionTime from ago(5h) to now()+15m step 1m
| extend NoGapsTemp=series_fill_linear(AvgTemp)
| project ingestionTime, NoGapsTemp
| extend forecast = series_decompose_forecast(NoGapsTemp, 15)
| render timechart with(title='Forecasting for next 15 min by Time Series Decomposition')
// What will be the temperature for next 15 minutes?
iotcentraltelemetry
| extend ingestionTime = ingestion_time()
| where ingestionTime > ago(5h)
| make-series AvgTemp=avg(pressure) default=real(null) on ingestionTime from ago(5h) to now()+120m step 1m
| extend NoGapsTemp=series_fill_linear(AvgTemp)
| project ingestionTime, NoGapsTemp
| extend forecast = series_decompose_forecast(NoGapsTemp, 15)
| render timechart with(title='Forecasting for next 15 min by Time Series Decomposition')

Вот вывод для всех собранных данных

Удачи и получайте удовольствие, экспериментируя с mxchips и другими устройствами IoT.

Первоначально опубликовано на https://github.com.