Извлечение ответов из данных с использованием возможностей потоковых конвейеров ETL — полное руководство с использованием реального набора данных

Введение

Потоковая передача данных — горячая тема в современном мире обработки данных. Если вы читаете посты, связанные с данными, на Medium или ищете работу в LinkedIn, возможно, вы как-то наткнулись на эту информацию. В основном это проявляется в виде требований к работе, таких как Kafka, Flink, Spark или других сложных инструментов, ориентированных на приложения для работы с большими данными.

Эти инструменты используются крупными компаниями для расширения возможностей обработки данных. Недавно я столкнулся с реальностью ИТ в бразильском государственном секторе, поэтому я думал о том, как потоковая передача данных может помочь не только компаниям, но и обществу (напрямую).

Как всегда, я думаю, что нет лучшего способа изучить эти инструменты и проверить свои идеи, чем попробовать реальный (или почти такой) проект. В этом посте мы рассмотрим использование ksqlDB (инструмент, связанный с Kafka), чтобы упростить анализ данных о дорожно-транспортных происшествиях, используя реальные данные бразильской федеральной дорожной полиции (Policia Rodoviária Federal, дословный перевод). ).

Мы создадим конвейер данных, вдохновленный архитектурой Medallion, чтобы обеспечить постепенный (и, возможно, в реальном времени) анализ данных об авариях.

Мы стремимся узнать больше о потоковой передаче данных, Kafka и ksqlDB.

Я надеюсь, вам понравится это!

Проблема

Бразильская федеральная дорожная полиция (сокращенно PRF), как следует из названия, — это наша полиция, отвечающая за охрану автомагистралей. Ежегодно собирает и публикует данные о дорожно-транспортных происшествиях (лицензия CC BY-ND 3.0) с информацией о пострадавших (возраст, пол, состояние), погодных условиях, времени и месте, причинах и следствиях.

Я познакомился с этим набором данных на курсе бакалавриата, и я думаю, что очень полезно узнать об ETL, потому что он одновременно очень богат информацией и очень проблематичен в форматировании. В нем много пропущенных значений, несогласованных типов, несогласованных столбцов, нестандартизированных значений, опечаток и т. д.

Теперь предположим, что правительство пытается улучшить политику предотвращения несчастных случаев, но для этого ему необходимо ответить на следующие вопросы:

  • Сколько человек ежемесячно попадает в аварии?
  • Каково ежемесячное количество и процент неповрежденных, легкораненых, тяжелораненых и мертвых?
  • Каков процент людей, вовлеченных в несчастные случаи каждого пола в каждом месяце?
  • Смертность от каждого вида несчастных случаев

К сожалению, они не могут дождаться выпуска годового отчета, поэтому нам необходимо настроить добавочный отчет, который связывается с их внутренней базой данных и обновляет результаты, отображаемые на информационных панелях, как только новые аварии добавляются в систему.

Однако данные, вставленные в систему, не обязательно идеальны и могут иметь те же проблемы, что и опубликованные наборы данных, поэтому нам также необходимо очистить и преобразовать записи, чтобы сделать их полезными в окончательном отчете.

И здесь на помощь приходит ksqlDB.

Отказ от ответственности: Помимо использования реальных данных от правительства Бразилии, описанная ситуация является вымышленной, используемой для изучения ksqlDB на практике с интересной целью.

Apache Kafka и ksqlDB

Apache Kafka — это распределенная платформа для потоковой передачи событий с открытым исходным кодом, короче говоря, инструмент, используемый для отправки и получения сообщений. В Kafka у нас есть структура топиков, содержащих сообщения (буквально просто строка байтов), написанные производителем и прочитанные потребителем.

Это один из наиболее важных инструментов для потоковой передачи данных, особенно в контексте больших данных, поскольку он может легко обрабатывать миллионы сообщений с высокой пропускной способностью. Он используется такими компаниями, как Uber и Netflix, для улучшения своих возможностей обработки данных, позволяя, например, анализировать данные в реальном времени и машинное обучение.

«ksqlDB — это база данных, специально созданная для приложений потоковой обработки поверх Apache Kafka». Это инструмент из экосистемы Kafka, который позволяет нам обрабатывать темы Kafka так же, как традиционные таблицы из реляционных баз данных, и выполнять к ним SQL-подобные запросы.

Хранилище ksqlDB основано на двух основных структурах — потоках и таблицах. Потоки похожи на обычные темы Kafka, неизменяемые коллекции только для добавления, т.е. постоянно растущий список сообщений. Потоки можно использовать для представления исторической последовательности событий, например транзакций в банке.

Таблицы, с другой стороны, представляют собой изменяемые коллекции, которые представляют текущее состояние/моментальный снимок группы. Для этого они используют концепцию первичных ключей. При получении сообщений в таблице будут храниться только последние значения для данного ключа.

Помимо различий, ПОТОКИ и ТАБЛИЦЫ основаны на одной и той же базовой структуре темы Кафки.

Как упоминалось ранее, ksqlDB полностью основан на SQL, и, если вы не делаете что-то особенное, язык программирования не требуется. Из-за этого, если у вас уже есть опыт работы с SQL, вы можете легко перейти от обычной реляционной среды к потоковой среде без особых накладных расходов.

Важно отметить, что аналогичного поведения можно добиться с помощью других инструментов, таких как Apache Spark, или даже с потребителями, написанными вручную, но я думаю, что ksqlDb имеет простой и удобный для начинающих интерфейс.

ksqlDB не является Open Source, она принадлежит Confluent Inc. (узнайте больше о ее лицензии здесь), но это руководство можно пройти с помощью бесплатной автономной версии.

На момент написания статьи автор не имеет никакого отношения к Confluent Inc. (владелец ksqlDB), и высказанные здесь мнения в основном личные.

Реализация

Основная идея этого проекта — использовать ksqlDB для создания потокового ETL-конвейера. Конвейер будет основан на архитектуре Medallion, которая постепенно делит данные на более точные состояния, классифицируемые на бронзовые, серебряные и золотые.

Короче говоря, бронзовый слой хранит необработанные данные, серебряный слой хранит очищенные данные, а золотой слой хранит обогащенные и агрегированные данные. Мы будем использовать MongoDB для долговременного хранения.

Помимо потоков и таблиц, мы также будем использовать коннекторы баз данных для перемещения данных между слоями. Эти коннекторы отвечают за перемещение записей из базы данных (в данном случае MongoDB) в темы Kafka (в процессе, называемом сбором измененных данных) и наоборот.

Исходные данные, использованные в этом посте, можно найти в формате CSV по ссылке. Предварительно я уже преобразовал данные за 2015–2020 годы в файл parquet, чтобы уменьшить его размер и улучшить время чтения, этот файл будет доступен в репозитории GitHub.

Настройка среды

Эта среда проекта основана на файлах докеров, которые можно найти в этом официальном руководстве по ksqlDB.

Что вам нужно:

  • Докер и Докер составляют
  • Коннекторы приемника и источника MongoDB для Kafka — Debezium MongoDB CDC Source Connector и Коннектор MongoDB (приемник).
  • (Необязательно) Python 3.8+ с. Используется для вставки записей в MongoDB.

Загруженные коннекторы необходимо поместить в папку /plugins по тому же пути, что и файл docker-compose.

Затем контейнеры можно запустить обычным способом с помощью команды docker-compose up.

После этого подключитесь к оболочке MongoDB с помощью команды mongo -u mongo -p mongo внутри контейнера и запустите базу данных с помощью rs.initiate().

Если во время выполнения возникают проблемы с коннектором MongoDB, глубокую ориентацию можно найти по ссылке учебника и в справочниках.

Бронзовый слой — извлечение необработанных данных

Бронзовый уровень хранит необработанные данные, извлеченные из транзакционной среды, без каких-либо преобразований или очистки, просто процесс ctrl+c Ctrl+v. В нашем случае этот слой должен извлекать информацию из базы данных, где изначально зарегистрированы аварии.

Для простоты мы создадим записи непосредственно на бронзовом слое.

Этот слой будет представлен коллекцией MongoDB с именем accidents_bronze внутри базы данных accidents.

Чтобы переместить записи из MongoDB в ksqlDB, нам нужно настроить исходный коннектор. Этот коннектор отвечает за просмотр коллекции и потоковую передачу всех обнаруженных изменений (вставок, удалений и обновлений) в форме структурированных сообщений (в AVRO или JSON) в тему Kafka.

Для начала подключитесь к экземпляру сервера ksqlDB через ksqlDB-client с помощью docker exec.

docker exec -it ksqldb-cli ksql http://ksqldb-server:8088

Если все пойдет хорошо, вы должны увидеть на экране большую KSQLDB с сообщением «РАБОТАЕТ». Это клиентский интерфейс ksqlDB, откуда мы взаимодействуем с сервером для определения наших потоков, таблиц, коннекторов и запросов.

Прежде чем продолжить, нам нужно запустить следующую команду

SET 'auto.offset.reset' = 'earliest';

Это гарантирует, что все определенные запросы будут начинаться с самой ранней точки в каждой теме.

Затем создание коннектора — это просто описание некоторых конфигураций.

Команда начинается с предложения CREATE SOURCE CONNECTOR, за которым следует имя соединителя и конфигурации. Предложение WITH указывает используемые конфигурации.

Сначала определяется connector.class. Это сам коннектор, класс Java, реализующий его логику. Мы будем использовать коннектор Debezium MongoDB, который ранее был включен в папку плагинов.

Во-вторых, мы передаем адрес MongoDB (хост + имя) и учетные данные (логин + пароль).

Затем мы определяем, какие коллекции в нашей базе данных будут отслеживаться.

Наконец, параметр transforms задает упрощение сообщений, создаваемых коннектором Debezium, а errors.tolerance определяет поведение коннектора для сообщений, выдающих ошибки (поведение по умолчанию — остановить исполнение).

Создав коннектор, давайте выполним запрос DESCRIBE CONNECTOR, чтобы увидеть его текущий статус. Любые ошибки, возникающие при его выполнении, должны выводиться здесь.

Теперь, когда наш соединитель запущен, он начнет передавать все изменения в коллекции accidents_bronze в тему
replica-set.accidents.accidents_bronze.

ksqlDB не может напрямую обрабатывать темы Kafka, поэтому нам нужно определить STREAM, используя его.

Определение STREAM в ksqlDB почти равносильно созданию таблицы в SQL. Вам нужно передать имя, список столбцов с соответствующими типами и некоторые конфигурации в предложении WITH.

В нашем случае нам нужно настроить, какая тема будет кормить наш поток. Из-за этого имена и типы столбцов должны соответствовать полям в исходных сообщениях темы.

См. команду ниже.

Вам не нужно включать ВСЕ исходные поля, как я сделал выше.

Кратко о том, что мы уже сделали:
1. Мы настроили коннектор исходного кода MongoDB для потоковой передачи изменений в коллекции accidents_bronzeв виде структурированных сообщений
2. . Используя автоматически созданную тему replica-set.accidents.accidents_bronze , мы определили STREAM в ksqlDB с именем ACCIDENTS_BRONZE_STREAM, чтобы разрешить обработку его сообщений.

Теперь, когда поток настроен, можно выполнять над ним запросы SELECT, и именно здесь начинается настоящее волшебство.

Я позволил себе вставить несколько записей в экземпляр mongoDB, так что у нас есть данные для экспериментов.

Например, давайте выберем данные и id каждого сообщения.

В ksqlDB эти обычные операторы SQL называются PULL QUERIES, потому что они возвращают ответ на основе текущего состояния и завершения потока.

Добавляя EMIT CHANGES в конце PULL QUERY, он превращается в PUSH QUERIES. В отличие от своего аналога, он никогда не завершается и всегда вычисляет новые строки на основе поступающих сообщений. Давайте посмотрим, как это работает.

Слева у нас есть PUSH QUERY, а справа простой скрипт Python, вставляющий записи в MongoDB. По мере вставки новых записей они автоматически появляются в ответе на запрос.

Запросы Push и Pull будут нашими строительными блоками для создания всех преобразований, необходимых в следующих разделах.

Серебряный слой — очистка данных

Цель Silver Layer — хранить очищенные данные, которые могут быть легко использованы другими приложениями, такими как проекты машинного обучения и Gold Layer.

Этот уровень будет представлен коллекцией MongoDB с именем accidents_silver внутри базы данных accidents.

Наша главная забота — гарантировать, что данные будут правильно отформатированы, чтобы последующие задачи могли сосредоточиться на решении своих конкретных бизнес-правил.

Для этого нам нужно обрабатывать поступающие сообщения в бронзовом потоке и сохранять их в коллекции accidents_silver. Этот процесс можно разделить на два этапа: «создать поток для очистки сообщений» и «создать коннектор приемника для сохранения сообщений в accidents_silver».

Теперь мы познакомились с истинной мощью ksqlDB — потоковой обработкой.

Можно определитьпоток, используя запрос к другим потокам. Новый поток будет заполнен результатом запроса.

Давайте посмотрим, как это работает.

Например, если нам нужен новый STREAM, содержащий только _id и дату, где он не равен нулю, мы могли бы сделать это:

С помощью этой функции можно создать преобразование (бронзовый_в_серебряный) ПОТОК, который отвечает за выбор и очистку сообщений из бронзового потока.

В нашем примере необходимо очистить поля: sexo (пол), tipo_acidente (тип аварии), ilesos (невредим), feridos_leves (легко ранен), feridos_graves (сильно_ранен), mortos(мертв) и data_inversa (дата).

Заглянув в базу данных (это я сделал за кадром), можно отметить следующие проблемы:

  1. Столбец "гендер" содержит несколько значений, представляющих один и тот же пол: мужской может быть "мужской" или "м", а женский может быть либо "женский' или 'f'.
  2. Тип аварии также содержит несколько значений для «того же типа».
  3. Дата может быть отформатирована одним из следующих способов: 2019–12–20, 20/12/2019 или 20/12/19.
  4. Отсутствующие значения кодируются как NULL, строка «NULL» или строка «(null)».

Помимо этих преобразований, я также (попытаюсь) перевести поля и значения, чтобы облегчить понимание.

Исправление этих проблем реализовано в accidents_bronze_to_silverSTREAM, определенном ниже:

Я не буду подробно объяснять приведенную выше команду SQL, главное — понять, что можно сделать с ksqlDB.

Мы можем построить мощный процесс преобразования потока сообщений, используя (почти) только знания SQL!

Давайте посмотрим, как это работает ниже.

Последний шаг — сохранить данные в MongoDB с помощью Sink Connector.

Для коннектора выше используется коннектор Kafka MongoDB, а остальные конфигурации говорят сами за себя.

accidents_silver создается автоматически, результаты показаны ниже.

Теперь, когда у нас есть точные данные об авариях, пришло время, наконец, ответить на наши вопросы.

Gold Layer — бизнес-правила и агрегаты

Золотой слой содержит специфические для бизнеса правила, ориентированные на решение проблем конкретного проекта.

В нашем проекте будут определены два «золотых слоя», один из которых будет посвящен ответам на ежемесячно агрегированные вопросы, а другой — ответам на уровень смертности в каждом несчастном случае, каждый из которых будет храниться в отдельной коллекции.

С архитектурной точки зрения в этом шаге нет ничего нового. Это похоже на предыдущий шаг, где мы получаем данные из потока, преобразуем их в соответствии с нашими потребностями и сохраняем результаты в базе данных.

Что отличает этот шаг, так это необходимые агрегации.

Чтобы ответить на наши вопросы, нам не нужно хранить каждое отдельное происшествие, а только текущее количество смертей и травм за каждый месяц (пример). Итак, вместо использования ПОТОКОВ мы будем использовать ТАБЛИЦЫ.

К счастью, с точки зрения синтаксиса нет большой разницы между определениями таблицы и потока.

Как упоминалось ранее, таблицы имеют первичный ключ. Для этих примеров нам не нужно явно определять ключи, потому что ksqlDB создает их автоматически со столбцами, используемыми в GROUP BY.

В ksqlDB агрегирование может выполняться только в PUSH-запросах, поэтому в конце запроса требуется «EMIT CHANGES».

Начнем с ежемесячной агрегированной таблицы.

И увидеть таблицу в действии…

По мере вставки новых записей таблица (справа) автоматически обновляет счетчики каждого месяца. Посмотрим внимательно на результаты.

Та же логика применима и к таблице смертности, где мы рассчитываем вероятность смерти в каждом типе несчастного случая.

Снова посмотрите на таблицу в действии…

Наконец, все, что осталось, — это сохранить каждую таблицу в соответствующей коллекции MongoDB.

Этот коннектор приемника имеет несколько различных конфигураций (преобразования и document.id.strategy), используемых для создания поля _id в MongoDB, соответствующий первичному ключу таблицы.

И результаты должны начать появляться в коллекциях.

Заключение

Потоковая обработка уже стала реальностью во многих компаниях, особенно среди крупных игроков. Среди используемых технологий Apache Kafka является ведущим решением для потоковой передачи сообщений между приложениями и базами данных с огромной экосистемой вспомогательных инструментов для обработки интенсивных задач, связанных с данными. ksqlDB является одним из них.

В этом посте мы узнали о ksqlDB с помощью практического проекта с использованием реального набора данных.

В своих проектах я всегда стараюсь смешивать изученные инструменты и концепции с интересными темами из реальности. По этой причине я выбрал настоящий набор данных из Открытых государственных данных Бразилии с данными о дорожно-транспортных происшествиях.

Чтобы обеспечить постепенный (и, возможно, в реальном времени) анализ данных, я предложил использовать архитектуру Medallion для преобразования необработанных неформатированных данных в ответы на вопросы. Основная цель этой архитектуры состояла в том, чтобы позволить исследовать различные концепции ksqlDB.

Мы узнали об основных единицах хранения ksqlSB (потоках и таблицах), push- и pull-запросах и, самое главное, о том, как этот инструмент может помочь нам решить (настоящую?) проблему инженерии данных.

Как всегда, я не являюсь экспертом ни в одной из обсуждаемых тем, и я настоятельно рекомендую читать дальше, см. ссылки ниже.

Спасибо за чтение ;)

Рекомендации

Весь код доступен в этом репозитории GitHub.

[1] Архитектура медальона — Глоссарий Databricks
[2] Что такое архитектура домика у озера в виде медальона? — Microsoft Learn
[3] Конвейер Streaming ETL — официальная документация ksqlDB
[4] Streams and Tables — Учебное пособие по Confluent ksqlDB
[5] Использование Apache Kafka в Netflix Studio and Finance World — Блог Confluent
[6] Коннектор приемника MongoDB Kafka — Официальная документация Mongo
[7] Коннектор источника и приемника MongoDB — Confluent Hub
[8] Sink Постпроцессоры коннектора, настройте идентификатор документа в коннекторе приемника — Официальная документация Mongo
[9] Presto® на Apache Kafka® в масштабе Uber— Блог Uber

Все изображения в этом сообщении сделаны автором, если не указано иное.