Сбор данных датчиков IoT с помощью сериализованного двоичного формата Google Protocol Buffers по HTTPS, бессерверных облачных функций Google, Google Cloud Pub / Sub и MongoDB Atlas на GCP в качестве альтернативы интегрированным платформам Cloud IoT и стандартным протоколам IoT. Объединяйте, анализируйте и создавайте модели машинного обучения на основе данных с помощью таких инструментов, как MongoDB Compass, Jupyter Notebooks и Google's AI Platform Notebooks.

Вступление

Большинство доминирующих облачных провайдеров предлагают интегрированные услуги IoT (Интернет вещей) и IIotT (Industrial IoT). У Amazon есть AWS IoT, у Microsoft Azure есть несколько предложений, включая IoT Central, предложение IBM, включая IBM Watson IoT Platform, у Alibaba Cloud есть несколько решений IoT / IIoT для различных вертикальных рынков, а Google предлагает Google Cloud IoT " Платформа. Все эти решения продаются как высокопроизводительные и масштабируемые технологические стеки промышленного уровня. Они могут масштабироваться до десятков тысяч или более устройств IoT и огромного количества потоковой телеметрии.

На самом деле не всем нужно полностью интегрированное IoT-решение. Академические учреждения, исследовательские лаборатории, технологические стартапы и многие коммерческие предприятия хотят использовать облако для приложений IoT, но могут быть не готовы к полностью интегрированной платформе IoT или сопротивляться привязке к платформе поставщика облака.

Точно так же, в зависимости от требований к производительности и типа приложения, организации могут не нуждаться или захотят начать использовать стандартные отраслевые протоколы данных и транспортных протоколов IoT / IIOT, такие как MQTT (передача телеметрии очереди сообщений) или CoAP (приложение с ограничениями). Протокол), по UDP (протокол дейтаграмм пользователя). Они могут предпочесть передавать телеметрию через HTTP с использованием TCP или безопасно, используя HTTPS (HTTP через TLS).

Демонстрация

В этой демонстрации мы будем собирать данные датчиков окружающей среды от ряда датчиков устройств IoT и передавать эти телеметрические данные через Интернет в Google Cloud. Каждое устройство IoT устанавливается в отдельном физическом месте. Устройства содержат множество стандартных датчиков, в том числе влажности и температуры, движения и интенсивности света.

Мы будем передавать данные телеметрии сенсора в виде JSON через HTTP на бессерверные HTTPS-конечные точки Google Cloud Function. Затем мы переключимся на использование буферов протокола Google для передачи двоичных данных по HTTP. Мы должны наблюдать уменьшение размера сообщения, содержащегося в полезной нагрузке запроса, по мере перехода от JSON к Protobuf, что должно уменьшить задержку системы и стоимость.

Данные, полученные Cloud Functions по HTTP, будут публиковаться асинхронно в Google Cloud Pub / Sub. Вторая облачная функция будет реагировать на все опубликованные события и отправлять сообщения в MongoDB Atlas на GCP. Оказавшись в Atlas, мы будем агрегировать, преобразовывать, анализировать и строить модели машинного обучения на основе данных, используя такие инструменты, как MongoDB Compass, Jupyter Notebooks и Google AI Platform Notebooks.

Для этой демонстрации архитектура JSON over HTTP будет выглядеть следующим образом. Все датчики будут передавать данные в одну конечную точку HTTPS Cloud Function.

Для Protobuf через HTTP в демонстрации архитектура будет выглядеть следующим образом. Каждый тип датчика будет передавать данные на разные конечные точки HTTPS облачной функции.

Хотя облачные функции автоматически масштабируются по горизонтали, чтобы приспособиться к дополнительной нагрузке, создаваемой объемом получаемой телеметрии, существуют также другие варианты масштабирования системы. Например, мы могли бы создать отдельные конвейеры функций и темы / подписки для каждого типа датчика. Мы также могли бы разделить данные телеметрии по нескольким коллекциям атласа MongoDB в зависимости от типа датчика вместо одной коллекции. Во всех случаях мы по-прежнему получим выгоду от возможностей горизонтального масштабирования облачной функции.

Исходный код

Весь исходный код доступен на GitHub. Используйте следующую команду для клонирования проекта.

git clone \
  --branch master --single-branch --depth 1 --no-tags \
  https://github.com/garystafford/iot-protobuf-demo.git

Вам нужно будет настроить переменные среды проекта, чтобы они соответствовали вашей собственной среде разработки и облачной среде. Весь исходный код этого поста написан на Python. Он предназначен для интерпретаторов Python 3, но был протестирован с использованием интерпретаторов Python 2. Блокноты Jupyter проекта можно просматривать в рамках проекта на GitHub или с помощью бесплатного онлайн-просмотра Jupyter nbviewer.

Технологии

Буферы протокола

Согласно Google, Protocol Buffers (aka Protobuf) - это не зависящий от языка и платформы, эффективный, расширяемый, автоматизированный механизм для сериализации структурированных данных для использования в протоколах связи, хранения данных и т. Д. Буферы протокола в 3-10 раз меньше и в 20-100 раз быстрее, чем XML.

Каждое сообщение буфера протокола - это небольшая логическая запись информации, содержащая серию строго типизированных пар имя-значение. После того, как вы определили свои сообщения, вы запускаете компилятор буфера протокола для языка вашего приложения в вашем .proto файле, чтобы сгенерировать классы доступа к данным.

Облачные функции Google

Согласно Google, Cloud Functions - это бессерверная вычислительная платформа Google, управляемая событиями. Ключевые особенности облачных функций включают автоматическое масштабирование, высокую доступность, отказоустойчивость,
отсутствие серверов для предоставления, управления, исправления или обновления,
оплата производится только во время работы вашего кода, и они легко подключаются и расширяют другие облачные сервисы. Облачные функции изначально поддерживают несколько типов событий, включая HTTP, Cloud Pub / Sub, Cloud Storage и Firebase. Текущая языковая поддержка включает Python, Go и Node.

Google Cloud Pub / Sub

Согласно Google, Cloud Pub / Sub - это межплатформенное программное обеспечение для облака, ориентированное на работу с сообщениями. Это масштабируемая и надежная система приема и доставки событий. Предоставляя асинхронный обмен сообщениями "многие ко многим", который разделяет отправителей и получателей, он обеспечивает безопасную и высокодоступную связь между независимыми приложениями. Cloud Pub / Sub обеспечивает надежный обмен сообщениями с малой задержкой, который интегрируется с системами, размещенными на платформе Google Cloud Platform и за ее пределами.

Атлас MongoDB

MongoDB Atlas - это полностью управляемая MongoDB-as-a-Service, доступная на AWS, Azure и GCP. Atlas, зрелый продукт SaaS, предлагает высокую доступность, соглашения об уровне обслуживания, эластичную масштабируемость, межрегиональную репликацию, безопасность корпоративного уровня, интеграцию LDAP, коннектор BI и многое другое.

В настоящее время MongoDB Atlas предлагает четыре тарифных плана: бесплатный, базовый, профессиональный и корпоративный. Планы варьируются от самого маленького бесплатного кластера MongoDB размером M0 с общей оперативной памятью и хранилищем 512 МБ до массивного кластера M400 MongoDB с 488 ГБ ОЗУ и 3 ТБ хранилища.

Экономическая эффективность облачных функций

При истинном масштабе IIoT облачные функции Google могут быть не самым эффективным или рентабельным методом приема данных телеметрии. Согласно модели ценообразования Google, вы получаете два миллиона бесплатных вызовов функций в месяц, причем каждый дополнительный миллион вызовов стоит 0,40 доллара США. Общая стоимость также включает использование памяти, общее время вычислений и исходящую передачу данных. Если ваша система состоит из десятков или сотен устройств IoT, облачные функции могут оказаться рентабельными.

Однако с тысячами или более устройств, каждое из которых передает данные несколько раз в минуту, вы можете быстро превзойти рентабельность функций Google. В этом случае вы можете обратиться к платформе Google Cloud IoT. В качестве альтернативы вы можете создать свою собственную платформу с продуктами Google, такими как Knative, что позволит вам запускать свои контейнеры либо полностью управляемыми с помощью недавно выпущенного Cloud Run, либо в кластере Google Kubernetes Engine с Cloud Run на GKE.

Сценарии датчиков

Для каждого типа датчика я разработал отдельные сценарии Python, которые запускаются на каждом устройстве IoT. Существует две версии каждого скрипта: одна для JSON через HTTP и одна для Protobuf через HTTP.

JSON через HTTPS

Ниже мы видим скрипт dht_sensor_http_json.py, используемый для передачи данных о влажности и температуре через JSON через HTTP в облачную функцию Google, работающую на GCP. Полезные данные запроса JSON содержат метку времени, идентификатор устройства IoT, тип устройства и показания датчиков температуры и влажности. URL-адрес для функции Google Cloud хранится как переменная среды, локальная для устройств IoT и устанавливается при развертывании скрипта.

import json
import logging
import os
import socket
import sys
import time
import Adafruit_DHT
import requests
URL = os.environ.get('GCF_URL')
JWT = os.environ.get('JWT')
SENSOR = Adafruit_DHT.DHT22
TYPE = 'DHT22'
PIN = 18
FREQUENCY = 15

def main():
    if not URL or not JWT:
        sys.exit("Are the Environment Variables set?")
    get_sensor_data(socket.gethostname())

def get_sensor_data(device_id):
    while True:
        humidity, temperature = Adafruit_DHT.read_retry(SENSOR, PIN)
        payload = {'device': device_id,
                   'type': TYPE,
                   'timestamp': time.time(),
                   'data': {'temperature': temperature,
                            'humidity': humidity}}
        post_data(payload)
        time.sleep(FREQUENCY)

def post_data(payload):
    payload = json.dumps(payload)
    headers = {
        'Content-Type': 'application/json; charset=utf-8',
        'Authorization': JWT
    }
    try:
        requests.post(URL, json=payload, headers=headers)
    except requests.exceptions.ConnectionError:
        logging.error('Error posting data to Cloud Function!')
    except requests.exceptions.MissingSchema:
        logging.error('Error posting data to Cloud Function! Are Environment Variables set?')

if __name__ == '__main__':
    sys.exit(main())

Частота телеметрии

Хотя датчики способны производить данные много раз в минуту, для этой демонстрации телеметрия датчиков намеренно ограничена передачей только каждые 15 секунд. На мой взгляд, чтобы уменьшить сложность системы, потенциальную задержку, обратное давление и стоимость, вам следует производить данные телеметрии только с той частотой, которую диктуют ваши требования.

Веб-токены JSON

В целях безопасности в дополнение к конечным точкам HTTPS, предоставляемым Google Cloud Functions, я включил использование JSON Web Token (JWT). Веб-токены JSON - это открытый отраслевой стандарт RFC 7519 для безопасного представления претензий между двумя сторонами. В этом случае JWT используется для проверки подлинности сценариев датчиков, отправляющих телеметрию в облачные функции. JWT содержит идентификатор, пароль и срок действия, все зашифрованные секретным ключом, известным каждой облачной функции, для проверки личности устройства IoT. Без правильной передачи JWT в заголовке авторизации запрос к облачной функции завершится ошибкой с кодом состояния HTTP 401 Unauthorized. Ниже приведен пример данных полезной нагрузки JWT.

{
  "sub": "IoT Protobuf Serverless Demo",
  "id": "iot-demo-key",
  "password": "t7J2gaQHCFcxMD6584XEpXyzWhZwRrNJ",
  "iat": 1557407124,
  "exp": 1564664724
}

Для этой демонстрации я создал временный JWT с помощью jwt.io. Функции HTTP используют PyJWT, библиотеку Python, которая позволяет кодировать и декодировать JWT. Библиотека PyJWT позволяет функции декодировать и проверять JWT (токен носителя) из заголовка авторизации входящего запроса. Токен JWT хранится как переменная среды. Инструкции по развертыванию включены в проект GitHub.

Полезная нагрузка JSON

Ниже приведена типичная полезная нагрузка запроса JSON (красиво напечатанная), содержащая данные датчика DHT. Это конкретное сообщение имеет размер 148 байт. Формат сообщения намеренно удобен для чтения. Конечно, мы могли бы сократить ключевые поля сообщения, чтобы уменьшить размер полезной нагрузки еще на 15–20 байт.

{
  "device": "rp829c7e0e",
  "type": "DHT22",
  "timestamp": 1557585090.476025,
  "data": {
    "temperature": 17.100000381469727,
    "humidity": 68.0999984741211
  }
}

Буферы протокола

Для демонстрации я создал файл протокольных буферов sensors.proto для поддержки вывода данных тремя типами датчиков: цифровым датчиком влажности и температуры (DHT), пассивным инфракрасным датчиком (PIR) и цифровым датчиком интенсивности света (DLI). Я использую более новую proto3 версию языка буферов протокола. Я создал общую схему сообщения датчика Protobuf с переменной телеметрией датчика, хранящейся во вложенном объекте dataobject внутри каждого типа сообщения.

Буферы протокола в 3-10 раз меньше и в 20-100 раз быстрее, чем XML.

Важно использовать правильный Protobuf Scalar Value Type, чтобы поддерживать числовую точность на языке, для которого вы компилируете. Для простоты я использую double для представления метки времени, а также числовых значений влажности и температуры. В качестве альтернативы вы можете выбрать Google Protobuf WellKnownTypes, Timestamp, чтобы сохранить временную метку.

syntax = "proto3";
package sensors;
// DHT22
message SensorDHT {
    string device = 1;
    string type = 2;
    double timestamp = 3;
    DataDHT data = 4;
}
message DataDHT {
    double temperature = 1;
    double humidity = 2;
}
// Onyehn_PIR
message SensorPIR {
    string device = 1;
    string type = 2;
    double timestamp = 3;
    DataPIR data = 4;
}
message DataPIR {
    bool motion = 1;
}
// Anmbest_MD46N
message SensorDLI {
    string device = 1;
    string type = 2;
    double timestamp = 3;
    DataDLI data = 4;
}
message DataDLI {
    bool light = 1;
}

Поскольку данные датчика будут захвачены с помощью сценариев, написанных на Python 3, файл протокольных буферов компилируется для Python, в результате чего получается файл sensors_pb2.py.

protoc --python_out=. sensors.proto

Буферы протокола через HTTPS

Ниже мы видим альтернативный сценарий датчика DHT, dht_sensor_http_pb.py, который передает полезные данные двоичного запроса на основе протокольных буферов через HTTPS в облачную функцию Google, работающую на GCP. Обратите внимание, что заголовок Content-Type запроса был изменен с application/json на application/x-protobuf. В этом случае вместо JSON те же поля данных хранятся в экземпляре типа сообщения SensorDHT (sensors_pb2.SensorDHT()) Protobuf. Обратите внимание на выражение import sensors_pb2. Этот оператор импортирует скомпилированный файл буферов протокола, который хранится локально в скрипте на устройстве IoT.

import logging
import os
import socket
import sys
import time
import Adafruit_DHT
import requests
import sensors_pb2
URL = os.environ.get('GCF_DHT_URL')
JWT = os.environ.get('JWT')
SENSOR = Adafruit_DHT.DHT22
TYPE = 'DHT22'
PIN = 18
FREQUENCY = 15

def main():
    if not URL or not JWT:
        sys.exit("Are the Environment Variables set?")
    get_sensor_data(socket.gethostname())

def get_sensor_data(device_id):
    while True:
        try:
            humidity, temperature = Adafruit_DHT.read_retry(SENSOR, PIN)
            sensor_dht = sensors_pb2.SensorDHT()
            sensor_dht.device = device_id
            sensor_dht.type = TYPE
            sensor_dht.timestamp = time.time()
            sensor_dht.data.temperature = temperature
            sensor_dht.data.humidity = humidity
            payload = sensor_dht.SerializeToString()
            post_data(payload)
            time.sleep(FREQUENCY)
        except TypeError:
            logging.error('Error getting sensor data!')

def post_data(payload):
    headers = {
        'Content-Type': 'application/x-protobuf',
        'Authorization': JWT
    }
    try:
        requests.post(URL, data=payload, headers=headers)
    except requests.exceptions.ConnectionError:
        logging.error('Error posting data to Cloud Function!')
    except requests.exceptions.MissingSchema:
        logging.error('Error posting data to Cloud Function! Are Environment Variables set?')

if __name__ == '__main__':
    sys.exit(main())

Двоичная полезная нагрузка Protobuf

Чтобы понять полезную нагрузку на основе буферов двоичного протокола, мы можем записать образец сообщения SensorDHT в файл на диске в виде массива байтов.

message = sensorDHT.SerializeToString()
binary_file_output = open("./data_binary.txt", "wb")
file_byte_array = bytearray(message)
binary_file_output.write(file_byte_array)

Затем, используя команду hexdump, мы можем просмотреть представление файла двоичных данных.

> hexdump -C data_binary.txt
00000000  0a 08 38 32 39 63 37 65  30 65 12 05 44 48 54 32  |..829c7e0e..DHT2|
00000010  32 1d 05 a0 b9 4e 22 0a  0d ec 51 b2 41 15 cd cc  |2....N"...Q.A...|
00000020  38 42                                             |8B|
00000022

Размер файла двоичных данных составляет 48 байтов на диске по сравнению с эквивалентным размером файла JSON 148 байтов на диске (32% размера). Затем в качестве теста мы могли бы отправить этот файл двоичных данных в качестве полезной нагрузки POST в облачную функцию, как показано ниже, с помощью Postman. Почтальон преобразует содержимое файла двоичных данных в двоичную строку перед передачей.

Точно так же мы можем сериализовать то же самое сообщение SensorDHT на основе двоичных протокольных буферов в двоичную строку, используя метод SerializeToString.

message = sensorDHT.SerializeToString()
print(message)

Результирующая двоичная строка выглядит следующим образом.

b'\n\nrp829c7e0e\x12\x05DHT22\x19c\xee\xbcg\xf5\x8e\xccA"\x12\t\x00\x00\x00\xa0\x99\x191@\x11\x00\x00\x00`f\x06Q@'

Длина двоичной строки сериализованного сообщения и, следовательно, полезная нагрузка запроса, отправленная почтальоном и полученная облачной функцией для этого конкретного сообщения, составляет 111 байтов по сравнению с размером полезной нагрузки JSON в 148 байтов (75% размера).

Проверить полезную нагрузку Protobuf

Чтобы проверить, что данные, содержащиеся в полезной нагрузке Protobuf, идентичны полезной нагрузке JSON, мы можем проанализировать полезную нагрузку из сериализованной двоичной строки с помощью метода Protobuf ParseFromString. Затем мы конвертируем его в JSON с помощью метода Protobuf MessageToJson.

message = sensorDHT.SerializeToString() 
message_parsed = sensors_pb2.SensorDHT()
message_parsed.ParseFromString(message)
print(MessageToJson(message_parsed))

Результирующий объект JSON идентичен полезной нагрузке JSON, отправленной с использованием JSON через HTTPS, ранее в демонстрации.

{
  "device": "rp829c7e0e",
  "type": "DHT22",
  "timestamp": 1557585090.476025,
  "data": {
    "temperature": 17.100000381469727,
    "humidity": 68.0999984741211
  }
}

Облачные функции Google

Существует ряд облачных функций Google, в частности четыре HTTP-функции, которые принимают данные датчиков по HTTP от устройств IoT. Каждая функция предоставляет конечную точку HTTPS. Согласно Google, вы используете HTTP-функции, когда хотите вызвать свою функцию через HTTP (S) запрос. Чтобы обеспечить семантику HTTP, сигнатуры функций HTTP принимают аргументы, специфичные для HTTP.

Ниже я развернул одну функцию, которая принимает телеметрию датчиков JSON от всех типов датчиков, и три функции для Protobuf, по одной для каждого типа датчика: DHT, PIR и DLI.

Обработка сообщений JSON

Ниже мы видим облачную функцию main.py, которая обрабатывает входящие полезные данные JSON через HTTPS от всех типов датчиков. После проверки JWT запроса полезная нагрузка сообщения JSON сериализуется в байтовую строку и отправляется в общую тему Google Cloud Pub / Sub. Обратите внимание, что секретный ключ, идентификатор и пароль JWT, а также Google Cloud Pub / Sub Topic хранятся как переменные среды, локальные для облачных функций. В моих тестах для успешного выполнения HTTP-функций на основе JSON требовалось в среднем 9–18 мс.

import logging
import os
import jwt
from flask import make_response, jsonify
from flask_api import status
from google.cloud import pubsub_v1
TOPIC = os.environ.get('TOPIC')
SECRET_KEY = os.getenv('SECRET_KEY')
ID = os.getenv('ID')
PASSWORD = os.getenv('PASSWORD')

def incoming_message(request):
    if not validate_token(request):
        return make_response(jsonify({'success': False}),
                             status.HTTP_401_UNAUTHORIZED,
                             {'ContentType': 'application/json'})
    request_json = request.get_json()
    if not request_json:
        return make_response(jsonify({'success': False}),
                             status.HTTP_400_BAD_REQUEST,
                             {'ContentType': 'application/json'})
    send_message(request_json)
    return make_response(jsonify({'success': True}),
                         status.HTTP_201_CREATED,
                         {'ContentType': 'application/json'})

def validate_token(request):
    auth_header = request.headers.get('Authorization')
    if not auth_header:
        return False
    auth_token = auth_header.split(" ")[1]
    if not auth_token:
        return False
    try:
        payload = jwt.decode(auth_token, SECRET_KEY)
        if payload['id'] == ID and payload['password'] == PASSWORD:
            return True
    except jwt.ExpiredSignatureError:
        return False
    except jwt.InvalidTokenError:
        return False

def send_message(message):
    publisher = pubsub_v1.PublisherClient()
    publisher.publish(topic=TOPIC, 
                      data=bytes(str(message), 'utf-8'))

Облачные функции развертываются в GCP с помощью команды gcloud functions deploy CLI (я использую Jenkins для автоматизации развертывания). Я заключил команды развертывания в сценарии bash. Сценарий также копирует YAML-файл общих переменных среды, используемый облачной функцией. У каждой функции есть сценарий развертывания, включенный в проект.

# get latest env vars file
cp -f ./../env_vars_file/env.yaml .
# deploy function
gcloud functions deploy http_json_to_pubsub \
  --runtime python37 \
  --trigger-http \
  --region us-central1 \
  --memory 256 \
  --entry-point incoming_message \
  --env-vars-file env.yaml

Используя .gcloudignore файл, команда gcloud functions deploy CLI развертывает три файла: облачную функцию (main.py), требуемый файл пакетов Python (requirements.txt), файл переменных среды (env.yaml). Google автоматически устанавливает зависимости с помощью файла requirements.txt.

Обработка сообщений Protobuf

Ниже мы видим облачную функцию main.py, которая обрабатывает входящие данные Protobuf по HTTPS от типов датчиков DHT. Как только полезная нагрузка сообщения Protobuf с данными датчика получена функцией HTTP, она десериализуется в JSON, а затем сериализуется в байтовую строку. Затем байтовая строка отправляется в Google Cloud Pub / Sub Topic. В моих тестах для успешного выполнения HTTP-функций на основе Protobuf в среднем требовалось 7–14 мс.

Как и прежде, обратите внимание на инструкцию import sensors_pb2. Этот оператор импортирует скомпилированный файл буферов протокола, который хранится локально в скрипте на устройстве IoT. Он используется для синтаксического анализа сериализованного сообщения в исходный SensorDHT тип сообщения Protobuf.

import logging
import os
import jwt
import sensors_pb2
from flask import make_response, jsonify
from flask_api import status
from google.cloud import pubsub_v1
from google.protobuf.json_format import MessageToJson
TOPIC = os.environ.get('TOPIC')
SECRET_KEY = os.getenv('SECRET_KEY')
ID = os.getenv('ID')
PASSWORD = os.getenv('PASSWORD')

def incoming_message(request):
    if not validate_token(request):
        return make_response(jsonify({'success': False}),
                             status.HTTP_401_UNAUTHORIZED,
                             {'ContentType': 'application/json'})
    data = request.get_data()
    if not data:
        return make_response(jsonify({'success': False}),
                             status.HTTP_400_BAD_REQUEST,
                             {'ContentType': 'application/json'})
    sensor_pb = sensors_pb2.SensorDHT()
    sensor_pb.ParseFromString(data)
    sensor_json = MessageToJson(sensor_pb)
    send_message(sensor_json)
    return make_response(jsonify({'success': True}),
                         status.HTTP_201_CREATED,
                         {'ContentType': 'application/json'})

def validate_token(request):
    auth_header = request.headers.get('Authorization')
    if not auth_header:
        return False
    auth_token = auth_header.split(" ")[1]
    if not auth_token:
        return False
    try:
        payload = jwt.decode(auth_token, SECRET_KEY)
        if payload['id'] == ID and payload['password'] == PASSWORD:
            return True
    except jwt.ExpiredSignatureError:
        return False
    except jwt.InvalidTokenError:
        return False

def send_message(message):
    publisher = pubsub_v1.PublisherClient()
    publisher.publish(topic=TOPIC, data=bytes(message, 'utf-8'))

Cloud Pub / Sub Функции

Помимо функций HTTP, демонстрация использует функцию, запускаемую триггерами Google Cloud Pub / Sub. Согласно Google, облачные функции могут запускаться сообщениями, опубликованными в Cloud Pub / Sub Topics в том же проекте GCP, что и функция. Функция автоматически подписывается на тему. Ниже мы видим, что функция автоматически подписалась на iot-data-demo Cloud Pub / Sub Topic.

Отправка телеметрии в MongoDB Atlas

Общая облачная функция, запускаемая сообщениями, опубликованными в Cloud Pub / Sub, затем отправляет сообщения в MongoDB Atlas. Чтобы переформатировать сообщения Cloud Pub / Sub в BSON (двоичный JSON), требуется минимальная очистка. Интересно, что согласно bsonspec.org, BSON можно сравнить с двоичными форматами обмена, такими как Protocol Buffers. BSON в большей степени меньше схемы, чем буферы протокола, что может дать ему преимущество в гибкости, но также и небольшой недостаток в эффективности использования пространства (BSON имеет служебные данные для имен полей в сериализованных данных).

Функция использует PyMongo для подключения к MongoDB Atlas. Согласно их веб-сайту, PyMongo - это дистрибутив Python, содержащий инструменты для работы с MongoDB, и это рекомендуемый способ работы с MongoDB из Python.

import base64
import json
import logging
import os
import pymongo
MONGODB_CONN = os.environ.get('MONGODB_CONN')
MONGODB_DB = os.environ.get('MONGODB_DB')
MONGODB_COL = os.environ.get('MONGODB_COL')

def read_message(event, context):
    message = base64.b64decode(event['data']).decode('utf-8')
    message = message.replace("'", '"')
    message = message.replace('True', 'true')
    message = json.loads(message)
    client = pymongo.MongoClient(MONGODB_CONN)
    db = client[MONGODB_DB]
    col = db[MONGODB_COL]
    col.insert_one(message)

Функция реагирует на опубликованные события и отправляет сообщения в кластер MongoDB Atlas, работающий в том же регионе, us-central1, что и облачные функции и тема Pub / Sub. Ниже мы видим текущие параметры, доступные при инициализации кластера Atlas.

MongoDB Atlas предоставляет богатый веб-интерфейс для управления и мониторинга кластеров, баз данных, коллекций, безопасности и производительности MongoDB.

Хотя время выполнения функций Cloud Pub / Sub в Atlas больше по продолжительности, чем у функций HTTP, задержка значительно сокращается за счет размещения кластера Cloud Pub / Sub для Atlas, облачных функций и кластера MongoDB Atlas в одном и том же регионе GCP. Время выполнения между областями достигало 500–600 мс, в то время как время выполнения в одной области в среднем составляло 200–225 мс. Выбор более производительного кластера Atlas, вероятно, приведет к еще меньшему времени выполнения функции.

Агрегирование данных с помощью MongoDB Compass

MongoDB Compass - бесплатное удобное настольное приложение для взаимодействия с вашими базами данных MongoDB. Вы можете просматривать собранные данные датчиков, просматривать схему сообщения (документа), управлять индексами и строить сложные агрегации MongoDB.

При выполнении аналитики или машинного обучения я в основном использую MongoDB Compass для предварительного просмотра захваченных данных телеметрии и построения конвейеров агрегации. Операции агрегирования обрабатывают записи данных и возвращают вычисленные результаты. Эта функция экономит массу времени, фильтруя и готовя данные для дальнейшего анализа, визуализации и машинного обучения с Jupyter Notebooks.

Конвейеры агрегации могут быть напрямую экспортированы в Java, Node, C # и Python 3. Экспортированный код конвейера агрегации может быть помещен непосредственно в ваши приложения Python и Блокноты Jupyter.

Ниже экспортированный код конвейеров агрегации из MongoDB Compass используется для загрузки набора результатов непосредственно в Pandas DataFrame. Эта конкретная агрегация возвращает данные датчика DHT временного ряда с определенного устройства IoT за 72-часовой период.

DEVICE_1 = 'rp59adf374'
pipeline = [
    {
        '$match': {
            'type': 'DHT22', 
            'device': DEVICE_1, 
            'timestamp': {
                '$gt': 1557619200,
                '$lt': 1557792000
            }
        }
    }, {
        '$project': {
            '_id': 0,
            'timestamp': 1, 
            'temperature': '$data.temperature', 
            'humidity': '$data.humidity'
        }
    }, {
        '$sort': {
            'timestamp': 1
        }
    }
]
aggResult = iot_data.aggregate(pipeline)
df1 = pd.DataFrame(list(aggResult))

Производительность MongoDB Atlas

В этой демонстрации из Jupyter Notebooks на основе Python3 я смог последовательно запросить коллекцию MongoDB Atlas из почти 70 тыс. Документов для получения наборов результатов, содержащих цифровые данные о температуре и влажности за 3 дня (72 часа), примерно 10,2 тыс. Документов в среднем. 825 мс. Это путь от моего локального ноутбука для разработки до MongoDB Atlas, работающего на GCP, в другом географическом регионе.

Время запросов на GCP намного быстрее, например, при запуске Notebook в JupyterLab на AI Platform Google или при выполнении задания PySpark с Cloud Dataproc против Atlas. При запуске того же Jupyter Notebook непосредственно на платформе Google AI для того же запроса MongoDB Atlas потребовалось в среднем 450 мс против 825 мс (в 1,83 раза быстрее). Это было в двух разных регионах GCP; время того же региона должно быть еще быстрее.

Наблюдаемость GCP

Существует несколько вариантов наблюдения за функциями Google Cloud в системе, Google Cloud Pub / Sub и MongoDB Atlas. Как показано выше, интерфейс GCP Cloud Functions позволяет вам видеть выполнение отдельных функций, время выполнения, использование памяти и активные экземпляры в различных временных интервалах.

Чтобы получить более подробное представление о Google Cloud Functions и Google Cloud Pub / Sub, я построил две пользовательские панели мониторинга с помощью Stackdriver. Согласно Google, Stackdriver собирает метрики, журналы и события из инфраструктуры, предоставляя разработчикам и операторам богатый набор наблюдаемых сигналов. Я создал настраиваемую панель инструментов Stackdriver Cloud Functions (показано ниже) и панель управления темами и подписками Cloud Pub / Sub.

Что касается функций, я решил отображать время выполнения, использование памяти, количество выполнений и выход из сети - все на одной стеклянной панели с использованием четырех графиков. Ниже я использую средний 95-й процентиль для мониторинга. 95-й процентиль утверждает, что в 95% случаев наблюдаемые значения ниже этого количества, а в оставшихся 5% случаев наблюдаемые значения выше этого количества.

Анализ данных с использованием Jupyter Notebooks

Согласно jupyter.org, Jupyter Notebook - это веб-приложение с открытым исходным кодом, которое позволяет создавать и обмениваться документами, содержащими живой код, уравнения, визуализации и повествовательный текст. Использование включает очистку и преобразование данных, численное моделирование, статистическое моделирование, визуализацию данных, машинное обучение и многое другое. Широкое использование Jupyter Notebooks значительно выросло, поскольку большие данные, искусственный интеллект и машинное обучение пережили взрывной рост.

PyCharm

JetBrains PyCharm, моя любимая среда разработки Python, имеет прямую интеграцию с Jupyter Notebooks. Фактически, последние обновления PyCharm для Professional Edition значительно улучшили эту интеграцию. PyCharm предлагает двустороннее редактирование в среде IDE и в интерфейсе веб-браузера Jupyter Notebook. PyCharm позволяет запускать и отлаживать отдельные ячейки в записной книжке. PyCharm автоматически запускает Jupyter Server и соответствующее ядро ​​для открытого вами Notebook. И одна из моих любимых функций, средство просмотра переменных PyCharm автоматически отслеживает текущее значение переменной.

Ниже мы видим пример Analytics Notebook, включенный в демонстрационный проект, отображаемый в PyCharm 19.1.2 (Professional Edition). Для эффективной работы с записными книжками в PyCharm действительно требуется полноразмерный монитор. Работать на ноутбуке с переполненным пользовательским интерфейсом Notebook в PyCharm можно, но, конечно, не так эффективно, как на большом мониторе.

Сервер ноутбуков Jupyter

Ниже мы видим тот же блокнот Analytics, показанный выше в PyCharm, открытый в клиентском веб-интерфейсе Jupyter Notebook Server и работающий локально на рабочей станции разработчика. Интерфейс на основе веб-браузера также предлагает богатый набор функций для разработки Notebook.

Из Notebook мы можем запрашивать данные из MongoDB Atlas, снова используя PyMongo, и загружать наборы результатов в Panda DataFrames. В качестве альтернативы жестко запрограммированным значениям и переменным среды в Notebooks я использую пакет Python python-dotenv. Этот пакет позволяет мне помещать переменные среды в общий файл .env и ссылаться на них из любого ноутбука. В пакете есть много возможностей для управления переменными окружения.

Затем мы можем проанализировать данные, используя ряд общих фреймворков, включая Pandas, Matplotlib, SciPy, PySpark и NumPy, и это лишь некоторые из них. Ниже мы видим данные временных рядов от четырех разных датчиков на одном устройстве IoT. Совместно просматривая данные, мы можем изучить причинное влияние одной переменной окружающей среды на другую, например влияние света на температуру или влажность.

Ниже мы можем использовать гистограммы для визуализации частотных
температурных интервалов во времени для данного местоположения устройства.

Машинное обучение с использованием Jupyter Notebooks

Помимо аналитики данных, мы можем использовать Jupyter Notebooks с такими инструментами, как scikit-learn, для построения моделей машинного обучения на основе телеметрии наших датчиков. Scikit-learn - это набор инструментов машинного обучения на Python, построенный на NumPy, SciPy и matplotlib. Ниже я использовал JupyterLab на платформе AI Google и scikit-learn, чтобы построить несколько моделей на основе данных датчиков.

Используя scikit-learn, мы можем создавать модели для прогнозирования таких вещей, как то, какое устройство IoT генерирует определенные значения температуры и влажности, или температуру и влажность, с учетом времени суток, местоположения устройства и переменных внешней среды, или обнаруживать аномалии в сенсорная телеметрия.

Scikit-learn позволяет легко создавать рандомизированные наборы данных для обучения и тестирования, создавать модели, используя данные с нескольких устройств IoT, как показано ниже.

В проект входит блокнот Jupyter, демонстрирующий, как построить несколько моделей машинного обучения с использованием данных датчиков. Примеры алгоритмов контролируемого обучения, используемых для построения моделей классификации в этой демонстрации, включают машину опорных векторов (SVM), k -ближайшие соседи (k -NN), и Классификатор случайных лесов.

Имея данные от нескольких датчиков, мы можем обогатить модели машинного обучения, добавив дополнительные категориальные (дискретные) функции к нашим обучающим данным. Например, мы могли бы изучить влияние света, движения и времени суток на температуру и влажность.

Заключение

Надеюсь, этот пост продемонстрировал, как эффективно собирать данные телеметрии с устройств IoT с помощью буферов протокола Google по HTTPS, бессерверных облачных функций Google, Cloud Pub / Sub и MongoDB Atlas, и все это на облачной платформе Google. После захвата телеметрические данные были легко агрегированы и проанализированы с помощью обычных инструментов, таких как MongoDB Compass и Jupyter Notebooks. Кроме того, мы использовали данные и инструменты для создания моделей машинного обучения для прогнозирования и обнаружения аномалий.

Все мнения, выраженные в этом посте, являются моими собственными и не обязательно являются взглядами моих нынешних или прошлых работодателей или их клиентов.

Изображение: все возможно © 123RF.com