Горизонтальное масштабирование сервера WebSocket с использованием Spring Boot, Redis Pub/Sub и Redis Streams

Это продолжение моей предыдущей статьи о проектных соображениях для горизонтального масштабирования сервера WebSocket.

В этой статье я подробно расскажу, как мы можем реализовать это с помощью Redis Pub/Sub и Redis Streams.



Моя серия серверов WebSocket

Краткое резюме

В прошлой статье мы определили две проблемы, которые возникнут при горизонтальном масштабировании сервера WebSocket и внутренних микросервисов:

  • Проблема № 1: потеря сообщений из-за балансировщика нагрузки
  • Проблема № 2: Дублированная обработка сообщений из-за нескольких подписчиков

Решения заключались в применении шаблонов обмена сообщениями публикация-подписка с концепциями групп потребителей к дизайну архитектуры. Для получения дополнительной информации обратитесь к предыдущей статье.

Давайте начнем

Следуйте инструкциям по созданию масштабируемого сервера WebSocket с использованием Spring Boot, Stomp, Redis Pub/Sub и Redis Streams.

Шаг 1. Создание сервера WebSocket

Следуйте шагам 1 и 2 моей предыдущей статьи, чтобы инициализировать сервер WebSocket с использованием протокола обмена сообщениями Spring Boot и STOMP.

Шаг 2. Запустите сервер Redis

Для быстрой настройки запустите сервер Redis локально с помощью docker.

docker run --name redis -p 6379:6379 -d redis

Шаг 3. Настройте подключение к серверу Redis

Добавьте следующую конфигурацию в файл application.yml сервера WebSocket для подключения к серверу Redis.

# application.yml
spring.redis:
    host: localhost
    port: 6379

Шаг 4. Внедрение Pub/Sub (вещательного канала) для однонаправленной связи в реальном времени

На шаге 4 мы создадим API для однонаправленной связи в реальном времени между серверными микросервисами и веб-приложениями (интерфейсом). Сервер WebSocket получает сообщения от серверных микросервисов через API и рассылает сообщения всем экземплярам сервера WebSocket с помощью Redis Pub/Sub. Затем сообщения пересылаются в веб-приложения через установленные соединения WebSocket.

Шаг 4.1. Создайте класс BroadcastEvent

BroadcastEvent — это настраиваемый объект для трансляции сообщения от одного экземпляра сервера WebSocket всем экземплярам сервера WebSocket.

Шаг 4.2. Настройте Redis Pub/Sub — ReactiveRedisTemplate

ReactiveRedisTemplate — это вспомогательный класс, упрощающий код доступа к данным Redis. В нашей конфигурации мы публикуем/подписываемся на значение BroadcastEvent и используем Jackson2JsonRedisSerializer для выполнения автоматической сериализации/десериализации значения.

Шаг 4.3. Настройте Redis Pub/Sub — службу вещания

RedisBroadcastService содержит логику публикации и подписки на пользовательский канал ( BROADCAST-CHANNEL ). Это канал для трансляции сообщений от одного экземпляра сервера WebSocket всем экземплярам сервера WebSocket.

Всякий раз, когда серверы WebSocket получают сообщение от BROADCAST-CHANNEL, сообщение перенаправляется в веб-приложения (интерфейс), которые установили с ним соединение WebSocket.

Примечание. @PostConstruct — это аннотация Spring, которая позволяет нам прикреплять настраиваемые действия к созданию компонента, а методы запускаются только один раз. В нашем случае мы подписываемся на BROADCAST-CHANNEL при создании bean-компонента.

Шаг 4.4. Создание конечных точек API

В приведенном ниже коде создается контроллер REST с конечной точкой запроса POST, который принимает тело запроса NewMessageRequest. topic — это место назначения STOMP, на которое подписывается клиент (интерфейс), а message — это фактическое сообщение в формате String.

Запросы API будут транслироваться на все экземпляры серверов WebSocket, как настроено на шаге 4.3 выше.

Шаг 4.5. Тестирование однонаправленного обмена данными в реальном времени через API

Запустите сервер WebSocket и подключитесь к серверу WebSocket ws://localhost:8080/stomp по протоколу STOMP, используя инструмент отладчика WebSocket, разработанный jiangxy. После подключения настройте инструмент отладчика WebSocket для подписки на тему /topic/frontend.

Затем отправьте запрос HTTP POST на сервер WebSocket, используя приведенную ниже команду curl:

curl -X POST -d '{"topic": "/topic/frontend", "message": "testing API endpoint" }' -H 'Content-Type: application/json' localhost:8080/api/notification

Инструмент отладчика WebSocket должен иметь вывод, показанный ниже:

Это показывает, что мы успешно настроили сервер WebSocket с Redis Pub/Sub для масштабируемой однонаправленной связи в реальном времени между серверными микросервисами и веб-приложениями (интерфейсом).

Шаг 5. Внедрите Pub/Sub с группами потребителей для двунаправленного общения в режиме реального времени

На шаге 5 мы будем использовать Redis Streams в качестве нашей системы Pub/Sub для двунаправленной связи в реальном времени между серверными микросервисами и веб-приложениями (интерфейсом). Мы не используем Redis Pub/Sub, так как он не поддерживает концепцию групп потребителей.

Шаг 5.1. Создайте класс StreamDataEvent

StreamDataEvent — это настраиваемый объект для обмена данными между подписчиками и издателями. message — это фактическое сообщение в формате String, а topic — это обязательное поле для сервера WebSocket, чтобы знать, в какой пункт назначения STOMP отправить сообщение.

Шаг 5.2. Сервер WebSocket — реализация потребителя потока Redis

Потребитель получает сообщение из потоков Redis и пересылает сообщение во все веб-приложения (интерфейс) через установленное соединение WebSocket.

Примечание. Нет необходимости широковещательно передавать сообщение, так как все экземпляры сервера WebSocket получат сообщение из потоков Redis.

Шаг 5.3. Сервер WebSocket — реализация конфигурации потока Redis

Следующий код содержит конфигурации для подписки на потоки Redis, где сообщения будут обрабатываться RedisStreamConsumer, который мы настроили на шаге 5.2.

Здесь мы настраиваем сервер WebSocket для прослушивания потока, идентифицированного ключом TEST_EVENT_TO_WEBSOCKET_SERVER. Вы можете создать больше подписок в зависимости от ваших вариантов использования.

Шаг 5.4. Сервер WebSocket — реализация производителя потоков Redis

Производитель предоставляет метод publishEvent для публикации данных в потоках Redis. В нашем примере есть запланированное задание, которое периодически публикуется (каждые пять секунд, десять секунд после запуска сервера WebSocket) в потоки Redis с использованием ключа TEST_EVENT_TO_BACKEND.

Шаг 5.5. Сервер WebSocket — реализация конфигурации WebSocket

Создайте Controller, который обрабатывает сообщения из веб-приложения (внешнего интерфейса), которые отправляются на сервер WebSocket с префиксом /app. В приведенном ниже примере сообщения, отправленные на /app/test, будут перенаправлены (опубликованы) в потоки Redis по ключу TEST_EVENT_TO_BACKEND.

Примечание. Нет необходимости рассылать сообщение всем экземплярам WebSocket, поскольку публикация в Redis Streams уже гарантирует получение сообщения всеми серверными микросервисами. Дополнительные сведения см. на схеме в шаге 5.

Шаг 5.6. Серверная микрослужба — реализация потокового потребителя Redis

Аналогичным образом в образце серверной микрослужбы реализуйте потребителя потока Redis.

Шаг 5.6. Серверная микрослужба — реализация конфигурации потока Redis

Конфигурация здесь аналогична конфигурации сервера WebSocket. Единственное отличие состоит в том, что мы добавили группу потребителей ( CONSUMER_GROUP ), которая гарантирует, что только один экземпляр серверной микрослужбы будет использовать данные из потоков Redis.

Чтобы конфигурация работала, нам нужно будет вручную создать группу потребителей для потока TEST_EVENT_TO_BACKEND в Redis, используя приведенную ниже команду.

Примечание. Это также можно реализовать с помощью кода, но я буду проще, используя вместо этого команду Redis CLI.

docker exec redis redis-cli XGROUP CREATE TEST_EVENT_TO_BACKEND CONSUMER_GROUP $ MKSTREAM

Шаг 5.7. Серверная микрослужба — реализация производителя потоков Redis

Конфигурация производителя аналогична конфигурации сервера WebSocket.

Обратите внимание, что у микрослужбы есть запланированное задание, которое периодически публикуется в потоках Redis, а сообщение создается для отправки в веб-приложение (интерфейс) в целевой теме /topic/to-frontend как часть нашего примера.

Шаг 5.8. Тестирование двунаправленного обмена данными в режиме реального времени через Pub/Sub

Мы настроили как сервер WebSocket, так и образец серверной микрослужбы. Давайте протестируем публикацию и подписку на данные из потоков Redis, используя запланированную конфигурацию публикации данных, которую мы сделали в RedisStreamProducer.

Разверните два экземпляра сервера WebSocket и два экземпляра демонстрационных серверных микросервисов. Вы должны заметить, что выходные журналы аналогичны приведенным ниже.

Если вы подключаетесь к серверу WebSocket с помощью инструмента отладчика WebSocket и подписываетесь на тему /topic/to-frontend, вы должны увидеть следующие логи:

Это показывает, что мы успешно настроили сервер WebSocket с Redis Streams для масштабируемой двунаправленной связи в реальном времени между серверными микросервисами и веб-приложениями (интерфейсом).

Краткое содержание

Вот и все! Вы можете найти пример кода здесь на GitHub. Моя реализация не идеальна, но цель состоит в том, чтобы дать вам представление о том, как легко масштабировать серверы WebSocket в микросервисной архитектуре с помощью шаблонов обмена сообщениями публикации-подписки.

Спасибо, что дочитали до конца. Надеюсь, вы узнали что-то новое из этой статьи. Следите за новостями и удачного обучения!