Как написать распределенный потребитель Kafka на Python с помощью Ray

Вступление

В этом блоге я попытаюсь передать суть -

  • несколько компонентов Ray, которые мы будем использовать
  • затем поработайте над тем, как мы можем использовать Ray для создания распределенных потребителей Kafka для нашей обработки Stream. Предоставление хотя бы одной гарантии.
  • и, наконец, предоставить REST API для управления этими потребителями.

Почему Ray для потоковой обработки?

Многие команды ежедневно используют python для разных сценариев использования. Сегодня Python - один из наиболее часто используемых языков.

Существуют и другие механизмы распределенных вычислений, такие как Apache Spark и Apache Flink, которые предоставляют интерфейс Python, но кривая обучения очень крутая, и вам необходимо создавать специализированные наборы данных, такие как RDD / DataFrames и т. Д., А операции и концепции вращаются вокруг этих конструкций.

Когда я впервые использовал Ray, это выглядело более питоническим способом написания распределенных систем, и вам не нужно много учиться, чтобы использовать Ray. Вы можете легко преобразовать существующие функции и классы Python для работы в распределенном режиме без написания нового или изменения существующего кода.

Краткое введение в Ray

Перейдите к разделу «Создание потребителей Kafka», если вы уже знакомы с Ray

Пожалуйста, просмотрите документацию Ray для более подробной информации.

Ray предоставляет простые API-интерфейсы для запуска ваших функций и классов в кластере узлов.

Терминология лучей

  • Задачи - функции Python, выполняемые в распределенной установке.
  • Актеры - классы Python, которые работают в распределенной установке.
  • Ссылка на объект - это похоже на Futures или Promises в Javascript. Вы можете вызвать get для этих ссылок, чтобы получить данные из выполнений задач.

Запуск функций в распределенной установке

Чтобы функция работала в распределенном режиме, вам нужно только добавить декоратор @ray.remote() поверх вашей функции.

В приведенном ниже коде мы вычисляем факториал чисел от 10 до 16. Вычисление отдельных чисел происходит параллельно в 7 различных процессах, созданных Рэем.

В приведенном выше коде:

  • мы запускаем Ray с помощью ray.init()., который запускает планировщики, создает хранилище объектов для хранения акторов и задач и выполняет множество других вещей.
  • У нас есть факториальная функция, которая вычисляет факториал заданного числа. Мы украсили эту функцию символом @ray.remote(), который создает новую задачу Ray для запуска в распределенном режиме.
  • max_retries указывает лучу на повторное выполнение задачи 3 раза, если задача умирает до завершения. Задача может умереть, когда узел, выполняющий задачи, выключен или мертв.
  • num_cpus указывает лучу запустить эту задачу на 50% от 1 ЦП.
  • В следующем коде мы вызываем факториал от 10 до 16 и сохраняем фьючерсы в список. Позже мы вызываем ray.get, что позволяет нам дождаться завершения всех задач factorial().

Результат будет выглядеть примерно так:

This cluster consists of
    1 nodes in total
    8.0 CPU resources in total
(factorial pid=1155) calculating factorial of 15
(factorial pid=1155) Factorial of 15 = 1307674368000
(factorial pid=1154) calculating factorial of 16
(factorial pid=1154) Factorial of 16 = 20922789888000
(factorial pid=1148) calculating factorial of 10
(factorial pid=1148) Factorial of 10 = 3628800
(factorial pid=1149) calculating factorial of 14
(factorial pid=1149) Factorial of 14 = 87178291200
(factorial pid=1151) calculating factorial of 11
(factorial pid=1151) Factorial of 11 = 39916800
(factorial pid=1153) calculating factorial of 13
(factorial pid=1153) Factorial of 13 = 6227020800
(factorial pid=1152) calculating factorial of 12
(factorial pid=1152) Factorial of 12 = 479001600
Process finished with exit code 0

Как видите, все factorial() задачи запущены в своих собственных процессах.

Та же самая настройка, описанная выше, может преобразоваться в настройку на основе классов (также известную как Актеры), только аннотируя класс с помощью @ray.remote(), это создаст новый экземпляр рабочего, который работает в разных процессах, и мы можем вызывать функции-члены класса для создания удаленных задач для этих рабочих. .

Создание распределенных потребителей Kafka

Как вы видели в предыдущем разделе, мы могли удаленно запускать функции и функции-члены класса в разных процессах.
Мы будем использовать ту же концепцию для запуска наших потребителей в разных процессах на нескольких узлах.

В этом разделе я расскажу о следующей функции -

  1. Обеспечьте поддержку конфигурации для создания потребителей Kafka.
  2. Запустите Kafka Consumers в распределенном режиме.
  3. Предоставьте REST API для управления этими потребителями (запуск, остановка и т. Д.).

Мы используем клиент kafka-python для создания потребителей и fastapi для создания REST API для наших потребителей.

Делаем установку настраиваемой

Файл конфигурации потребителя JSON - в этом файле конфигурации потребителя будет размещен список конфигураций группы потребителей. Для каждой конфигурации в списке мы запустим группу потребителей с настроенным количеством рабочих / потребителей.

Для лучшей производительности оставьте number_of_workers равным количеству разделов в теме.

Предупреждение: если у вас 4 ядра, вы можете запустить до 4 рабочих / процессов, если каждый процесс занимает 1 ЦП.

Детали конфигурации -

Проверьте ser_des_util.py на предмет доступных сериализаторов и десериализаторов.

Stream Transformer

Создайте свой собственный Stream Transformer, расширив абстрактный класс StreamTransformer

Ниже показан один из таких преобразователей, который преобразует строковые сообщения из Kafka в JSON и создает объект SinkRecordDTO.
Обратите внимание, что здесь я не выполняю никаких действий в сообщении. Вы можете создать свой собственный StreamTransformer, который будет выполнять некоторую обработку.

Наконец, добавьте вновь созданный преобразователь потока в consumer_config.json в разделе sink_configssection.

Авторы потоковой передачи

Авторы потоков - это группа классов, которые пишут в целевое хранилище. Например, чтобы записывать преобразованные события в Elasticsearch, вы можете создать свой собственный Stream Writer, который использует клиент ES для обновления индекса.

Чтобы создать средство записи потока, расширьте класс StreamWriter.

Например, я создал одну запись потока для печати ключей и сообщений в консоли.

Наконец, я создал класс задач-приемников, который будет действовать как оркестратор и выполнять следующую задачу:

  1. Преобразуйте данные
  2. Запись в хранилища данных с использованием предоставленных средств записи потоков
  3. Отправить в очередь недоставленных сообщений в случае сбоя

Запуск потребителей Kafka в распределенном режиме

Как мы видели в одном из предыдущих разделов, мы можем запускать функцию или функцию-член класса удаленно в различных процессах, украсив класс и функции с помощью @ray.remote()

В этом разделе мы будем работать над созданием удаленных сотрудников и менеджера, который управляет этими потребителями.

Вы можете найти полный код здесь.

Удаленный работник-потребитель

При такой настройке Ray создаст рабочий экземпляр ConsumerWorker в отдельном процессе и будет запускать для них задачи, то есть вызывать для них run() функцию.

Обратите внимание, что мы добавили в декоратор несколько дополнительных параметров:

@ray.remote(max_restarts=2, max_task_retries=2, num_cpus=WORKER_NUM_CPUS)

ray_restarts указывает лучу перезапустить экземпляр рабочего максимум 2 раза, если рабочий умирает или погибает. В данном случае ConsumerWorker - это наш рабочий экземпляр.

max_task_retries указывает лучу перезапустить любую запущенную задачу / функцию максимум 2 раза, если рабочий умирает или убит. В этом случае функция run() запускается повторно.

В приведенном выше работнике у нас есть -

  1. Создал новый Kafka Consumer в конструкторе
  2. Создана функция run(), которая выполняет:
    - запускает бесконечный цикл для poll () для потребительских записей / сообщений
    - отправляет записи на обработку, вызывая sink_task.process()
    - фиксирует смещения брокерам, если сообщения обработано успешно.
    - Останавливает воркер по запросу, прерывая цикл
  3. Создает stop_worker() функцию, которая устанавливает флаг для остановки работающего работника.

Менеджер по работе с потребителями

Менеджер заботится о запуске и остановке рабочих потребителей по требованию.

Ключевые моменты, на которые следует обратить внимание в классе менеджера

  • Он поддерживает контейнер участников / рабочих экземпляров лучей и вызывает их запуск / остановку.
  • Предоставляет API-интерфейсы для запуска всех групп потребителей или определенной группы потребителей и запуска n работников в этих группах потребителей.
  • При создании воркера мы предусмотрели опцию -max_concurrancy=2. Это позволит нам вызвать stop() в экземпляре worker, даже если задача run() запущена.
    По умолчанию экземпляр Worker выполняет задачи последовательно в одном потоке.
  • Использует ray.kill(actor/worker instance), чтобы убить рабочий экземпляр после остановки.

API-интерфейсы Rest для управления рабочими-потребителями

Я использую Fast API для создания конечных точек для нашего управления работниками-потребителями и использую базовую аутентификацию для аутентификации запросов.

Обратите внимание, что в службе @app.on_event("startup")of приложение создает и запускает все группы потребителей.

Запуск потребителей в удаленном кластере лучей в K8

Прочтите руководство по установке здесь, чтобы получить более подробные инструкции.

Чтобы подключиться к удаленному лучевому кластеру, нам нужно внести в наше приложение следующие изменения:

Instead of ray.init() change to:
----
if LOCAL_MODE == 'Y':
    ray.init()
else:
    ray.init(address=RAY_HEAD_ADDRESS)
------
RAY_HEAD_ADDRESS is your remote ray cluster head address ex:  ray://10.20.30.40:30001

Настройка кластера Ray в K8

Создайте ниже файл конфигурации YAML лучей:

В приведенной ниже конфигурации

  • Мы создаем одну головную часть и две рабочие реплики.
  • Головной узел занимает одно ядро ​​и 512 МБ памяти - измените это по своему усмотрению.
  • Рабочий узел занимает 0,5 ЦП и 512 МБ памяти - измените это по своему усмотрению.
  • Предоставьте панель управления, головной узел луча и сервер Redis для общего доступа с помощью внешней службы K8 с использованием NodePort.
  • Установите все зависимости кода в виде пакета как на головном, так и на рабочем узлах. Таким образом, головные и рабочие узлы Ray смогут найти эти модули. pip install kafka-connect-dependency==0.1

Применить изменения:

kubectl apply -f ray-cluster-config.yaml -n ray

Запустить потребительское приложение

Обратите внимание, что ваше приложение не обязательно должно работать в кластере K8. Вы можете работать на любом сервере или даже в контейнере докеров или в модуле K8.

## to run outside container
RAY_HEAD_ADDRESS=ray://192.168.64.3:30001 uvicorn src.event_consumer_app:app --port 8002

Вы даже можете запустить контейнер докеров. Соберите Dockerfile. Затем запустите изображение с переменными среды, передающими адрес головного узла луча и другие соответствующие переменные окружения.

docker run -e RAY_HEAD_ADDRESS=ray://<head node IP>:<port> -e LOCAL_MODE=N  -dp 8002:8002 kafka-connect-ray

Это запустит сервер приложений и запустит 2 потребителя для группы потребителей some_consumer_group, как показано ниже:

Запустите Rest API в Postman или вы можете просмотреть документацию Swagger по пути /docs

Резюме

В этом блоге я продемонстрировал способ создания распределенных потребителей Kafka с помощью Ray и управления жизненным циклом этих потребителей с помощью Rest API.

Как уже говорилось, эта установка легко настраивается, и вам нужно только создать свой собственный преобразователь и логику записи потока, чтобы заставить ее работать.

использованная литература