Это 4-я статья из цикла статей Micro in Action, посвященных Micro. Мы шаг за шагом создадим микросервис и объясним особенности Micro на этом пути. Мы начнем с основных понятий и тем, а затем перейдем к расширенным функциям.
Давайте сегодня поговорим об асинхронной обработке сообщений.
Асинхронная обработка сообщений - ключевая технология для построения масштабируемой и отказоустойчивой системы. Несмотря на то, что это мощный инструмент, его довольно утомительно разрабатывать, и необходимо учитывать многие технические детали. Она намного менее проста и понятна, чем синхронная система.
К счастью, Micro сделала очень элегантную абстракцию и инкапсуляцию этой модели программирования. Так что мы можем использовать это для нашего удобства.
Кроме того, с помощью абстракции интерфейса Micro мы можем прозрачно (или почти прозрачно) поддерживать различные брокеры сообщений.
Micro по умолчанию предоставляет встроенный брокер сообщений Nats. В то же время он также обеспечивает широкую поддержку основных брокеров сообщений через плагины, включая Kafka, RabbitMQ, MQTT, NSQ, Amazon SQS и т. Д. Это позволяет нам практически не изменять какой-либо бизнес-код при переключении брокеров сообщений.
Micro поддерживает асинхронный обмен сообщениями двумя разными способами.
Один - Pub / Sub, а другой работает с сообщениями через интерфейсmicro.Broker
. Первый относительно прост, а второй обеспечивает большую гибкость.
Встроенные функции Pub / Sub в Micro унифицируют и упрощают отправку, получение, кодирование и декодирование сообщений. Это освобождает разработчиков от основных технических деталей и позволяет им сосредоточиться на создании ценности для бизнеса. В большинстве случаев нам следует выбрать именно этот путь.
Ниже мы разберем развитие системы Pub / Sub на примере.
Подписаться на сообщение
В первой статье этой серии мы создали образец проекта, который уже содержит коды, относящиеся к подписке на сообщения.
Сначала мы определяем обработчик обработки сообщений. Код ./subscriber/hello.go следующий:
package subscriber import ( "context" log "github.com/micro/go-micro/v2/logger" hello "hello/proto/hello" ) type Hello struct{} func (e *Hello) Handle(ctx context.Context, msg *hello.Message) error { log.Info("Handler Received message: ", msg.Say) return nil }
Для обработки сообщений может использоваться функция или метод структуры. Пока его подпись func(context.Context, v interface{}) error
Обратите внимание, что вторым параметром нашей функции-обработчика является *hello.Message
, который определен в файле .proto. Micro автоматически декодирует сообщение, поэтому мы можем использовать его непосредственно в обработчиках сообщений.
После подготовки обработчика сообщений его необходимо зарегистрировать. Соответствующие коды ./main.go следующие:
... // Register Struct as Subscriber micro.RegisterSubscriber("com.foo.service.hello", service.Server(), new(subscriber.Hello)) ...
В приведенном выше коде регистрируются два обработчика сообщений, и они будут получать сообщения из темы с именем «com.foo.service.hello».
Если вы хотите больше контролировать поведение подписки, вам необходимо передать дополнительные параметры в micro.RegisterSubscriber
. Давайте сначала посмотрим на сигнатуру этого метода:
func RegisterSubscriber(topic string, s server.Server, h interface{}, opts ...server.SubscriberOption) error
Первый параметр представляет тему. Второй параметр - server.Server
, который можно получить из service.Server()
. Третий параметр - это обработчик сообщений.
Последний - необязательный параметр, который управляет поведением подписки. Его тип server.SubscriberOption
. В настоящее время Micro предоставляет 4 встроенных варианта:
- server.DisableAutoAck (), отключите автоматическое подтверждение сообщений после их обработки。
- server.SubscriberContext (ctx context.Context), установите параметры контекста, чтобы разрешить передачу SubscriberOption брокеру。
- server.InternalSubscriber (b bool) , указывает, что подписчик не объявляется системе обнаружения.
- server.SubscriberQueue (n строка) , имя общей очереди распределяет сообщения по подписчикам。
Примечание: я лично считаю, что фреймворк предоставляет слишком мало опций. Если у вас более высокие требования, например, вы хотите контролировать сохранение сообщения или стратегию повторной передачи, вам нужно обратиться к интерфейсу micro.Broker
. Надеюсь, это будет улучшено в будущих выпусках
Среди вышеперечисленных вариантов, server.SubscriberQueue
заслуживает отдельного объяснения.
Мы знаем, что в модели Pub / Sub существует концепция очереди (или канала на какой-то платформе). Если несколько подписчиков темы имеют свою собственную очередь, то сообщения будут скопированы и распределены в разные очереди, чтобы каждый подписчик мог получить все сообщения.
Micro по умолчанию создает глобально уникальную очередь для каждого экземпляра подписчика. Если вы хотите разделить очередь с несколькими экземплярами подписчиков, вам необходимо явно указать имя очереди через server.SubscriberQueue
:
micro.RegisterSubscriber("com.foo.srv.hello", service.Server(), subscriber.Handler, server.SubscriberQueue("foo_bar"))
Таким образом, экземпляры подписчика будут использовать одну и ту же очередь. Следовательно, сообщения будут распространяться на определенный узел для обработки, избегая повторной обработки одного и того же сообщения.
Учитывая, что это распространенный сценарий в распределенной системе, когда несколько экземпляров службы работают на разных узлах, я предлагаю: если вы не знаете, что делаете, всегда явно указывайте имя очереди - даже если в настоящее время существует только один экземпляр подписки . Наиболее распространенная практика - присвоить очереди имя Тема.
На этом этапе Sub часть модели Pub / Sub готова. Приступим к разделу Паб.
Опубликовать сообщения
Давайте создадим проект с названием pub, который публикует сообщение, и его структура будет следующей:
. ├── main.go ├── plugin.go ├── proto/hello │ └── hello.proto │ └── hello.pb.go │ └── hello.pb.micro.go └── go.mod
За исключением main.go, содержимое других файлов такое же, как описано в предыдущей статье, и не будет здесь повторяться.
Вот коды в main.go :
- Сначала создайте и инициализируйте экземпляр
micro.Service
и назовите его «com.foo.srv.hello.pub». Название не имеет особого значения и, скорее всего, будет другим в реальных проектах. - Затем укажите тему для отправки сообщения и создайте
micro.Publisher
instance. - Затем каждую секунду отправляйте сообщение типа
*hello.Message
, и фреймворк автоматически кодирует сообщение.
Подобно функции подписки, интерфейс публикации также поддерживает параметры, которые можно использовать для управления поведением публикации. Определение micro.Publisher
интерфейса следующее:
// Publisher is syntactic sugar for publishing type Publisher interface { Publish(ctx context.Context, msg interface{}, opts ...client.PublishOption) error }
В настоящее время Micro предоставляет только одну встроенную опцию публикации:
- client.WithExchange (e string) PublishOption , устанавливает обмен для маршрутизации сообщения.
Беги вверх
После подготовки проекта pub запустите сервер приветствия и проект pub.
Затем мы увидим журнал полученных сообщений в консоли hello server, одна строчка в секунду:
$ go run main.go plugin.go 2020-04-03 10:10:22 level=info Starting [service] com.foo.service.hello 2020-04-03 10:10:22 level=info Server [grpc] Listening on [::]:52863 2020-04-03 10:10:22 level=info Broker [eats] Connected to [::]:52865 2020-04-03 10:10:22 level=info Registry [mdns] Registering node: com.foo.service.hello-04df9f5a-f93e-437e-9272-0f6a37a99e4e 2020-04-03 10:10:22 level=info Subscribing to topic: com.foo.service.hello 2020-04-03 10:10:22 level=info Handler Received message: 2020-04-03 10:10:22.994619 +0800 CST m=+25.013864340 2020-04-03 10:10:23 level=info Handler Received message: 2020-04-03 10:10:23.994576 +0800 CST m=+26.013819155 2020-04-03 10:10:24 level=info Handler Received message: 2020-04-03 10:10:24.994613 +0800 CST m=+27.013852978 ...
Заключение
Micro полностью поддерживает асинхронный обмен сообщениями. Он поддерживает как высокоуровневую модель Pub / Sub, так и низкоуровневые операции через micro.Broker
.
Pub / Sub значительно упрощает разработку асинхронного обмена сообщениями, так что мы можем сосредоточиться на бизнес-логике, а не на технических деталях.
Разработчикам нужно только определить издателя, подписчика и содержимое сообщения. Всю остальную работу выполняет фреймворк. Больше нет необходимости рассматривать общие проблемы в системах асинхронного обмена сообщениями, такие как маршрутизация сообщений, повторная передача и подтверждение приема, а также нет необходимости учитывать кодирование и декодирование сообщений.
Конечно, это упрощение также накладывает некоторые ограничения. Если Pub / Sub не может удовлетворить ваши потребности, следите за следующей статьей из этой серии, мы поговорим об обмене сообщениями с micro.Broker
позже.
Продолжение следует.
Смотрите также:
- Micro в действии, часть 1: начало работы
- Micro In Action, Часть 2: Полное руководство по Bootstrap
- Micro в действии, часть 3: вызов службы
- Micro в действии, часть 5: Посредник сообщений
- Micro в действии, часть 6: обнаружение сервисов
- Micro In Action, Часть 7: Автоматический выключатель и ограничитель скорости
- Micro In Action, Coda: Распределенная работа Cron
- Главная страница Micro In Action