Kubernetes - наша новая операционная система, в этом больше никто не может сомневаться. Поскольку было приложено много усилий для разработки подхода с использованием микросервисов и миграции рабочих нагрузок на Kubernetes, организации оставили свои службы данных позади.

Из-за COVID-19 мы все видели, насколько важны данные и насколько важно иметь правильную архитектуру и структуру данных. Данные не перестанут расти! более того, он будет продолжать бить рекорды потребления год за годом.

Эта задача заставляет нас предоставлять более автоматическое и масштабируемое решение для нашей организации, перемещая наши службы обработки данных в Kubernetes, где происходит все волшебство. Kubernetes предлагает операторов, которые помогут вам управлять операциями как первого, так и второго дня, используя проверки работоспособности, сохранение состояния, автопилоты и т. Д.

В этой демонстрации я хотел бы показать вам, как можно запускать автоматические конвейеры данных, используя операторы, которые предлагаются в каждой установке Openshift через Operator Hub. Я решил взять Real-Time BI в качестве примера использования и построить на его основе эту демонстрацию. Эта демонстрация использует механизмы Openshift для создания масштабируемых конвейеров данных на основе Kubernetes и использует все стандартные продукты де-факто для выполнения этих требований. Все операторы будут развернуты в кластере Openshift 4, а контейнерное хранилище Openshift предоставит базовое решение для хранения как объектных, так и блочных протоколов.

В этой демонстрации развертывается приложение для потоковой передачи музыки, которое генерирует события в зависимости от поведения пользователей (будет объяснено далее).

Используя эти генерируемые данные, мы можем использовать инструменты с открытым исходным кодом для создания наших информационных панелей и визуализаций, а также предоставить нашим заинтересованным сторонам, как нашим специалистам по данным, более надежный способ визуализации важных данных.

ЭТО НАПРЯМУЮ ВЛИЯЕТ НА БИЗНЕС-ЛОГИКУ!

Теперь, когда сообщение ясно, давайте начнем играть!

Предпосылки

  • Работающий кластер Ceph (›RHCS4)
  • Работающий кластер Openshift 4 (›4.6.8)
  • Кластер OCS во внешнем режиме для предоставления как объектного, так и блочного хранилища.

Установка

Создайте новый проект в своем кластере Openshift, в котором должны быть развернуты все ресурсы:

$ oc new-project data-engineering-demo

Установите оба оператора AMQ Streams и Presto, поскольку они понадобятся нам для создания соответствующих ресурсов. перейдите в раздел Operator Hub на левой панели, чтобы установить:

Клонируйте необходимый репозиторий git, чтобы вы могли развернуть демо:

$ git clone https://github.com/shonpaz123/cephdemos.git

Измените свой каталог на демонстрационный каталог, где находятся все манифесты:

$ cd cephdemos/data-engineering-pipeline-demo-ocp

Подготовка информационных сервисов

Подготовка нашей среды S3

Теперь, когда у нас есть все предпосылки, давайте начнем с создания необходимых ресурсов S3. Поскольку мы используем внешний кластер Ceph, мы должны создать необходимого пользователя S3 для взаимодействия с кластером. Кроме того, нам нужно создать корзину S3, чтобы Kafka мог экспортировать наши события в озеро данных. Давайте создадим эти ресурсы:

$ cd 01-ocs-external-ceph && ./run.sh && cd ..

Ожидаемый результат:

{
    "user_id": "data-engineering-demo",
    "display_name": "data-engineering-demo",
    "email": "",
    "suspended": 0,
    "max_buckets": 1000,
    "subusers": [],
    "keys": [
        {
            "user": "data-engineering-demo",
            "access_key": "HC8V2PT7HX8ZFS8NQ37R",
            "secret_key": "Y6CENKXozDDikJHQgkbLFM38muKBnmWBsAA1DXyU"
        }
    .
    .
    .
}
make_bucket: music-chart-songs-store-changelog

Сценарий использует awscli для экспорта наших учетных данных в качестве переменных среды, чтобы мы могли правильно создать корзину. Убедитесь, что у вас есть доступ к URL-адресу конечной точки со всеми открытыми портами, чтобы этот сценарий работал правильно.

Развертывание Kafka new-ETL

Теперь, когда у нас есть готовый S3, нам нужно развернуть все необходимые ресурсы Kafka. В этом разделе мы развернем кластер Kafka с помощью оператора AMQ Streams, который предлагается через Openshift Operator Hub. Кроме того, мы развернем Kafka Topics и Kafka Connect, чтобы экспортировать все существующие тематические события в нашу корзину S3. Важный! убедитесь, что вы изменили URL-адрес конечной точки в соответствии с вашим, иначе Kafka Connect безуспешно попытается раскрыть события.

Запустите сценарий, чтобы создать эти ресурсы:

$ cd 02-kafka && ./run.sh && cd ..

Теперь давайте проверим, что все модули были успешно созданы:

$ oc get pods 
NAME                                                  READY   STATUS    RESTARTS   AGE
amq-streams-cluster-operator-v1.6.2-5b688f757-vhqcq   1/1     Running   0          7h35m
my-cluster-entity-operator-5dfbdc56bd-75bxj           3/3     Running   0          92s
my-cluster-kafka-0                                    1/1     Running   0          2m10s
my-cluster-kafka-1                                    1/1     Running   0          2m10s
my-cluster-kafka-2                                    1/1     Running   0          2m9s
my-cluster-zookeeper-0                                1/1     Running   0          2m42s
my-connect-cluster-connect-7bdc77f479-vwdbs           1/1     Running   0          71s
presto-operator-dbbc6b78f-m6p6l                       1/1     Running   0          7h30m

Мы видим, что все модули находятся в рабочем состоянии и прошли проверку, поэтому давайте проверим, что у нас есть необходимые темы:

$ oc get kt
NAME                                                          CLUSTER      PARTITIONS   REPLICATION FACTOR
connect-cluster-configs                                       my-cluster   1            3
connect-cluster-offsets                                       my-cluster   25           3
connect-cluster-status                                        my-cluster   5            3
consumer-offsets---84e7a678d08f4bd226872e5cdd4eb527fadc1c6a   my-cluster   50           3
music-chart-songs-store-changelog                             my-cluster   1            1
played-songs                                                  my-cluster   12           3
songs                                                         my-cluster   12           3

Эти темы будут использоваться нашим потоковым приложением для получения, преобразования и экспорта этих событий в правильном формате в нашу корзину S3. В конце концов, тема music-chart-songs-store-changelog будет содержать всю информацию с ее окончательной структурой, так что мы сможем запросить ее.

Запуск Presto для распределенных запросов

В этой демонстрации мы будем использовать возможность Presto запрашивать префиксы сегментов S3 (аналогично таблицам в реляционных базах данных). Presto необходимо создать схему, чтобы понять, какую файловую структуру он должен запрашивать. В нашем примере все события, экспортируемые в нашу корзину S3, будут выглядеть следующим образом:

{"count":7,"songName":"The Good The Bad And The Ugly"}

Каждый файл будет экспортирован со структурой JSON, содержащей две пары ключ-значение. Подчеркнем, что вы можете думать об этом как о таблице с двумя столбцами, где первый - count, а второй - songName, а все файлы, которые записываются в корзину, представляют собой просто строки с этой структурой.

Теперь, когда мы лучше понимаем нашу структуру данных, мы можем развернуть кластер Presto. Этот кластер создаст экземпляр куста для хранения метаданных схемы (с Postgres для хранения информации схемы) и кластер Presto, содержащий модули координатора и рабочих. Все эти ресурсы будут автоматически созданы оператором Presto, который также предлагается как часть Openshift Operator Hub.

Давайте запустим сценарий для создания этих ресурсов:

$ cd 04-presto && ./run.sh && cd ..

Теперь давайте проверим, что все модули были успешно созданы:

$ oc get pods | egrep -e "presto|postgres"
NAME                                                  READY   STATUS    RESTARTS   AGE
hive-metastore-presto-cluster-576b7bb848-7btlw        1/1     Running   0          15s
postgres-68d5445b7c-g9qkj                             1/1     Running   0          77s
presto-coordinator-presto-cluster-8f6cfd6dd-g9p4l     1/2     Running   0          15s
presto-operator-dbbc6b78f-m6p6l                       1/1     Running   0          7h33m
presto-worker-presto-cluster-5b87f7c988-cg9m6         1/1     Running   0          15s

Визуализация данных в реальном времени с помощью Superset

Superset - это инструмент визуализации, который может представлять визуализацию и информационные панели из многих ресурсов JDBC, таких как Presto, Postgres и т. Д. Поскольку у Presto нет реального пользовательского интерфейса, который дает нам возможность исследовать наши данные, управлять разрешениями и RBAC, мы используйте Superset.

Запустите сценарий, чтобы развернуть Superset в вашем кластере:

$ cd 05-superset && ./run.sh && cd ..

Теперь убедитесь, что все поды были успешно созданы:

$ oc get pods | grep superset
superset-1-deploy                                     0/1     Completed   0          72s
superset-1-g65xr                                      1/1     Running     0          67s
superset-db-init-6q75s                                0/1     Completed   0          71s

Хороший! все прошло хорошо!

Подготовка логики данных

После того, как у нас будут готовы все наши инфраструктурные сервисы, нам нужно создать логику данных, лежащую в основе нашего потокового приложения. Поскольку Presto запрашивает данные из нашего ведра S3, нам необходимо создать схему, которая позволит Presto знать, как он должен запрашивать наши данные, в виде таблицы, чтобы предоставить сведения о структуре.

Войдите в свой Presto Coordinator узел:

$ oc rsh $(oc get pods | grep coordinator | grep Running | awk '{print $1}')

Измените контекст для работы с каталогом ульев:

$ presto-cli --catalog hive

Создайте схему, которая сообщит Presto, что нужно использовать s3a коннектор для запроса данных из префикса корзины S3:

$ CREATE SCHEMA hive.songs WITH (location='s3a://music-chart-songs-store-changelog/music-chart-songs-store-changelog.json/');

Измените контекст схемы и создайте таблицу:

$ USE hive.songs;
$ CREATE TABLE songs (count int, songName varchar) WITH (format = 'json', external_location = 's3a://music-chart-songs-store-changelog/music-chart-songs-store-changelog.json/');

Обращать внимание! создание таблицы дает Presto фактическое знание структуры каждого файла, как мы видели в предыдущем разделе. Теперь попробуем запросить нашу корзину S3:

$ select * from songs;
 count | songname 
-------+----------
(0 rows)
Query 20210203_162730_00005_7hsqi, FINISHED, 1 node
Splits: 17 total, 17 done (100.00%)
1.01 [0 rows, 0B] [0 rows/s, 0B/s]

У нас нет данных, и все в порядке! мы не начали передавать какие-либо данные, но видим, что мы не получаем ошибок, что означает, что Presto может получить доступ к нашей службе S3.

Потоковая передача событий в реальном времени

Теперь, когда все ресурсы готовы к использованию, мы можем наконец развернуть наше потоковое приложение! Наше потоковое приложение на самом деле является производителем Kafka, который имитирует медиаплеер, у него есть заранее определенный список песен, которые случайным образом «проигрываются» нашим медиаплеером. Каждый раз, когда пользователь воспроизводит песню, событие отправляется в тему Kafka.

Затем мы используем Kafka Streams, чтобы преобразовать данные в желаемую структуру. Потоки будут принимать каждое событие, которое отправляется в Kafka, преобразовывать его и записывать в другую тему, где оно будет автоматически экспортировано в нашу корзину S3.

Запустим развертывание:

$ cd 03-music-chart-app && ./run.sh && cd ..

Давайте проверим, что все модули работают, модуль player-app - это наш медиаплеер, а модуль music-chart - это на самом деле модуль, содержащий всю логику Kafka Streams:

$ oc get pods | egrep -e "player|music"
music-chart-576857c7f8-7l65x                          1/1     Running     0          18s
player-app-79fb9cd54f-bhtl5                           1/1     Running     0          19s

Давайте посмотрим на player-app журналы:

$ oc logs player-app-79fb9cd54f-bhtl5
2021-02-03 16:28:41,970 INFO  [org.acm.PlaySongsGenerator] (RxComputationThreadPool-1) song 1: The Good The Bad And The Ugly played.
2021-02-03 16:28:46,970 INFO  [org.acm.PlaySongsGenerator] (RxComputationThreadPool-1) song 1: The Good The Bad And The Ugly played.
2021-02-03 16:28:51,970 INFO  [org.acm.PlaySongsGenerator] (RxComputationThreadPool-1) song 2: Believe played.
2021-02-03 16:28:56,970 INFO  [org.acm.PlaySongsGenerator] (RxComputationThreadPool-1) song 3: Still Loving You played.
2021-02-03 16:29:01,972 INFO  [org.acm.PlaySongsGenerator] (RxComputationThreadPool-1) song 2: Believe played.
2021-02-03 16:29:06,970 INFO  [org.acm.PlaySongsGenerator] (RxComputationThreadPool-1) song 7: Fox On The Run played.

Мы видим, что данные записываются случайным образом, каждый раз, когда воспроизводится песня, в нашу тему Kafka отправляется событие. А теперь давайте взглянем на наши music-chart журналы:

$ oc logs music-chart-576857c7f8-7l65x 
[KTABLE-TOSTREAM-0000000006]: 2, PlayedSong [count=1, songName=Believe]
[KTABLE-TOSTREAM-0000000006]: 8, PlayedSong [count=1, songName=Perfect]
[KTABLE-TOSTREAM-0000000006]: 3, PlayedSong [count=1, songName=Still Loving You]
[KTABLE-TOSTREAM-0000000006]: 1, PlayedSong [count=1, songName=The Good The Bad And The Ugly]
[KTABLE-TOSTREAM-0000000006]: 6, PlayedSong [count=1, songName=Into The Unknown]
[KTABLE-TOSTREAM-0000000006]: 3, PlayedSong [count=2, songName=Still Loving You]
[KTABLE-TOSTREAM-0000000006]: 5, PlayedSong [count=1, songName=Sometimes]
[KTABLE-TOSTREAM-0000000006]: 2, PlayedSong [count=2, songName=Believe]
[KTABLE-TOSTREAM-0000000006]: 1, PlayedSong [count=2, songName=The Good The Bad And The Ugly]

Мы видим, что данные успешно преобразуются, и что число счетчиков увеличивается по мере того, как пользователи воспроизводят больше песен.

Теперь нам нужно убедиться, что наш конвейер работает, поэтому давайте перейдем к нашему сервису S3, чтобы убедиться, что все события экспортируются успешно. для этого я использовал Sree в качестве браузера S3. Убедитесь, что вы используете правильные учетные данные и URL конечной точки:

Вернемся к нашему модулю координатора Presto и снова попробуем запросить наши данные:

$ presto> presto-cli --catalog hive
$ presto:songs> USE hive.songs;

Запустите SQL-запрос, чтобы получить наши данные:

$ select * from songs;
 count |           songname            
-------+-------------------------------
     1 | Bohemian Rhapsody             
     4 | Still Loving You              
     1 | The Good The Bad And The Ugly 
     3 | Believe                       
     1 | Perfect                       
     1 | Sometimes                     
     2 | The Good The Bad And The Ugly 
     2 | Bohemian Rhapsody             
     3 | Still Loving You              
     4 | Sometimes                     
     2 | Into The Unknown              
     4 | Believe                       
     4 | Into The Unknown              
     2 | Sometimes                     
     5 | Still Loving You              
     3 | The Good The Bad And The Ugly

Удивительный! Мы видим, что наши данные обновляются автоматически! попробуйте выполнить эту команду еще несколько раз, и вы увидите, что количество строк растет. Теперь, чтобы начать визуализацию наших данных, найдите маршрут Superset, по которому вы сможете войти в консоль:

$ oc get route
NAME       HOST/PORT                                            PATH   SERVICES   PORT       TERMINATION   WILDCARD
superset   superset-data-engineering-demo.apps.ocp.spaz.local          superset   8088-tcp                 None

Когда мы дойдем до нашей консоли Superset (войдите с помощью admin:admin), мы увидим, что можем перейти к Manage Databases - ›Create Database, чтобы создать наше соединение Preto, убедитесь, что вы указали имя службы Presto ClusterIP, в конце убедитесь, что вы проверили свое соединение :

Теперь, когда у нас появился более удобный способ запроса данных, давайте попробуем немного изучить наши данные. Перейдите к SQL Lab и убедитесь, что вы можете выполнить наш предыдущий запрос. Чтобы подчеркнуть это, посмотрите следующий запрос, который покажет, сколько раз была проиграна каждая песня:

Хорошо! мы можем запрашивать данные! не стесняйтесь создавать все свои визуализации и информационные панели. В качестве примера я создал информационную панель, которая изменяется в реальном времени, так как каждое обновление панели фактически снова запрашивает все данные из Presto:

Вывод

В этой демонстрации мы увидели, как мы можем использовать продукты с открытым исходным кодом для запуска автоматических конвейеров данных, и все это запланировано на Openshift. Поскольку Kubernetes бьет рекорды по внедрению, организациям следует подумать о переносе своих рабочих нагрузок в Kubernetes, чтобы их сервисы данных не остались позади. Используя Red Hat и партнеров-операторов, Openshift предлагает управление вашими службами данных как на день 1, так и на день 2.

Спасибо, что прочитали этот пост в блоге, увидимся в следующий раз :)