Kubernetes - наша новая операционная система, в этом больше никто не может сомневаться. Поскольку было приложено много усилий для разработки подхода с использованием микросервисов и миграции рабочих нагрузок на Kubernetes, организации оставили свои службы данных позади.
Из-за COVID-19 мы все видели, насколько важны данные и насколько важно иметь правильную архитектуру и структуру данных. Данные не перестанут расти! более того, он будет продолжать бить рекорды потребления год за годом.
Эта задача заставляет нас предоставлять более автоматическое и масштабируемое решение для нашей организации, перемещая наши службы обработки данных в Kubernetes, где происходит все волшебство. Kubernetes предлагает операторов, которые помогут вам управлять операциями как первого, так и второго дня, используя проверки работоспособности, сохранение состояния, автопилоты и т. Д.
В этой демонстрации я хотел бы показать вам, как можно запускать автоматические конвейеры данных, используя операторы, которые предлагаются в каждой установке Openshift через Operator Hub
. Я решил взять Real-Time BI
в качестве примера использования и построить на его основе эту демонстрацию. Эта демонстрация использует механизмы Openshift для создания масштабируемых конвейеров данных на основе Kubernetes и использует все стандартные продукты де-факто для выполнения этих требований. Все операторы будут развернуты в кластере Openshift 4, а контейнерное хранилище Openshift предоставит базовое решение для хранения как объектных, так и блочных протоколов.
В этой демонстрации развертывается приложение для потоковой передачи музыки, которое генерирует события в зависимости от поведения пользователей (будет объяснено далее).
Используя эти генерируемые данные, мы можем использовать инструменты с открытым исходным кодом для создания наших информационных панелей и визуализаций, а также предоставить нашим заинтересованным сторонам, как нашим специалистам по данным, более надежный способ визуализации важных данных.
ЭТО НАПРЯМУЮ ВЛИЯЕТ НА БИЗНЕС-ЛОГИКУ!
Теперь, когда сообщение ясно, давайте начнем играть!
Предпосылки
- Работающий кластер Ceph (›RHCS4)
- Работающий кластер Openshift 4 (›4.6.8)
- Кластер OCS во внешнем режиме для предоставления как объектного, так и блочного хранилища.
Установка
Создайте новый проект в своем кластере Openshift, в котором должны быть развернуты все ресурсы:
$ oc new-project data-engineering-demo
Установите оба оператора AMQ Streams
и Presto
, поскольку они понадобятся нам для создания соответствующих ресурсов. перейдите в раздел Operator Hub
на левой панели, чтобы установить:
Клонируйте необходимый репозиторий git, чтобы вы могли развернуть демо:
$ git clone https://github.com/shonpaz123/cephdemos.git
Измените свой каталог на демонстрационный каталог, где находятся все манифесты:
$ cd cephdemos/data-engineering-pipeline-demo-ocp
Подготовка информационных сервисов
Подготовка нашей среды S3
Теперь, когда у нас есть все предпосылки, давайте начнем с создания необходимых ресурсов S3. Поскольку мы используем внешний кластер Ceph, мы должны создать необходимого пользователя S3 для взаимодействия с кластером. Кроме того, нам нужно создать корзину S3, чтобы Kafka мог экспортировать наши события в озеро данных. Давайте создадим эти ресурсы:
$ cd 01-ocs-external-ceph && ./run.sh && cd ..
Ожидаемый результат:
{ "user_id": "data-engineering-demo", "display_name": "data-engineering-demo", "email": "", "suspended": 0, "max_buckets": 1000, "subusers": [], "keys": [ { "user": "data-engineering-demo", "access_key": "HC8V2PT7HX8ZFS8NQ37R", "secret_key": "Y6CENKXozDDikJHQgkbLFM38muKBnmWBsAA1DXyU" } . . . } make_bucket: music-chart-songs-store-changelog
Сценарий использует awscli
для экспорта наших учетных данных в качестве переменных среды, чтобы мы могли правильно создать корзину. Убедитесь, что у вас есть доступ к URL-адресу конечной точки со всеми открытыми портами, чтобы этот сценарий работал правильно.
Развертывание Kafka new-ETL
Теперь, когда у нас есть готовый S3, нам нужно развернуть все необходимые ресурсы Kafka. В этом разделе мы развернем кластер Kafka с помощью оператора AMQ Streams
, который предлагается через Openshift Operator Hub
. Кроме того, мы развернем Kafka Topics и Kafka Connect, чтобы экспортировать все существующие тематические события в нашу корзину S3. Важный! убедитесь, что вы изменили URL-адрес конечной точки в соответствии с вашим, иначе Kafka Connect безуспешно попытается раскрыть события.
Запустите сценарий, чтобы создать эти ресурсы:
$ cd 02-kafka && ./run.sh && cd ..
Теперь давайте проверим, что все модули были успешно созданы:
$ oc get pods NAME READY STATUS RESTARTS AGE amq-streams-cluster-operator-v1.6.2-5b688f757-vhqcq 1/1 Running 0 7h35m my-cluster-entity-operator-5dfbdc56bd-75bxj 3/3 Running 0 92s my-cluster-kafka-0 1/1 Running 0 2m10s my-cluster-kafka-1 1/1 Running 0 2m10s my-cluster-kafka-2 1/1 Running 0 2m9s my-cluster-zookeeper-0 1/1 Running 0 2m42s my-connect-cluster-connect-7bdc77f479-vwdbs 1/1 Running 0 71s presto-operator-dbbc6b78f-m6p6l 1/1 Running 0 7h30m
Мы видим, что все модули находятся в рабочем состоянии и прошли проверку, поэтому давайте проверим, что у нас есть необходимые темы:
$ oc get kt NAME CLUSTER PARTITIONS REPLICATION FACTOR connect-cluster-configs my-cluster 1 3 connect-cluster-offsets my-cluster 25 3 connect-cluster-status my-cluster 5 3 consumer-offsets---84e7a678d08f4bd226872e5cdd4eb527fadc1c6a my-cluster 50 3 music-chart-songs-store-changelog my-cluster 1 1 played-songs my-cluster 12 3 songs my-cluster 12 3
Эти темы будут использоваться нашим потоковым приложением для получения, преобразования и экспорта этих событий в правильном формате в нашу корзину S3. В конце концов, тема music-chart-songs-store-changelog
будет содержать всю информацию с ее окончательной структурой, так что мы сможем запросить ее.
Запуск Presto для распределенных запросов
В этой демонстрации мы будем использовать возможность Presto запрашивать префиксы сегментов S3 (аналогично таблицам в реляционных базах данных). Presto необходимо создать схему, чтобы понять, какую файловую структуру он должен запрашивать. В нашем примере все события, экспортируемые в нашу корзину S3, будут выглядеть следующим образом:
{"count":7,"songName":"The Good The Bad And The Ugly"}
Каждый файл будет экспортирован со структурой JSON, содержащей две пары ключ-значение. Подчеркнем, что вы можете думать об этом как о таблице с двумя столбцами, где первый - count
, а второй - songName
, а все файлы, которые записываются в корзину, представляют собой просто строки с этой структурой.
Теперь, когда мы лучше понимаем нашу структуру данных, мы можем развернуть кластер Presto. Этот кластер создаст экземпляр куста для хранения метаданных схемы (с Postgres для хранения информации схемы) и кластер Presto, содержащий модули координатора и рабочих. Все эти ресурсы будут автоматически созданы оператором Presto, который также предлагается как часть Openshift Operator Hub.
Давайте запустим сценарий для создания этих ресурсов:
$ cd 04-presto && ./run.sh && cd ..
Теперь давайте проверим, что все модули были успешно созданы:
$ oc get pods | egrep -e "presto|postgres" NAME READY STATUS RESTARTS AGE hive-metastore-presto-cluster-576b7bb848-7btlw 1/1 Running 0 15s postgres-68d5445b7c-g9qkj 1/1 Running 0 77s presto-coordinator-presto-cluster-8f6cfd6dd-g9p4l 1/2 Running 0 15s presto-operator-dbbc6b78f-m6p6l 1/1 Running 0 7h33m presto-worker-presto-cluster-5b87f7c988-cg9m6 1/1 Running 0 15s
Визуализация данных в реальном времени с помощью Superset
Superset - это инструмент визуализации, который может представлять визуализацию и информационные панели из многих ресурсов JDBC, таких как Presto, Postgres и т. Д. Поскольку у Presto нет реального пользовательского интерфейса, который дает нам возможность исследовать наши данные, управлять разрешениями и RBAC, мы используйте Superset.
Запустите сценарий, чтобы развернуть Superset в вашем кластере:
$ cd 05-superset && ./run.sh && cd ..
Теперь убедитесь, что все поды были успешно созданы:
$ oc get pods | grep superset superset-1-deploy 0/1 Completed 0 72s superset-1-g65xr 1/1 Running 0 67s superset-db-init-6q75s 0/1 Completed 0 71s
Хороший! все прошло хорошо!
Подготовка логики данных
После того, как у нас будут готовы все наши инфраструктурные сервисы, нам нужно создать логику данных, лежащую в основе нашего потокового приложения. Поскольку Presto запрашивает данные из нашего ведра S3, нам необходимо создать схему, которая позволит Presto знать, как он должен запрашивать наши данные, в виде таблицы, чтобы предоставить сведения о структуре.
Войдите в свой Presto Coordinator
узел:
$ oc rsh $(oc get pods | grep coordinator | grep Running | awk '{print $1}')
Измените контекст для работы с каталогом ульев:
$ presto-cli --catalog hive
Создайте схему, которая сообщит Presto, что нужно использовать s3a
коннектор для запроса данных из префикса корзины S3:
$ CREATE SCHEMA hive.songs WITH (location='s3a://music-chart-songs-store-changelog/music-chart-songs-store-changelog.json/');
Измените контекст схемы и создайте таблицу:
$ USE hive.songs; $ CREATE TABLE songs (count int, songName varchar) WITH (format = 'json', external_location = 's3a://music-chart-songs-store-changelog/music-chart-songs-store-changelog.json/');
Обращать внимание! создание таблицы дает Presto фактическое знание структуры каждого файла, как мы видели в предыдущем разделе. Теперь попробуем запросить нашу корзину S3:
$ select * from songs; count | songname -------+---------- (0 rows) Query 20210203_162730_00005_7hsqi, FINISHED, 1 node Splits: 17 total, 17 done (100.00%) 1.01 [0 rows, 0B] [0 rows/s, 0B/s]
У нас нет данных, и все в порядке! мы не начали передавать какие-либо данные, но видим, что мы не получаем ошибок, что означает, что Presto может получить доступ к нашей службе S3.
Потоковая передача событий в реальном времени
Теперь, когда все ресурсы готовы к использованию, мы можем наконец развернуть наше потоковое приложение! Наше потоковое приложение на самом деле является производителем Kafka, который имитирует медиаплеер, у него есть заранее определенный список песен, которые случайным образом «проигрываются» нашим медиаплеером. Каждый раз, когда пользователь воспроизводит песню, событие отправляется в тему Kafka.
Затем мы используем Kafka Streams, чтобы преобразовать данные в желаемую структуру. Потоки будут принимать каждое событие, которое отправляется в Kafka, преобразовывать его и записывать в другую тему, где оно будет автоматически экспортировано в нашу корзину S3.
Запустим развертывание:
$ cd 03-music-chart-app && ./run.sh && cd ..
Давайте проверим, что все модули работают, модуль player-app
- это наш медиаплеер, а модуль music-chart
- это на самом деле модуль, содержащий всю логику Kafka Streams:
$ oc get pods | egrep -e "player|music" music-chart-576857c7f8-7l65x 1/1 Running 0 18s player-app-79fb9cd54f-bhtl5 1/1 Running 0 19s
Давайте посмотрим на player-app
журналы:
$ oc logs player-app-79fb9cd54f-bhtl5 2021-02-03 16:28:41,970 INFO [org.acm.PlaySongsGenerator] (RxComputationThreadPool-1) song 1: The Good The Bad And The Ugly played. 2021-02-03 16:28:46,970 INFO [org.acm.PlaySongsGenerator] (RxComputationThreadPool-1) song 1: The Good The Bad And The Ugly played. 2021-02-03 16:28:51,970 INFO [org.acm.PlaySongsGenerator] (RxComputationThreadPool-1) song 2: Believe played. 2021-02-03 16:28:56,970 INFO [org.acm.PlaySongsGenerator] (RxComputationThreadPool-1) song 3: Still Loving You played. 2021-02-03 16:29:01,972 INFO [org.acm.PlaySongsGenerator] (RxComputationThreadPool-1) song 2: Believe played. 2021-02-03 16:29:06,970 INFO [org.acm.PlaySongsGenerator] (RxComputationThreadPool-1) song 7: Fox On The Run played.
Мы видим, что данные записываются случайным образом, каждый раз, когда воспроизводится песня, в нашу тему Kafka отправляется событие. А теперь давайте взглянем на наши music-chart
журналы:
$ oc logs music-chart-576857c7f8-7l65x [KTABLE-TOSTREAM-0000000006]: 2, PlayedSong [count=1, songName=Believe] [KTABLE-TOSTREAM-0000000006]: 8, PlayedSong [count=1, songName=Perfect] [KTABLE-TOSTREAM-0000000006]: 3, PlayedSong [count=1, songName=Still Loving You] [KTABLE-TOSTREAM-0000000006]: 1, PlayedSong [count=1, songName=The Good The Bad And The Ugly] [KTABLE-TOSTREAM-0000000006]: 6, PlayedSong [count=1, songName=Into The Unknown] [KTABLE-TOSTREAM-0000000006]: 3, PlayedSong [count=2, songName=Still Loving You] [KTABLE-TOSTREAM-0000000006]: 5, PlayedSong [count=1, songName=Sometimes] [KTABLE-TOSTREAM-0000000006]: 2, PlayedSong [count=2, songName=Believe] [KTABLE-TOSTREAM-0000000006]: 1, PlayedSong [count=2, songName=The Good The Bad And The Ugly]
Мы видим, что данные успешно преобразуются, и что число счетчиков увеличивается по мере того, как пользователи воспроизводят больше песен.
Теперь нам нужно убедиться, что наш конвейер работает, поэтому давайте перейдем к нашему сервису S3, чтобы убедиться, что все события экспортируются успешно. для этого я использовал Sree в качестве браузера S3. Убедитесь, что вы используете правильные учетные данные и URL конечной точки:
Вернемся к нашему модулю координатора Presto и снова попробуем запросить наши данные:
$ presto> presto-cli --catalog hive $ presto:songs> USE hive.songs;
Запустите SQL-запрос, чтобы получить наши данные:
$ select * from songs; count | songname -------+------------------------------- 1 | Bohemian Rhapsody 4 | Still Loving You 1 | The Good The Bad And The Ugly 3 | Believe 1 | Perfect 1 | Sometimes 2 | The Good The Bad And The Ugly 2 | Bohemian Rhapsody 3 | Still Loving You 4 | Sometimes 2 | Into The Unknown 4 | Believe 4 | Into The Unknown 2 | Sometimes 5 | Still Loving You 3 | The Good The Bad And The Ugly
Удивительный! Мы видим, что наши данные обновляются автоматически! попробуйте выполнить эту команду еще несколько раз, и вы увидите, что количество строк растет. Теперь, чтобы начать визуализацию наших данных, найдите маршрут Superset, по которому вы сможете войти в консоль:
$ oc get route NAME HOST/PORT PATH SERVICES PORT TERMINATION WILDCARD superset superset-data-engineering-demo.apps.ocp.spaz.local superset 8088-tcp None
Когда мы дойдем до нашей консоли Superset (войдите с помощью admin:admin
), мы увидим, что можем перейти к Manage Databases
- ›Create Database
, чтобы создать наше соединение Preto, убедитесь, что вы указали имя службы Presto ClusterIP, в конце убедитесь, что вы проверили свое соединение :
Теперь, когда у нас появился более удобный способ запроса данных, давайте попробуем немного изучить наши данные. Перейдите к SQL Lab
и убедитесь, что вы можете выполнить наш предыдущий запрос. Чтобы подчеркнуть это, посмотрите следующий запрос, который покажет, сколько раз была проиграна каждая песня:
Хорошо! мы можем запрашивать данные! не стесняйтесь создавать все свои визуализации и информационные панели. В качестве примера я создал информационную панель, которая изменяется в реальном времени, так как каждое обновление панели фактически снова запрашивает все данные из Presto:
Вывод
В этой демонстрации мы увидели, как мы можем использовать продукты с открытым исходным кодом для запуска автоматических конвейеров данных, и все это запланировано на Openshift. Поскольку Kubernetes бьет рекорды по внедрению, организациям следует подумать о переносе своих рабочих нагрузок в Kubernetes, чтобы их сервисы данных не остались позади. Используя Red Hat и партнеров-операторов, Openshift предлагает управление вашими службами данных как на день 1, так и на день 2.
Спасибо, что прочитали этот пост в блоге, увидимся в следующий раз :)