Разверните кластер Kafka в контейнере Docker и узнайте, как создавать темы, создавать и использовать сообщения.

Вступление

Управляемая событиями архитектура - одна из современных архитектур, которые сегодня реализованы во многих приложениях. За последние несколько лет для поддержки такой архитектуры было разработано множество инструментов, например AWS SNS, SQS, RabbitMQ и Apache Kafka.

Основная цель архитектуры, управляемой событиями, - разделить ваши службы с помощью очереди сообщений или очередей, в которых службы будут публиковать, опрашивать или потреблять. Таким образом, мы можем довольно легко заменить производителя или потребителя, поскольку они отделены друг от друга.

В этой части мы собираемся настроить локальный Kafka Docker и использовать его инструменты интерфейса командной строки для выполнения основных операций, включая создание темы, публикацию сообщений и их использование.

Мы будем использовать следующие образы Docker:

  • confluentinc/cp-zookeeper:5.4.0
  • confluentinc/cp-server:5.4.0
  • confluentinc/cp-kafka:5.4.0

Предварительное условие

Единственное предварительное условие для выполнения этого руководства - это Docker. Вы можете следовать официальным инструкциям по установке.

Написать файл Docker Compose

Первым делом мы создадим docker-compose.yml файл. Преимущество использования составного файла состоит в том, что мы можем сгруппировать вместе сервисы, составляющие наше приложение как таковое. Затем мы можем развернуть все контейнеры, определенные в этом файле, с помощью одной команды: docker-compose up.

Альтернативой было бы использование команды docker run, но это означало бы, что нам нужно выполнить ее три раза, по одному для каждого образа Docker, из которого мы хотим развернуть контейнер.

Как упоминалось во введении, у нас будет работать три контейнера Docker. Итак, наш файл compose будет состоять из трех сервисов со следующими именами: zookeeper, broker и kafka-tools.

Наш локальный кластер Kafka состоит из контейнеров zookeeper и broker. broker - это Apache Kafka, основанный на Apache Zookeeper, поэтому мы указываем зависимость в файле compose.

services:
  ...
  ...
  broker:
    ...
    ...
    depends_on:
      - zookeeper

Наконец, у нас есть контейнер kafka-tools, который содержит инструменты командной строки, которые мы можем использовать для взаимодействия с broker.

Обратите внимание, что для контейнера kafka-tools мы добавили network_mode: "host". Это значит, что внутри контейнера localhost интерпретируется как хост Docker на нашем компьютере.

Это сделано для того, чтобы мы могли легко подключиться к broker, поскольку он открывается на localhost:9092. Позже мы узнаем об этом подробнее.

Запустите контейнеры

Мы собираемся запустить три контейнера, которые мы определили в файле docker-compose.yml. Перейдите в свой терминал, перейдите в каталог, в котором вы создали файл docker-compose.yml, и выполните следующую команду.

~/demo/kafka-local ❯ ls -l
total 8
-rw-r--r--  1 billyde  staff  1347 12 Feb 23:06 docker-compose.yml
~/demo/kafka-local ❯ docker-compose up -d
Creating network "kafka-local_default" with the default driver
Creating kafka-tools ... done
Creating zookeeper   ... done
Creating broker      ... done

Наконец, чтобы проверить, все ли контейнеры запущены, выполните эту команду.

~/demo/kafka-local ❯ docker ps
CONTAINER ID        IMAGE                             COMMAND                  CREATED             STATUS              PORTS                                        NAMES
748c5da81038        confluentinc/cp-server:5.4.0      "/etc/confluent/dock…"   5 seconds ago       Up 4 seconds        0.0.0.0:9092->9092/tcp                       broker
5044ae334235        confluentinc/cp-zookeeper:5.4.0   "/etc/confluent/dock…"   5 seconds ago       Up 4 seconds        2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp   zookeeper
ff1f9070dc42        confluentinc/cp-kafka:5.4.0       "tail -f /dev/null"      5 seconds ago       Up 4 seconds        9092/tcp                                     kafka-tools

Если вы видите, что все контейнеры перечислены там и их STATUS = Up xx seconds, то все в порядке. Счастливые дни!

Создать тему Kafka

Как и любой брокер сообщений, Kafka работает, отправляя сообщения по темам. Каждая тема обычно указывает на конкретное событие, например, тему активности пользователя, в которой сообщения будут связаны с действиями пользователя.

Давайте посмотрим, как мы можем создать тему с помощью инструментов cli.

Во-первых, нам нужно попасть в контейнер kafka-tools, потому что именно там находятся наши инструменты Kafka cli. Для этого выполните следующую команду на своем терминале.

~/demo/kafka-local ❯ docker exec -ti kafka-tools bash
root@kafka-tools:/#

Если вы видите root@kafka-tools:/#, вы в деле! Давайте создадим тему и назовем ее to-do-list, потому что она будет содержать список того, что нам нужно сделать.

root@kafka-tools:/# kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 2 --topic to-do-list
root@kafka-tools:/#

На самом деле команда ничего не возвращает, и ничего не значит хорошо. Чтобы проверить, действительно ли тема создана или нет, мы можем выполнить следующую команду.

root@kafka-tools:/# kafka-topics --list --bootstrap-server localhost:9092
__confluent.support.metrics
_confluent-license
_confluent-metrics
to-do-list

Отличный материал, мы видим, что наша тема to-do-list действительно была создана. Для получения более подробной информации по теме мы можем выполнить следующую команду.

root@kafka-tools:/#  kafka-topics --describe --bootstrap-server localhost:9092 --topic to-do-list
Topic: to-do-list PartitionCount: 2 ReplicationFactor: 1 Configs:
 Topic: to-do-list Partition: 0 Leader: 1 Replicas: 1 Isr: 1
 Topic: to-do-list Partition: 1 Leader: 1 Replicas: 1 Isr: 1

Хорошо, прежде чем перейти к следующему разделу, давайте обсудим здесь несколько вещей.

  • --bootstrap-server localhost:9092: Помните это из предыдущего раздела? Если мы не добавим network_mode: "host" в службу kafka-tools, она не сможет подключиться к broker. Вместо этого мы увидим вот что.
[2020-02-12 12:40:27,990] WARN [AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
  • --replication-factor 1: указывает, сколько реплик разделов темы мы хотим создать. Репликация предназначена для обеспечения отказоустойчивости. Например, если мы укажем коэффициент равным 3, это означает, что будет три сервера, которые будут содержать одни и те же данные (тема, ее разделы и ее сообщения). Один из трех серверов будет лидером, а остальные - последователями.
  • --partitions 2: Имя информативно, но в основном это позволяет нам установить, сколько разделов мы хотим иметь для нашей темы. Всякий раз, когда наш производитель Kafka отправляет сообщение или запись в эту тему, запись будет присутствовать только в одном из доступных разделов. Размещение записи в разделе зависит от ключа записи.
  • --topic to-do-list: Как вы уже догадались, этот аргумент определяет название темы, которую мы хотим создать.

Отправлять сообщения в тему

Наша тема готова, а это значит, что мы можем добавить в нее несколько сообщений. Для этого мы будем использовать kafka-console-producer. Давайте посмотрим, как мы можем отправлять сообщения типа "ключ-значение".

root@kafka-tools:/# kafka-console-producer --broker-list localhost:9092 --topic to-do-list --property "parse.key=true" --property "key.separator=:"
>1:Wash dishes
>2:Clean bathroom
>3:Mop living room

Параметр --property parse.key=true сообщает производителю, что мы хотим отправить сообщение с ключом. Кроме того, нам также необходимо сообщить производителю, какой разделитель будет использоваться для отделения ключа от значения (или самого сообщения), которое задается этой опцией --property "key.separator=:".

Теперь мы можем закрыть производителя, когда закончим отправку сообщений. Просто нажмите ctrl + c на клавиатуре, чтобы завершить его.

Потребляйте сообщения из темы

Мы опубликовали несколько сообщений в предыдущей теме, так что давайте посмотрим, сможем ли мы их использовать. Для этого мы будем использовать kafka-console-consumer.

root@kafka-tools:/# kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic to-do-list --property "print.key=true"
1 Wash dishes
3 Mop living room
2 Clean bathroom

Идеально! Мы можем увидеть все три сообщения, которые мы создали в предыдущем разделе.

Несколько замечаний:

  • --from-beginning: Потребитель Kafka отслеживает потребление сообщений по смещению. Этот параметр указывает потребителю начать потребление с самого раннего доступного смещения, если у него еще нет установленного смещения (а в данном случае его нет).
  • --property "print.key=true": этот параметр указывает потребителю выводить ключ сообщений на консоль. Если не указано иное, потребитель просто выведет только сообщения.

То же, что и производитель, чтобы завершить работу потребителя, просто нажмите ctrl + c.

Заворачивать

В этом руководстве мы узнали, как развернуть локальный кластер Kafka в контейнере Docker. Кроме того, мы также выполнили некоторые базовые операции, включая создание темы, а также создание и использование сообщений - все это было сделано с помощью инструментов командной строки Kafka.

К настоящему времени мы все должны иметь базовое представление о том, как работает Kafka. Я действительно надеюсь, что это вдохновит вас на дальнейшее изучение и создание событийно-ориентированных приложений с помощью Kafka.

В приложении Kafka нередко используется DynamoDB. По этой причине вас также может заинтересовать настройка локального экземпляра DynamoDB. Вы можете ознакомиться с этим руководством для этого.

Также ознакомьтесь с Avro и реестром схем. Это потрясающие инструменты, которые позволят вам определять схемы, которые сообщают вашим производителям и потребителям, как сериализовать и десериализовать (соответственно) сообщения Kafka из и в ваши POJO или классы данных.

Ссылка

Docker создает файл https://github.com/confluentinc/examples/blob/5.4.0-post/cp-all-in-one/docker-compose.yml

Официальный документ Kafka https://kafka.apache.org/quickstart