Возможность сохранять исторические данные временных рядов в Azure Data Explorer для обработки временных рядов. Также возможность хранить ADLS gen 2 для долгосрочного хранения, а также для машинного обучения или аналитики глубокого обучения.
Azure IoT Central хранит данные только за 30 дней, а для анализа временных рядов на множестве датчиков нам требуется более 30 дней, а обозреватель данных Azure делает это как облачный архиватор. Также имеет возможность масштабирования при необходимости, а производительность потрясающая. Очень подходит для промышленных случаев использования, и забавная часть заключается в том, что агрегация может выполняться при чтении вместо того, чтобы выполнять ETL для агрегации. Экономит массу времени и значительно ускоряет понимание. Также имеет возможность машинного обучения для данных временных рядов и многое другое.
Я запускал это из:
с 18.03.2020 по 31.07.2020 и собрал данные для приведенного ниже руководства.
Сначала настройте mxchips и подключитесь к Azure Iot Central по этой ссылке: https://docs.microsoft.com/en-us/samples/azure-samples/mxchip-iot-devkit-pnp/sample/
Репозиторий Github: https://github.com/Azure-Samples/mxchip-iot-devkit-pnp
После того, как вы передадите данные в Azure IoT Central и проверите их с помощью панели мониторинга.
Пришло время настроить
- Учетная запись хранения Azure
- Концентратор событий Azure
- Обозреватель данных Azure
Создайте учетную запись хранения BLOB-объектов и создайте контейнер с именем iotcentralTelemetry. На момент написания этого большого двоичного объекта доступны параметры Azure IoT Central для экспорта телеметрии, устройств и шаблона устройства.
При экспорте должно быть 3 папки
iotcentraltelemetry\GUID\telemetry iotcentraltelemetry\GUID\devices iotcentraltelemetry\GUID\deviceTemplates
Создайте пространство имен концентратора событий.
Теперь создайте 3 концентратора событий: один для телеметрии, один для устройств и один для телеметрии устройств.
Я назвал это как
- йоцентральная телеметрия
- iotcentraldevices
- iotcentraldevicetemplates
Оставьте раздел по умолчанию 2 и выберите все значения по умолчанию для тестирования.
Теперь создайте новую базу данных, если это необходимо, или используйте существующую базу данных. Наша цель — ввести только данные телеметрии в ADX.
Теперь пришло время создать таблицу, сопоставление таблиц
// Create table command //////////////////////////////////////////////////////////// .create table ['iotcentraltelemetry'] (['humidity']:real, ['temperature']:real, ['pressure']:real, ['magnetometer_x']:int, ['magnetometer_y']:int, ['magnetometer_z']:int, ['gyroscope_x']:int, ['gyroscope_y']:int, ['gyroscope_z']:int, ['accelerometer_x']:int, ['accelerometer_y']:int, ['accelerometer_z']:int, ['EventProcessedUtcTime']:datetime, ['PartitionId']:int, ['EventEnqueuedUtcTime']:datetime)
// Create mapping command //////////////////////////////////////////////////////////// .create table ['iotcentraltelemetry'] ingestion json mapping 'iotcentraltelemetry_mapping' '[{"column":"humidity","path":"$.humidity","datatype":"real"},{"column":"temperature","path":"$.temperature","datatype":"real"},{"column":"pressure","path":"$.pressure","datatype":"real"},{"column":"magnetometer_x","path":"$.magnetometer.x","datatype":"int"},{"column":"magnetometer_y","path":"$.magnetometer.y","datatype":"int"},{"column":"magnetometer_z","path":"$.magnetometer.z","datatype":"int"},{"column":"gyroscope_x","path":"$.gyroscope.x","datatype":"int"},{"column":"gyroscope_y","path":"$.gyroscope.y","datatype":"int"},{"column":"gyroscope_z","path":"$.gyroscope.z","datatype":"int"},{"column":"accelerometer_x","path":"$.accelerometer.x","datatype":"int"},{"column":"accelerometer_y","path":"$.accelerometer.y","datatype":"int"},{"column":"accelerometer_z","path":"$.accelerometer.z","datatype":"int"},{"column":"EventProcessedUtcTime","path":"$.EventProcessedUtcTime","datatype":"datetime"},{"column":"PartitionId","path":"$.PartitionId","datatype":"int"},{"column":"EventEnqueuedUtcTime","path":"$.EventEnqueuedUtcTime","datatype":"datetime"}]'
Имя таблицы: iotcentraltelemetry
Имя сопоставления таблиц: iotcentraltelemetry_mapping
Тип сопоставления JSON
Войдите в центральную панель управления IoT
Перейдите в раздел «Экспорт», а затем настройте 2 параметра.
- Экспорт в хранилище BLOB-объектов
- Экспорт в концентратор событий
Для хранилища BLOB-объектов выберите указанный выше контейнер iotcentraltelemetry в качестве выходных данных. Вы можете выбрать все 3 телеметрии, устройства и шаблоны устройств для вывода или выбор за вами.
Данные сохраняются каждую минуту в виде файла формата JSON.
Для концентратора событий Выберите только телеметрию и выберите iotcentraltelemetry в качестве имени концентратора событий. Пространство имен концентратора событий отображается в раскрывающемся списке.
Нажмите «Создать» и подождите, пока он запустится.
Теперь перейдите в обозреватель данных Azure.
Выберите базу данных, которую мы будем использовать. В левом меню у нас есть опция для приема данных.
Выберите прием данных, создайте новое подключение для приема и выберите концентратор событий в качестве iotcentraltelemetry.
Для имени таблицы ADX используйте: iotcentraltelemetry
Для имени сопоставления: iotcentraltelemetry_mapping
Для использования типа данных: JSON
Подтвердите и создайте соединение и дождитесь загрузки данных.
теперь вы можете запустить запрос для проверки и посмотреть, загружены ли данные
iotcentraltelemetry | limit 200
Давайте сделаем некоторые графики
iotcentraltelemetry
| extend ingesttime = ingestion_time()
| project ingesttime,humidity,temperature,pressure, accelerometer_x,accelerometer_y,accelerometer_z
| summarize avgHumidity=avg(humidity) ,avgPressure=avg(pressure), avgTemperature=avg(temperature) by bin(ingesttime, 1m)
| render timechart
Теперь давайте нанесем на карту только 2 точки датчика
iotcentraltelemetry
| extend ingesttime = ingestion_time()
| project ingesttime,humidity,temperature,pressure, accelerometer_x,accelerometer_y,accelerometer_z
| summarize avgHumidity=avg(humidity) ,avgTemperature=avg(temperature) by bin(ingesttime, 1m)
| render timechart
вышеуказанный запрос агрегируется каждые 1 минуту
Теперь давайте агрегировать данные каждые 15 минут.
iotcentraltelemetry
| extend ingesttime = ingestion_time()
| project ingesttime,humidity,temperature,pressure, accelerometer_x,accelerometer_y,accelerometer_z
| summarize avgHumidity=avg(humidity) ,avgTemperature=avg(temperature) by bin(ingesttime, 15m)
| render timechart
Эти запросы являются образцом для выполнения ETL при чтении, а не для агрегирования и хранения.
Почасовой агрегат
iotcentraltelemetry
| extend ingesttime = ingestion_time()
| project ingesttime,humidity,temperature,pressure, accelerometer_x,accelerometer_y,accelerometer_z
| summarize avgHumidity=avg(humidity) ,avgTemperature=avg(temperature) by bin(ingesttime, 1h)
| render timechart
Среднесуточные значения
iotcentraltelemetry
| extend ingesttime = ingestion_time()
| project ingesttime,humidity,temperature,pressure, accelerometer_x,accelerometer_y,accelerometer_z
| summarize avgHumidity=avg(humidity) ,avgTemperature=avg(temperature) by bin(ingesttime, 1d)
| render timechart
Обнаружение анамолии
let min_t = datetime(2020-03-18);
let max_t = datetime(2020-03-19 22:00);
let dt = 1h;
iotcentraltelemetry
| extend ingesttime = ingestion_time()
| make-series temperature=avg(temperature) on ingesttime from min_t to max_t step dt
| extend (anomalies, score, baseline) = series_decompose_anomalies(temperature, 1.5, -1, 'linefit')
| render anomalychart with(anomalycolumns=anomalies, title='Temp, anomalies')
Прогнозирование временных рядов
let min_t = datetime(2020-03-18);
let max_t = datetime(2020-03-19 22:00);
let dt = 1h;
let horizon=7d;
iotcentraltelemetry
| extend ingesttime = ingestion_time()
| make-series temperature=avg(temperature) on ingesttime from min_t to max_t step dt
| extend forecast = series_decompose_forecast(temperature, toint(horizon/dt))
| render timechart with(title='Temp, forecasting the next week by Time Series Decomposition')
Машинное обучение
Образец кластеризации
let min_t = toscalar(iotcentraltelemetry | extend ingesttime = ingestion_time() | summarize min(ingesttime)); let max_t = toscalar(iotcentraltelemetry | extend ingesttime = ingestion_time() | summarize max(ingesttime)); iotcentraltelemetry | extend ingesttime = ingestion_time() | make-series num=count() on ingesttime from min_t to max_t step 10m | render timechart with(title="Temperature over a week, 10 minutes resolution") let min_t=datetime(2020-03-18); iotcentraltelemetry | extend ingesttime = ingestion_time() | make-series num=count() on ingesttime from min_t to min_t+24h step 1m | render timechart with(title="Zoom on the 2nd spike, 1 minute resolution")
Автокластер
let min_peak_t=datetime(2020-03-18); let max_peak_t=datetime(2020-03-19 22:00); iotcentraltelemetry | extend ingesttime = ingestion_time() | where ingesttime between(min_peak_t..max_peak_t) | evaluate autocluster() let min_peak_t=datetime(2020-03-18); let max_peak_t=datetime(2020-03-19 22:00); iotcentraltelemetry | extend ingesttime = ingestion_time() | where ingesttime between(min_peak_t..max_peak_t) | evaluate basket()
Конец запросов машинного обучения
Запуск прогнозирования по различным датчикам, таким как температура, влажность и давление
iotcentraltelemetry | extend ingestionTime = ingestion_time() | where ingestionTime > ago(5h) | make-series AvgTempWithDefault=avg(temperature) default=real(null) on ingestionTime from ago(5h) to now() step 1m | extend NoGapsTemp=series_fill_linear(AvgTempWithDefault) | project ingestionTime, NoGapsTemp | render timechart // What will be the temperature for next 15 minutes? iotcentraltelemetry | extend ingestionTime = ingestion_time() | where ingestionTime > ago(5h) | make-series AvgTemp=avg(temperature) default=real(null) on ingestionTime from ago(5h) to now()+120m step 1m | extend NoGapsTemp=series_fill_linear(AvgTemp) | project ingestionTime, NoGapsTemp | extend forecast = series_decompose_forecast(NoGapsTemp, 15) | render timechart with(title='Forecasting for next 15 min by Time Series Decomposition') // What will be the temperature for next 15 minutes? iotcentraltelemetry | extend ingestionTime = ingestion_time() | where ingestionTime > ago(5h) | make-series AvgTemp=avg(humidity) default=real(null) on ingestionTime from ago(5h) to now()+15m step 1m | extend NoGapsTemp=series_fill_linear(AvgTemp) | project ingestionTime, NoGapsTemp | extend forecast = series_decompose_forecast(NoGapsTemp, 15) | render timechart with(title='Forecasting for next 15 min by Time Series Decomposition') // What will be the temperature for next 15 minutes? iotcentraltelemetry | extend ingestionTime = ingestion_time() | where ingestionTime > ago(5h) | make-series AvgTemp=avg(pressure) default=real(null) on ingestionTime from ago(5h) to now()+120m step 1m | extend NoGapsTemp=series_fill_linear(AvgTemp) | project ingestionTime, NoGapsTemp | extend forecast = series_decompose_forecast(NoGapsTemp, 15) | render timechart with(title='Forecasting for next 15 min by Time Series Decomposition')
Вот вывод для всех собранных данных
Удачи и получайте удовольствие, экспериментируя с mxchips и другими устройствами IoT.
Первоначально опубликовано на https://github.com.