В этой статье мы собираемся создать службу очереди сообщений, используя Kafka и KafkaJS, что-то похожее на SQS, и мы позаботимся о том, чтобы сообщения обрабатывались ровно один раз в том порядке, в котором они отправляются. Мы также реализуем механизм повторной доставки, чтобы убедиться, что если что-то не работает в нашей бизнес-логике, мы можем снова добавить это в очередь.

Найдите исходный код на Github:https://github.com/icpsoni/kafka-message-queue

Предпосылки

1. Получить Кафку

Загрузите Kafka и распакуйте его здесь и перейдите в каталог.

$ tar -xzf <file_name>.tgz
$ cd <file_name>tec

2: Запустите среду Kafka

ПРИМЕЧАНИЕ. В вашей локальной среде должна быть установлена ​​Java 8+.

Выполните следующие команды, чтобы запустить ZooKeeper:

# Start the ZooKeeper service
# Note: Soon, ZooKeeper will no longer be required by Apache Kafka.
$ bin/zookeeper-server-start.sh config/zookeeper.properties

Откройте другой сеанс терминала и запустите службу Kafka Broker, используя:

# Start the Kafka broker service
$ bin/kafka-server-start.sh config/server.properties

После успешного запуска всех служб базовая среда Kafka будет запущена и готова к использованию.

3: Создайте тему для хранения наших сообщений очереди

С помощью следующей команды мы создадим тему под названием «очередь сообщений» в нашей Кафке.

$ bin/kafka-topics.sh --create --topic message-queue --bootstrap-server localhost:9092

Для проверки созданной темы используйте эту команду.

$ bin/kafka-topics.sh --describe --topic message-queue --bootstrap-server localhost:9092

Вы можете создать тему с любым количеством разделов, здесь мы используем только 1 раздел.

Создание сервисов с помощью KafkaJS

Нам в основном нужны 3 вещи, чтобы наша система очередей работала идеально.

  1. Отправка событий в тему Kafka Queue.
  2. Подписка на тему и чтение Очередь сообщений.
  3. Обработка повторной доставки.

кафка-config.js

Следующий фрагмент содержит базовую конфигурацию Kafka, необходимую нам для нашей системы очередей.

// Using KafkaJs nodejs library
import { Kafka } from 'kafkajs';

// kafka broker running on localhost:9092 default port
const kafkaBroker = 'localhost:9092';

// kafka topic used for queue messages
export const kafkaTopic = 'message-queue';

// kafka client with basic config
export const KafkaClient = new Kafka({
  brokers: [kafkaBroker]
});

производитель.js

Этот сервис будет отправлять сообщения в тему Kafka, которую мы создали ранее. Мы можем использовать функцию sendMessageToQueue и передать объект сообщения, который нужно отправить в Kafka.

import { KafkaClient } from './index.js';
import { kafkaTopic } from "./kafka-config";

export const sendMessageToQueue = async (message) => {
  const producer = KafkaClient.producer();
  await producer.connect();
  await producer.send({
    topic: kafkaTopic,
    messages: [
      {
        value: message // Your message data goes here
      }
    ]
  });
  // Disconnect producer once message sending is done.
  await producer.disconnect();
};

сообщение-queue.js

Это наш основной сервис, который занимается получением сообщений из очереди Kafka, выполнением бизнес-логики и обработкой повторной доставки, если что-то пойдет не так в бизнес-логике.

import { sendMessageToQueue } from "./producer.js";
import { KafkaClient, kafkaTopic, kafkaGroupId } from "./kafka-config.js";

export const consumeMessage = async () => {
  // Creating a Consumer Instance
  const consumer = KafkaClient.consumer({
    groupId: kafkaGroupId,
  });

  await consumer.connect();
  // Subscribing to out Kafka topic
  await consumer.subscribe({ topic: kafkaTopic, fromBeginning: true});

  await consumer.run({
    autoCommit: false, // It won't commit message acknowledge to kafka until we don't do manually
    eachMessage: async ({ topic, partition, message}) => {
      const messageData = message.value.toString();
      try {
        // Do the business Logic
        console.info('Received Message', messageData);
      } catch (error) {
        console.error(error);
        // Resending message to kafka queue for redelivery
        await sendMessageToQueue(messageData);
      } finally {
        const offset = +message.offset + 1;
        // Committing the message offset to Kafka
        await consumer.commitOffsets([{topic: kafkaTopic, partition, offset: offset.toString()}]);
      }
    }
  });
};

Загрузить исходный код

Дополнительно: он также содержит файлы тестовых образцов.



Ссылки:

  1. https://kafka.apache.org/quickstart
  2. https://kafka.js.org/docs/getting-started