Это 4-я статья из цикла статей Micro in Action, посвященных Micro. Мы шаг за шагом создадим микросервис и объясним особенности Micro на этом пути. Мы начнем с основных понятий и тем, а затем перейдем к расширенным функциям.

Давайте сегодня поговорим об асинхронной обработке сообщений.

Асинхронная обработка сообщений - ключевая технология для построения масштабируемой и отказоустойчивой системы. Несмотря на то, что это мощный инструмент, его довольно утомительно разрабатывать, и необходимо учитывать многие технические детали. Она намного менее проста и понятна, чем синхронная система.

К счастью, Micro сделала очень элегантную абстракцию и инкапсуляцию этой модели программирования. Так что мы можем использовать это для нашего удобства.

Кроме того, с помощью абстракции интерфейса Micro мы можем прозрачно (или почти прозрачно) поддерживать различные брокеры сообщений.

Micro по умолчанию предоставляет встроенный брокер сообщений Nats. В то же время он также обеспечивает широкую поддержку основных брокеров сообщений через плагины, включая Kafka, RabbitMQ, MQTT, NSQ, Amazon SQS и т. Д. Это позволяет нам практически не изменять какой-либо бизнес-код при переключении брокеров сообщений.

Micro поддерживает асинхронный обмен сообщениями двумя разными способами.

Один - Pub / Sub, а другой работает с сообщениями через интерфейсmicro.Broker. Первый относительно прост, а второй обеспечивает большую гибкость.

Встроенные функции Pub / Sub в Micro унифицируют и упрощают отправку, получение, кодирование и декодирование сообщений. Это освобождает разработчиков от основных технических деталей и позволяет им сосредоточиться на создании ценности для бизнеса. В большинстве случаев нам следует выбрать именно этот путь.

Ниже мы разберем развитие системы Pub / Sub на примере.

Подписаться на сообщение

В первой статье этой серии мы создали образец проекта, который уже содержит коды, относящиеся к подписке на сообщения.

Сначала мы определяем обработчик обработки сообщений. Код ./subscriber/hello.go следующий:

package subscriber

import (
   "context"
   log "github.com/micro/go-micro/v2/logger"

   hello "hello/proto/hello"
)

type Hello struct{}

func (e *Hello) Handle(ctx context.Context, msg *hello.Message) error {
   log.Info("Handler Received message: ", msg.Say)
   return nil
}

Для обработки сообщений может использоваться функция или метод структуры. Пока его подпись func(context.Context, v interface{}) error

Обратите внимание, что вторым параметром нашей функции-обработчика является *hello.Message, который определен в файле .proto. Micro автоматически декодирует сообщение, поэтому мы можем использовать его непосредственно в обработчиках сообщений.

После подготовки обработчика сообщений его необходимо зарегистрировать. Соответствующие коды ./main.go следующие:

...
// Register Struct as Subscriber
micro.RegisterSubscriber("com.foo.service.hello", service.Server(), new(subscriber.Hello))
...

В приведенном выше коде регистрируются два обработчика сообщений, и они будут получать сообщения из темы с именем «com.foo.service.hello».

Если вы хотите больше контролировать поведение подписки, вам необходимо передать дополнительные параметры в micro.RegisterSubscriber. Давайте сначала посмотрим на сигнатуру этого метода:

func RegisterSubscriber(topic string, s server.Server, h interface{}, opts ...server.SubscriberOption) error

Первый параметр представляет тему. Второй параметр - server.Server, который можно получить из service.Server(). Третий параметр - это обработчик сообщений.

Последний - необязательный параметр, который управляет поведением подписки. Его тип server.SubscriberOption. В настоящее время Micro предоставляет 4 встроенных варианта:

  1. server.DisableAutoAck (), отключите автоматическое подтверждение сообщений после их обработки。
  2. server.SubscriberContext (ctx context.Context), установите параметры контекста, чтобы разрешить передачу SubscriberOption брокеру。
  3. server.InternalSubscriber (b bool) , указывает, что подписчик не объявляется системе обнаружения.
  4. server.SubscriberQueue (n строка) , имя общей очереди распределяет сообщения по подписчикам。

Примечание: я лично считаю, что фреймворк предоставляет слишком мало опций. Если у вас более высокие требования, например, вы хотите контролировать сохранение сообщения или стратегию повторной передачи, вам нужно обратиться к интерфейсу micro.Broker. Надеюсь, это будет улучшено в будущих выпусках

Среди вышеперечисленных вариантов, server.SubscriberQueue заслуживает отдельного объяснения.

Мы знаем, что в модели Pub / Sub существует концепция очереди (или канала на какой-то платформе). Если несколько подписчиков темы имеют свою собственную очередь, то сообщения будут скопированы и распределены в разные очереди, чтобы каждый подписчик мог получить все сообщения.

Micro по умолчанию создает глобально уникальную очередь для каждого экземпляра подписчика. Если вы хотите разделить очередь с несколькими экземплярами подписчиков, вам необходимо явно указать имя очереди через server.SubscriberQueue:

micro.RegisterSubscriber("com.foo.srv.hello", service.Server(), subscriber.Handler, server.SubscriberQueue("foo_bar"))

Таким образом, экземпляры подписчика будут использовать одну и ту же очередь. Следовательно, сообщения будут распространяться на определенный узел для обработки, избегая повторной обработки одного и того же сообщения.

Учитывая, что это распространенный сценарий в распределенной системе, когда несколько экземпляров службы работают на разных узлах, я предлагаю: если вы не знаете, что делаете, всегда явно указывайте имя очереди - даже если в настоящее время существует только один экземпляр подписки . Наиболее распространенная практика - присвоить очереди имя Тема.

На этом этапе Sub часть модели Pub / Sub готова. Приступим к разделу Паб.

Опубликовать сообщения

Давайте создадим проект с названием pub, который публикует сообщение, и его структура будет следующей:

.
├── main.go
├── plugin.go
├── proto/hello
│   └── hello.proto
│   └── hello.pb.go
│   └── hello.pb.micro.go
└── go.mod

За исключением main.go, содержимое других файлов такое же, как описано в предыдущей статье, и не будет здесь повторяться.

Вот коды в main.go

  • Сначала создайте и инициализируйте экземпляр micro.Service и назовите его «com.foo.srv.hello.pub». Название не имеет особого значения и, скорее всего, будет другим в реальных проектах.
  • Затем укажите тему для отправки сообщения и создайте micro.Publisher instance.
  • Затем каждую секунду отправляйте сообщение типа *hello.Message, и фреймворк автоматически кодирует сообщение.

Подобно функции подписки, интерфейс публикации также поддерживает параметры, которые можно использовать для управления поведением публикации. Определение micro.Publisher интерфейса следующее:

// Publisher is syntactic sugar for publishing
type Publisher interface {
   Publish(ctx context.Context, msg interface{}, opts ...client.PublishOption) error
}

В настоящее время Micro предоставляет только одну встроенную опцию публикации:

  • client.WithExchange (e string) PublishOption , устанавливает обмен для маршрутизации сообщения.

Беги вверх

После подготовки проекта pub запустите сервер приветствия и проект pub.

Затем мы увидим журнал полученных сообщений в консоли hello server, одна строчка в секунду:

$ go run main.go plugin.go 
2020-04-03 10:10:22  level=info Starting [service] com.foo.service.hello
2020-04-03 10:10:22  level=info Server [grpc] Listening on [::]:52863
2020-04-03 10:10:22  level=info Broker [eats] Connected to [::]:52865
2020-04-03 10:10:22  level=info Registry [mdns] Registering node: com.foo.service.hello-04df9f5a-f93e-437e-9272-0f6a37a99e4e
2020-04-03 10:10:22  level=info Subscribing to topic: com.foo.service.hello
2020-04-03 10:10:22  level=info Handler Received message: 2020-04-03 10:10:22.994619 +0800 CST m=+25.013864340
2020-04-03 10:10:23  level=info Handler Received message: 2020-04-03 10:10:23.994576 +0800 CST m=+26.013819155
2020-04-03 10:10:24  level=info Handler Received message: 2020-04-03 10:10:24.994613 +0800 CST m=+27.013852978
...

Заключение

Micro полностью поддерживает асинхронный обмен сообщениями. Он поддерживает как высокоуровневую модель Pub / Sub, так и низкоуровневые операции через micro.Broker.

Pub / Sub значительно упрощает разработку асинхронного обмена сообщениями, так что мы можем сосредоточиться на бизнес-логике, а не на технических деталях.

Разработчикам нужно только определить издателя, подписчика и содержимое сообщения. Всю остальную работу выполняет фреймворк. Больше нет необходимости рассматривать общие проблемы в системах асинхронного обмена сообщениями, такие как маршрутизация сообщений, повторная передача и подтверждение приема, а также нет необходимости учитывать кодирование и декодирование сообщений.

Конечно, это упрощение также накладывает некоторые ограничения. Если Pub / Sub не может удовлетворить ваши потребности, следите за следующей статьей из этой серии, мы поговорим об обмене сообщениями с micro.Broker позже.

Продолжение следует.

Смотрите также: