В этой статье мы собираемся создать службу очереди сообщений, используя Kafka и KafkaJS, что-то похожее на SQS, и мы позаботимся о том, чтобы сообщения обрабатывались ровно один раз в том порядке, в котором они отправляются. Мы также реализуем механизм повторной доставки, чтобы убедиться, что если что-то не работает в нашей бизнес-логике, мы можем снова добавить это в очередь.
Найдите исходный код на Github:https://github.com/icpsoni/kafka-message-queue
Предпосылки
1. Получить Кафку
Загрузите Kafka и распакуйте его здесь и перейдите в каталог.
$ tar -xzf <file_name>.tgz
$ cd <file_name>tec
2: Запустите среду Kafka
ПРИМЕЧАНИЕ. В вашей локальной среде должна быть установлена Java 8+.
Выполните следующие команды, чтобы запустить ZooKeeper:
# Start the ZooKeeper service
# Note: Soon, ZooKeeper will no longer be required by Apache Kafka.
$ bin/zookeeper-server-start.sh config/zookeeper.properties
Откройте другой сеанс терминала и запустите службу Kafka Broker, используя:
# Start the Kafka broker service
$ bin/kafka-server-start.sh config/server.properties
После успешного запуска всех служб базовая среда Kafka будет запущена и готова к использованию.
3: Создайте тему для хранения наших сообщений очереди
С помощью следующей команды мы создадим тему под названием «очередь сообщений» в нашей Кафке.
$ bin/kafka-topics.sh --create --topic message-queue --bootstrap-server localhost:9092
Для проверки созданной темы используйте эту команду.
$ bin/kafka-topics.sh --describe --topic message-queue --bootstrap-server localhost:9092
Вы можете создать тему с любым количеством разделов, здесь мы используем только 1 раздел.
Создание сервисов с помощью KafkaJS
Нам в основном нужны 3 вещи, чтобы наша система очередей работала идеально.
- Отправка событий в тему Kafka Queue.
- Подписка на тему и чтение Очередь сообщений.
- Обработка повторной доставки.
кафка-config.js
Следующий фрагмент содержит базовую конфигурацию Kafka, необходимую нам для нашей системы очередей.
// Using KafkaJs nodejs library import { Kafka } from 'kafkajs'; // kafka broker running on localhost:9092 default port const kafkaBroker = 'localhost:9092'; // kafka topic used for queue messages export const kafkaTopic = 'message-queue'; // kafka client with basic config export const KafkaClient = new Kafka({ brokers: [kafkaBroker] });
производитель.js
Этот сервис будет отправлять сообщения в тему Kafka, которую мы создали ранее. Мы можем использовать функцию sendMessageToQueue
и передать объект сообщения, который нужно отправить в Kafka.
import { KafkaClient } from './index.js'; import { kafkaTopic } from "./kafka-config"; export const sendMessageToQueue = async (message) => { const producer = KafkaClient.producer(); await producer.connect(); await producer.send({ topic: kafkaTopic, messages: [ { value: message // Your message data goes here } ] }); // Disconnect producer once message sending is done. await producer.disconnect(); };
сообщение-queue.js
Это наш основной сервис, который занимается получением сообщений из очереди Kafka, выполнением бизнес-логики и обработкой повторной доставки, если что-то пойдет не так в бизнес-логике.
import { sendMessageToQueue } from "./producer.js"; import { KafkaClient, kafkaTopic, kafkaGroupId } from "./kafka-config.js"; export const consumeMessage = async () => { // Creating a Consumer Instance const consumer = KafkaClient.consumer({ groupId: kafkaGroupId, }); await consumer.connect(); // Subscribing to out Kafka topic await consumer.subscribe({ topic: kafkaTopic, fromBeginning: true}); await consumer.run({ autoCommit: false, // It won't commit message acknowledge to kafka until we don't do manually eachMessage: async ({ topic, partition, message}) => { const messageData = message.value.toString(); try { // Do the business Logic console.info('Received Message', messageData); } catch (error) { console.error(error); // Resending message to kafka queue for redelivery await sendMessageToQueue(messageData); } finally { const offset = +message.offset + 1; // Committing the message offset to Kafka await consumer.commitOffsets([{topic: kafkaTopic, partition, offset: offset.toString()}]); } } }); };
Загрузить исходный код
Дополнительно: он также содержит файлы тестовых образцов.
Ссылки: