Публикация и использование сообщений с помощью Spring Cloud Stream и брокера сообщений Apache Kafka

Вступление

Архитектура, управляемая событиями, требует предусмотрительности, создание каркаса для интеграции популярных платформ потоковой передачи событий может оказаться сложным. Spring Cloud Stream - это платформа для приложений микросервисов, управляемых сообщениями, и она предоставляет реализации связывателей для различных брокеров сообщений, RabbitMQ, Apache Kafka, Kafka Streams, Amazon Kinesis и т. Д. Структура может упростить вещи и позволить нам легко создавать публикацию и использование сообщений. для разных платформ, сохраняя конкретные детали реализации выбранной платформы и используя уже знакомые идиомы Spring и интерфейсы Spring.

В этом посте мы будем использовать брокер сообщений Apache Kafka.

Джей Крепс решил назвать программу в честь автора Франца Кафки, потому что это система, оптимизированная для письма, и ему нравились работы Кафки.
источник: Википедия

В моей предыдущей статье мы использовали Spring Application Events, чтобы инициировать отправку электронной почты каждый раз, когда клиент делает заказ.



Теперь мы будем строить над этим, чтобы взглянуть на архитектуру Spring Cloud Stream. Мы разделим это приложение на два микросервиса, которые обмениваются сообщениями. Мы создадим сервис пользователя, который будет заниматься администрированием пользователей и сервисом уведомлений, который будет отвечать за отправку различных уведомлений, в данном случае электронных писем.

Пользовательский сервис будет производителем сообщений (издателем), он будет публиковать сообщение в теме Kafka каждый раз, когда создается новая учетная запись пользователя. Служба уведомлений будет потребителем сообщений, она будет принимать сообщения из очереди сообщений и реагировать на эти сообщения без явного вызова службы пользователя.
Используя обмен сообщениями, мы также можем добавить новые функции, которые могут реагировать на изменения в службе пользователей, заставляя их прослушивать сообщения в очереди сообщений и позволяя нашим службам масштабироваться.

Зависимости

Приложение Spring Cloud Stream имеет ядро, не зависящее от промежуточного программного обеспечения, и обменивается данными через привязки между назначениями, открытыми внешними брокерами. Детали брокера, необходимые для установления привязок, обрабатываются реализациями binder. Добавьте реализацию связывателя Kafka в gradle.build или pom.xml файл в службе пользователей и службе уведомлений вместе с другими необходимыми зависимостями.

dependencies {
  ...
   implementation 'org.springframework.cloud:spring-cloud-stream'
   implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka'
  ...
}

Служба уведомлений

Поскольку служба уведомлений будет принимать сообщения, опубликованные службой пользователей, давайте создадим Consumer bean. Потребитель представляет собой операцию, которая принимает один входной аргумент. В данном случае UserDto, который будет содержать информацию о зарегистрировавшемся пользователе, например, его имя и адрес электронной почты.

Привязка - это мост между источником и целью, в модели функционального программирования имя привязки по умолчанию:

  • ввод - <functionName> + -in- + <index>
  • выход - <functionName> + -out- + <index>

где in и out соответствуют типу привязки, ввода или вывода.

Чтобы улучшить читаемость, мы можем дать привязке более описательное имя, сопоставив неявное имя привязки с явным именем привязки, используя следующее свойство:

spring.cloud.stream.bindings.eventConsumer-in-0.destination= notification-events

Пользовательское обслуживание

Поскольку это событие по запросу, нам нужно внедрить bean-компонент StreamBridge, который позволит нам отправлять данные в выходную привязку, тем самым создавая мост между непотоковым приложением и Spring Cloud Stream. Чтобы инициировать создание привязки источника, мы используем свойство spring.cloud.stream.source=supplier, в котором мы объявляем имя источника. Затем мы даем ему то же явное имя, что и в службе уведомлений, используя свойство spring.cloud.stream.bindings.supplier-out-0.destination=notification-events. В метод streamBridge.send() мы передаем POJO, и он будет проходить ту же процедуру, как если бы он исходил от функции или поставщика.

Докер

Для его запуска у вас должны быть установлены Kafka и Zookeeper, я использую Docker для запуска Kafka и Zookeeper с остальными микросервисами. Вот фрагмент docker-compose.yml:

Используйте свойство spring.cloud.stream.kafka.binder.brokers=kafka в микросервисе application.proeprties, чтобы предоставить список брокеров, к которым подключается связыватель Kafka. По умолчанию localhost. И если вы запускаете Kafka на другом порту, отличном от порта по умолчанию 9092, вы можете использовать свойство spring.cloud.stream.kafka.binder.defaultBrokerPort.

Теперь, когда все настроено, мы можем отправить запрос в службу пользователей.

В журналах службы уведомлений мы видим, что она получила сообщение.

И письмо прибыло успешно

Обратите внимание, что Spring Cloud Stream использует библиотеку Spring Retry, и всякий раз, когда возникает исключение во время обработки сообщения, платформа предпринимает несколько попыток повторной попытки одного и того же сообщения (по умолчанию 3).

Заключительные слова

Spring Cloud Stream упрощает обработку асинхронных сообщений. Он использует модель функционального программирования. Производители, потребители и процессоры Kafka определяются в функциональном интерфейсе Java как Поставщики, Потребители и Функции. Вам просто нужно определить Spring Beans поставщика, потребителя или функции, и Binder будет обрабатывать интеграцию с выбранным брокером, позволяя вашему коду сосредоточиться на бизнес-логике и быть нейтральным к промежуточному программному обеспечению.

Исходный код полного демонстрационного проекта доступен на GitHub.