Как написать распределенный потребитель Kafka на Python с помощью Ray
Вступление
В этом блоге я попытаюсь передать суть -
- несколько компонентов Ray, которые мы будем использовать
- затем поработайте над тем, как мы можем использовать Ray для создания распределенных потребителей Kafka для нашей обработки Stream. Предоставление хотя бы одной гарантии.
- и, наконец, предоставить REST API для управления этими потребителями.
Почему Ray для потоковой обработки?
Многие команды ежедневно используют python для разных сценариев использования. Сегодня Python - один из наиболее часто используемых языков.
Существуют и другие механизмы распределенных вычислений, такие как Apache Spark и Apache Flink, которые предоставляют интерфейс Python, но кривая обучения очень крутая, и вам необходимо создавать специализированные наборы данных, такие как RDD / DataFrames и т. Д., А операции и концепции вращаются вокруг этих конструкций.
Когда я впервые использовал Ray, это выглядело более питоническим способом написания распределенных систем, и вам не нужно много учиться, чтобы использовать Ray. Вы можете легко преобразовать существующие функции и классы Python для работы в распределенном режиме без написания нового или изменения существующего кода.
Краткое введение в Ray
Перейдите к разделу «Создание потребителей Kafka», если вы уже знакомы с Ray
Пожалуйста, просмотрите документацию Ray для более подробной информации.
Ray предоставляет простые API-интерфейсы для запуска ваших функций и классов в кластере узлов.
Терминология лучей
- Задачи - функции Python, выполняемые в распределенной установке.
- Актеры - классы Python, которые работают в распределенной установке.
- Ссылка на объект - это похоже на Futures или Promises в Javascript. Вы можете вызвать get для этих ссылок, чтобы получить данные из выполнений задач.
Запуск функций в распределенной установке
Чтобы функция работала в распределенном режиме, вам нужно только добавить декоратор @ray.remote()
поверх вашей функции.
В приведенном ниже коде мы вычисляем факториал чисел от 10 до 16. Вычисление отдельных чисел происходит параллельно в 7 различных процессах, созданных Рэем.
В приведенном выше коде:
- мы запускаем Ray с помощью
ray.init().
, который запускает планировщики, создает хранилище объектов для хранения акторов и задач и выполняет множество других вещей. - У нас есть факториальная функция, которая вычисляет факториал заданного числа. Мы украсили эту функцию символом
@ray.remote()
, который создает новую задачу Ray для запуска в распределенном режиме. max_retries
указывает лучу на повторное выполнение задачи 3 раза, если задача умирает до завершения. Задача может умереть, когда узел, выполняющий задачи, выключен или мертв.num_cpus
указывает лучу запустить эту задачу на 50% от 1 ЦП.- В следующем коде мы вызываем факториал от 10 до 16 и сохраняем фьючерсы в список. Позже мы вызываем
ray.get
, что позволяет нам дождаться завершения всех задачfactorial()
.
Результат будет выглядеть примерно так:
This cluster consists of 1 nodes in total 8.0 CPU resources in total (factorial pid=1155) calculating factorial of 15 (factorial pid=1155) Factorial of 15 = 1307674368000 (factorial pid=1154) calculating factorial of 16 (factorial pid=1154) Factorial of 16 = 20922789888000 (factorial pid=1148) calculating factorial of 10 (factorial pid=1148) Factorial of 10 = 3628800 (factorial pid=1149) calculating factorial of 14 (factorial pid=1149) Factorial of 14 = 87178291200 (factorial pid=1151) calculating factorial of 11 (factorial pid=1151) Factorial of 11 = 39916800 (factorial pid=1153) calculating factorial of 13 (factorial pid=1153) Factorial of 13 = 6227020800 (factorial pid=1152) calculating factorial of 12 (factorial pid=1152) Factorial of 12 = 479001600 Process finished with exit code 0
Как видите, все factorial()
задачи запущены в своих собственных процессах.
Та же самая настройка, описанная выше, может преобразоваться в настройку на основе классов (также известную как Актеры), только аннотируя класс с помощью @ray.remote()
, это создаст новый экземпляр рабочего, который работает в разных процессах, и мы можем вызывать функции-члены класса для создания удаленных задач для этих рабочих. .
Создание распределенных потребителей Kafka
Как вы видели в предыдущем разделе, мы могли удаленно запускать функции и функции-члены класса в разных процессах.
Мы будем использовать ту же концепцию для запуска наших потребителей в разных процессах на нескольких узлах.
В этом разделе я расскажу о следующей функции -
- Обеспечьте поддержку конфигурации для создания потребителей Kafka.
- Запустите Kafka Consumers в распределенном режиме.
- Предоставьте REST API для управления этими потребителями (запуск, остановка и т. Д.).
Мы используем клиент kafka-python для создания потребителей и fastapi для создания REST API для наших потребителей.
Делаем установку настраиваемой
Файл конфигурации потребителя JSON - в этом файле конфигурации потребителя будет размещен список конфигураций группы потребителей. Для каждой конфигурации в списке мы запустим группу потребителей с настроенным количеством рабочих / потребителей.
Для лучшей производительности оставьте
number_of_workers
равным количеству разделов в теме.
Предупреждение: если у вас 4 ядра, вы можете запустить до 4 рабочих / процессов, если каждый процесс занимает 1 ЦП.
Детали конфигурации -
Проверьте ser_des_util.py на предмет доступных сериализаторов и десериализаторов.
Stream Transformer
Создайте свой собственный Stream Transformer, расширив абстрактный класс StreamTransformer
Ниже показан один из таких преобразователей, который преобразует строковые сообщения из Kafka в JSON и создает объект SinkRecordDTO.
Обратите внимание, что здесь я не выполняю никаких действий в сообщении. Вы можете создать свой собственный StreamTransformer, который будет выполнять некоторую обработку.
Наконец, добавьте вновь созданный преобразователь потока в consumer_config.json в разделе sink_configs
section.
Авторы потоковой передачи
Авторы потоков - это группа классов, которые пишут в целевое хранилище. Например, чтобы записывать преобразованные события в Elasticsearch, вы можете создать свой собственный Stream Writer, который использует клиент ES для обновления индекса.
Чтобы создать средство записи потока, расширьте класс StreamWriter.
Например, я создал одну запись потока для печати ключей и сообщений в консоли.
Наконец, я создал класс задач-приемников, который будет действовать как оркестратор и выполнять следующую задачу:
- Преобразуйте данные
- Запись в хранилища данных с использованием предоставленных средств записи потоков
- Отправить в очередь недоставленных сообщений в случае сбоя
Запуск потребителей Kafka в распределенном режиме
Как мы видели в одном из предыдущих разделов, мы можем запускать функцию или функцию-член класса удаленно в различных процессах, украсив класс и функции с помощью @ray.remote()
В этом разделе мы будем работать над созданием удаленных сотрудников и менеджера, который управляет этими потребителями.
Вы можете найти полный код здесь.
Удаленный работник-потребитель
При такой настройке Ray создаст рабочий экземпляр ConsumerWorker
в отдельном процессе и будет запускать для них задачи, то есть вызывать для них run()
функцию.
Обратите внимание, что мы добавили в декоратор несколько дополнительных параметров:
@ray.remote(max_restarts=2, max_task_retries=2, num_cpus=WORKER_NUM_CPUS)
ray_restarts
указывает лучу перезапустить экземпляр рабочего максимум 2 раза, если рабочий умирает или погибает. В данном случае ConsumerWorker
- это наш рабочий экземпляр.
max_task_retries
указывает лучу перезапустить любую запущенную задачу / функцию максимум 2 раза, если рабочий умирает или убит. В этом случае функция run()
запускается повторно.
В приведенном выше работнике у нас есть -
- Создал новый Kafka Consumer в конструкторе
- Создана функция
run()
, которая выполняет:
- запускает бесконечный цикл для poll () для потребительских записей / сообщений
- отправляет записи на обработку, вызываяsink_task.process()
- фиксирует смещения брокерам, если сообщения обработано успешно.
- Останавливает воркер по запросу, прерывая цикл - Создает
stop_worker()
функцию, которая устанавливает флаг для остановки работающего работника.
Менеджер по работе с потребителями
Менеджер заботится о запуске и остановке рабочих потребителей по требованию.
Ключевые моменты, на которые следует обратить внимание в классе менеджера
- Он поддерживает контейнер участников / рабочих экземпляров лучей и вызывает их запуск / остановку.
- Предоставляет API-интерфейсы для запуска всех групп потребителей или определенной группы потребителей и запуска
n
работников в этих группах потребителей. - При создании воркера мы предусмотрели опцию -
max_concurrancy=2
. Это позволит нам вызватьstop()
в экземпляре worker, даже если задачаrun()
запущена.
По умолчанию экземпляр Worker выполняет задачи последовательно в одном потоке. - Использует
ray.kill(actor/worker instance)
, чтобы убить рабочий экземпляр после остановки.
API-интерфейсы Rest для управления рабочими-потребителями
Я использую Fast API для создания конечных точек для нашего управления работниками-потребителями и использую базовую аутентификацию для аутентификации запросов.
Обратите внимание, что в службе @app.on_event("startup")
of приложение создает и запускает все группы потребителей.
Запуск потребителей в удаленном кластере лучей в K8
Прочтите руководство по установке здесь, чтобы получить более подробные инструкции.
Чтобы подключиться к удаленному лучевому кластеру, нам нужно внести в наше приложение следующие изменения:
Instead of ray.init() change to: ---- if LOCAL_MODE == 'Y': ray.init() else: ray.init(address=RAY_HEAD_ADDRESS) ------ RAY_HEAD_ADDRESS is your remote ray cluster head address ex: ray://10.20.30.40:30001
Настройка кластера Ray в K8
Создайте ниже файл конфигурации YAML лучей:
В приведенной ниже конфигурации
- Мы создаем одну головную часть и две рабочие реплики.
- Головной узел занимает одно ядро и 512 МБ памяти - измените это по своему усмотрению.
- Рабочий узел занимает 0,5 ЦП и 512 МБ памяти - измените это по своему усмотрению.
- Предоставьте панель управления, головной узел луча и сервер Redis для общего доступа с помощью внешней службы K8 с использованием NodePort.
- Установите все зависимости кода в виде пакета как на головном, так и на рабочем узлах. Таким образом, головные и рабочие узлы Ray смогут найти эти модули.
pip install kafka-connect-dependency==0.1
Применить изменения:
kubectl apply -f ray-cluster-config.yaml -n ray
Запустить потребительское приложение
Обратите внимание, что ваше приложение не обязательно должно работать в кластере K8. Вы можете работать на любом сервере или даже в контейнере докеров или в модуле K8.
## to run outside container RAY_HEAD_ADDRESS=ray://192.168.64.3:30001 uvicorn src.event_consumer_app:app --port 8002
Вы даже можете запустить контейнер докеров. Соберите Dockerfile. Затем запустите изображение с переменными среды, передающими адрес головного узла луча и другие соответствующие переменные окружения.
docker run -e RAY_HEAD_ADDRESS=ray://<head node IP>:<port> -e LOCAL_MODE=N -dp 8002:8002 kafka-connect-ray
Это запустит сервер приложений и запустит 2 потребителя для группы потребителей some_consumer_group
, как показано ниже:
Запустите Rest API в Postman или вы можете просмотреть документацию Swagger по пути /docs
Резюме
В этом блоге я продемонстрировал способ создания распределенных потребителей Kafka с помощью Ray и управления жизненным циклом этих потребителей с помощью Rest API.
Как уже говорилось, эта установка легко настраивается, и вам нужно только создать свой собственный преобразователь и логику записи потока, чтобы заставить ее работать.