Примечание редактора. Марк Нидхэм будет спикером ODSC Europe в июне этого года. Обязательно ознакомьтесь с его докладом «Создание приложения для аналитики в реальном времени для службы доставки пиццы» здесь!

Gartner определяет аналитику в реальном времени следующим образом:

Аналитика в реальном времени — это дисциплина, которая применяет логику и математику к данным, чтобы предоставить полезную информацию для быстрого принятия лучших решений.

То, что я выделил жирным шрифтом, на мой взгляд, является наиболее важной частью определения.

Чтобы понять, что это значит, мы должны начать с представления о мире с точки зрения событий, где событие — это то, что происходит. Это может быть кто-то, кто что-то заказывает, что-то отправляется, фактически любое действие, которое произошло в прошлом.

И мы возьмем эти события, осознаем их и поймем их. Мы хотим знать, что произошло, получить некоторое представление об этом событии, а затем принять меры на основе этого события прямо сейчас!

Тот факт, что мы хотим действовать немедленно, отличает аналитику в реальном времени от более пакетной/исторической аналитики.

Аналитика в реальном времени (RTA) стала настолько распространенной в последние годы, что вы можете даже не знать, что использовали приложение, которое ее реализует. Одним из самых популярных применений RTA является функция LinkedIn «Кто просматривал ваш профиль», которая показывает вам количество людей, которые недавно просматривали ваш профиль, а также их имена.

Они также сбивают с толку некоторых зрителей, пытаясь убедить вас подписаться на LinkedIn Premium. Но кроме того, причина показа пользователям этих данных в режиме реального времени заключается в том, чтобы люди совершали действия, чтобы отправлять сообщения друг другу. Может быть, у человека, просматривающего ваш профиль, есть интересная работа или возможность — вам просто нужно это узнать!

Это и другие приложения RTA создаются с использованием некоторой комбинации инструментов, образующих стек RTA.

Стек RTA

Стек RTA описывает ряд инструментов и процессов, которые вы можете использовать для получения информации из неограниченных данных и создания приложений, таких как LinkedIn, которые просматривали ваш профиль. Диаграмма, показывающая стек, приведена ниже:

Давайте быстро пройдемся по каждому из компонентов:

  • Производители событий — обнаруживают изменения состояния в исходных системах и инициируют события, которые могут использоваться и обрабатываться нижестоящими приложениями.
  • Платформа потоковой передачи — действует как источник достоверных данных о событиях и поэтому должна обрабатывать большие объемы и параллелизм данных, производимых и потребляемых. Надежно сохраняет события, чтобы последующие компоненты могли их обрабатывать.
  • Потоковый процессор — считывает события с платформы потоковых данных, а затем выполняет какие-либо действия с этим событием.
  • Уровень обслуживания — основная точка доступа для использования аналитики в реальном времени, созданной на платформе потоковой передачи событий.
  • Внешний интерфейс — то, с чем взаимодействуют конечные пользователи.

Создание службы доставки пиццы

Итак, это теория, но как применить ее на практике? Я думаю, всегда интересно создать сквозной пример, поэтому мы рассмотрим, как онлайн-сервис пиццы может использовать аналитику в реальном времени для управления своим бизнесом.

Предположим, что служба пиццы уже собирает заказы в Apache Kafka, а также ведет учет своих клиентов и продуктов, которые они продают, в MySQL.

Нас пригласили в качестве консультантов, чтобы помочь им получить представление о том, что происходит в бизнесе в режиме реального времени. Первое требование — создать панель управления операциями, которая показывает количество заказов, общий доход и среднюю стоимость заказа за последние несколько минут. Мы также должны указать, в каком направлении это имеет тенденцию.

Мы можем решить эту проблему, используя только данные в теме заказов. Давайте посмотрим на содержание этой темы, используя инструмент командной строки kcat:

kcat -C -b localhost:29092 -t orders -c1 | jq
{
  "id": "80ad8992-dc34-463a-a5c9-e4cf8d79fa13",
  "createdAt": "2023-05-22T14:25:56.565681",
  "userId": 2783,
  "price": 1241,
  "items": [
    {
      "productId": "56",
      "quantity": 3,
      "price": 155
    },
    {
      "productId": "36",
      "quantity": 3,
      "price": 60
    },
    {
      "productId": "50",
      "quantity": 4,
      "price": 149
    }
  ],
  "deliveryLat": "12.95916643",
  "deliveryLon": "77.54959169"
}

Каждое событие содержит идентификатор события, идентификатор пользователя, время размещения заказа, а затем массив элементов заказа. Похоже, все работает хорошо, поэтому давайте посмотрим, как принимать эти события в Apache Pinot.

Apache Pinot — это база данных OLAP в реальном времени, созданная в LinkedIn для предоставления масштабируемой аналитики в реальном времени с малой задержкой. Он может получать из источников пакетных данных (таких как Hadoop HDFS, Amazon S3 и Google Cloud Storage), а также из источников потоковых данных (таких как Apache Kafka и Redpanda).

Pinot хранит данные в таблицах, каждая из которых должна сначала определить схему. Схема для нашей таблицы определена ниже:

{
   "schemaName": "orders",
    "dimensionFieldSpecs": [
      {"name": "id", "dataType": "STRING"},
      {"name": "userId", "dataType": "INT"},
      {"name": "deliveryLat", "dataType": "DOUBLE"},
      {"name": "deliveryLon", "dataType": "DOUBLE"},
      {"name": "items", "dataType": "JSON"}
    ],
    "metricFieldSpecs": [
      {"name": "productsOrdered", "dataType": "INT"},
      {"name": "totalQuantity", "dataType": "INT"},
      {"name": "price", "dataType": "DOUBLE"}
    ],
    "dateTimeFieldSpecs": [
      {
        "name": "ts",
        "dataType": "TIMESTAMP",
        "format": "1:MILLISECONDS:EPOCH",
        "granularity": "1:MILLISECONDS"
      }
    ]
}

Pinot автоматически сопоставит поля из источника данных со столбцами в схеме, если они имеют одинаковое имя. Это относится к идентификатору, идентификатору пользователя, широте доставки, продолжительности доставки, товарам и цене. Остальные поля будут заполнены функцией преобразования, описанной в конфигурации таблицы:

{
    "tableName": "orders",
    "tableType": "REALTIME",
    "segmentsConfig": {
        "timeColumnName": "ts",
        "timeType": "MILLISECONDS",
        "retentionTimeUnit": "DAYS",
        "retentionTimeValue": "1",
        "schemaName": "orders",
        "replicasPerPartition": "1"
    },
    "tenants": {},
    "tableIndexConfig": {
        "loadMode": "MMAP",
        "streamConfigs": {
            "streamType": "kafka",
            "stream.kafka.consumer.type": "lowLevel",
            "stream.kafka.topic.name": "orders",
            "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
            "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory", 
            "stream.kafka.broker.list": "kafka:9092",
            "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
        }
    },
    "ingestionConfig": {
        "transformConfigs": [
            {
                "columnName": "ts",
                "transformFunction": "FromDateTime(createdAt, 'yyyy-MM-dd''T''HH:mm:ss.SSSSSS')"
            },
            {
                "columnName": "totalQuantity",
                "transformFunction": "JSONPATHLONG(items, '$.sum($.[*].quantity)')"
            },
            {
                "columnName": "productsOrdered",
                "transformFunction": "JSONPATHARRAY(items, '$.length()')"
            }
        ]
    }
}

Функции преобразования определяются в файле ingestionConfig.transformConfigs. Эти функции заполняют поле на основе значений, которые существуют где-либо еще в исходных данных.

Эта таблица имеет тип таблицы REALTIME, что означает, что она будет принимать данные с платформы потоковых данных. Конфигурация для подключения к Kafka определяется в tableIndexConfig.streamConfigs.

Затем мы можем создать таблицу и схему, выполнив следующую команду:

docker run \
  -v $PWD/pinot/config:/config \
  --network pizza-shop \
  apachepinot/pinot:0.12.0-arm64 \
  AddTable \
  -schemaFile /config/orders/schema.json \
  -tableConfigFile /config/orders/table.json \
  -controllerHost pinot-controller \
  -exec

Это займет несколько секунд, и как только это будет сделано, мы перейдем к http://localhost:9000, щелкните Консоль запросов, а затем щелкните таблицу Заказы. Мы должны увидеть что-то вроде этого:

Наши заказы успешно поступают в Пино. Затем мы могли бы написать запрос, чтобы увидеть, сколько заказов было сделано за последнюю 1 минуту и ​​за 1 минуту до этого:

select 
  count(*) FILTER(WHERE  ts > ago('PT1M')) AS events1Min,
  count(*) FILTER(WHERE  ts <= ago('PT1M') AND ts > ago('PT2M')) 
  AS events1Min2Min,
  sum(price) FILTER(WHERE  ts > ago('PT1M')) AS total1Min,
  sum(price) FILTER(WHERE  ts <= ago('PT1M') AND ts > ago('PT2M')) 
  AS total1Min2Min
from orders 
where ts > ago('PT2M');

Похоже, у нас немного больше событий за последние 2 минуты, но в них немного.

Пользовательский интерфейс Pinot хорош для выполнения специальных запросов, но в конечном итоге мы захотим закодировать их во внешнем приложении. Мы будем использовать Streamlit, библиотеку Python с открытым исходным кодом, которая позволяет разработчикам с легкостью создавать интерактивные, удобные веб-приложения для машинного обучения и обработки данных. Это упрощает процесс создания визуализаций данных и инструментов аналитики, не требуя обширных знаний в области веб-разработки.

Визуализация наших данных показана ниже:

Соответствующий код показан ниже:

import streamlit as st
import pandas as pd
from pinotdb import connect
from datetime import datetime
import os
pinot_host=os.environ.get("PINOT_SERVER", "pinot-broker")
pinot_port=os.environ.get("PINOT_PORT", 8099)
conn = connect(pinot_host, pinot_port)
st.set_page_config(layout="wide")
st.title("All About That Dough Dashboard 🍕")
now = datetime.now()
dt_string = now.strftime("%d %B %Y %H:%M:%S")
st.write(f"Last update: {dt_string}")
curs = conn.cursor()
query = """
select count(*) FILTER(WHERE ts > ago('PT1M')) AS events1Min,
     count(*) FILTER(WHERE ts <= ago('PT1M') AND ts > ago('PT2M')) AS events1Min2Min,
     sum(price) FILTER(WHERE ts > ago('PT1M')) AS total1Min,
     sum(price) FILTER(WHERE ts <= ago('PT1M') AND ts > ago('PT2M')) AS total1Min2Min
from orders 
where ts > ago('PT2M')
limit 1
"""
curs.execute(query)
df = pd.DataFrame(curs, columns=[item[0] for item in curs.description])
st.subheader("Orders in the last minute")
metric1, metric2, metric3 = st.columns(3)
metric1.metric(
     label="# of Orders",
     value="{:,}".format(int(df['events1Min'].values[0])),
     delta="{:,}".format(int(df['events1Min'].values[0] - df['events1Min2Min'].values[0])) 
          if df['events1Min2Min'].values[0] > 0 else None
)
metric2.metric(
     label="Revenue in ₹",
     value="{:,.2f}".format(df['total1Min'].values[0]),
     delta="{:,.2f}".format(df['total1Min'].values[0] - df['total1Min2Min'].values[0]) 
          if df['total1Min2Min'].values[0] > 0 else None
)
average_order_value_1min = df['total1Min'].values[0] / int(df['events1Min'].values[0])
average_order_value_1min_2min = (df['total1Min2Min'].values[0] / int(df['events1Min2Min'].values[0])
     if int(df['events1Min2Min'].values[0]) > 0
     else 0)
metric3.metric(
     label="Average order value in ₹",
     value="{:,.2f}".format(average_order_value_1min),
     delta="{:,.2f}".format(average_order_value_1min - average_order_value_1min_2min) 
          if average_order_value_1min_2min > 0 else None
)

Краткое содержание

Это только первая часть приложения. Мы сделаем еще несколько итераций приложения, используя Debezium для переноса данных из MySQL в Kafka и RisngWave для объединения потоков. Мы также обновим нашу панель инструментов Streamlit, чтобы просматривать самые продаваемые продукты и категории.

Если это пробудило ваше любопытство, я буду рад видеть вас на конференции ODSC Europe, где я проведу семинар по созданию остальной части этого приложения!

Об авторе/спикере ODSC Europe:

Марк Нидхэм (Mark Needham) — защитник Apache Pinot и инженер по связям с разработчиками в StarTree. Как инженер по связям с разработчиками, Марк помогает пользователям научиться использовать Apache Pinot для создания аналитических приложений, ориентированных на пользователя, в реальном времени. Он также помогает разработчикам, упрощая начало работы, внося изменения в продукт и улучшая документацию. Марк пишет о своем опыте работы с Pinot на сайте markhneedham.com. Он пишет в Твиттере @markhneedham.

Первоначально опубликовано на OpenDataScience.com

Читайте другие статьи по науке о данных на OpenDataScience.com, включая учебные пособия и руководства от начального до продвинутого уровня! Подпишитесь на нашу еженедельную рассылку здесь и получайте последние новости каждый четверг. Вы также можете пройти обучение по науке о данных по запросу, где бы вы ни находились, с нашей платформой Ai+ Training. Подпишитесь также на нашу быстрорастущую публикацию на Medium, ODSC Journal, и узнайте, как стать писателем.