Что мы будем строить?

В этой статье мы создадим два простых приложения Node.js, которые будут взаимодействовать через RabbitMQ. Первое приложение, назовем его отправителем, предоставит API для отправки чего-либо второму приложению. Второе приложение, назовем его получателем, будет получать сообщения от приложения-отправителя и выводить их на консоль.

Коммуникация

В этой статье мы будем использовать RabbitMQ для налаживания взаимодействия между этими приложениями. RabbitMQ - это брокер сообщений, написанный на Erlang и использующий несколько протоколов для связи с пользователем (мы будем использовать AMQP).

Коммуникационная стратегия убийственно проста. Первое приложение, также известное как отправитель, проверит, существует ли очередь, создаст ее, если ее нет, и отправит ей сообщение. Второе приложение, также известное как получатель, подпишется на новые сообщения из этой очереди и распечатает полученные.

Действие

Я пропущу часть настройки маршрутизации, полный код доступен здесь. Фактически, все, что нам нужно сделать, это написать интерфейс, который позволит нам легко работать с RabbitMQ. Этот интерфейс будет одноэлементным классом, который будет иметь такие методы, как subscribe для получения сообщений из заданной очереди, send для отправки сообщений в заданную очередь и статический getInstance метод для получения одноэлементного экземпляра.

Начнем с метода getInstance. Этот метод должен вернуть экземпляр класса (я назвал этот класс MessageBroker) с установленным соединением RabbitMQ и создать канал. Для связи с RabbitMQ по протоколу AMQP мы будем использовать пакет amqplib. Способ подключения выглядит так:

В строке 6 мы создаем соединение, а в строке 7 мы создаем канал для связи с RabbitMQ.

Следующий метод, который мы добавим, - это метод getInstance, который будет возвращать синглтон:

В строке 7 мы объявили переменную экземпляра, которая будет содержать экземпляр singleton. В строке 27 мы проверяем, содержит ли уже переменная экземпляра экземпляр синглтона. Если это так, метод возвращает переменную экземпляра, которая может содержать ожидающее выполнение обещания, которое впоследствии вернет нам экземпляр MessageBroker. В противном случае мы инициализируем созданный экземпляр MessageBroker в строке 29 и присвоим возвращаемому ожидающему обещанию значение экземпляра. Таким образом мы можем избежать двойного соединения.

Отправка сообщений

Давайте добавим метод send в класс MessageBroker.

В строке 10 мы используем метод assertQueue, который проверяет наличие очереди, и если нет, assertQueue создаст очередь. Эта операция идемпотентна при идентичных аргументах. Мы устанавливаем для параметра длительного использования значение true, если значение true, очередь переживет перезапуски брокера. В строке 11 мы используем метод sendToQueue, который отправляет одно сообщение с content, заданным в качестве буфера, конкретному queue.

Подписка

Чтобы сделать метод подписки правильным, нам нужно понять, как RabbitMQ отправляет сообщения своим подписчикам.

Когда приложение подписывается на сообщения очереди, RabbitMQ рассматривает его как worker. RabbitMQ будет отправлять новые сообщения рабочим, используя алгоритм циклического перебора.

Итак, наше приложение должно подписаться на очередь только один раз, но иметь возможность зарегистрировать столько обработчиков, сколько необходимо.

Во-первых, мы создали queue член класса и определили его как пустой объект. Этот член будет хранить обработчики очередей, где ключ - это имя очереди, а значение - массив зарегистрированных обработчиков. В строке 23 мы проверяем, содержит ли переменная queues что-либо. Если true, мы проверяем, зарегистрирован ли уже данный обработчик, если да, возвращаем функцию отписки, если нет, помещаем обработчик в массив обработчиков this.queues[queue].push(handler) и возвращаем функцию отписки. Если this.queues[queue] не определен, мы проверяем наличие очереди (строка 32) и определяем this.queues[queue] с массивом, который содержит обработчик (строка 33).

Наконец, мы подписываемся на сообщения очереди, используя метод this.channel.consume. Первый аргумент - это имя очереди. Во-вторых, это обработчик сообщений. Чтобы понять, что происходит в строках 36–39, мы должны узнать о подтверждении сообщения.

Подтверждение сообщения

Если вы убьете воркера, мы потеряем сообщение, которое он только что обрабатывал. Мы также потеряем все сообщения, которые были отправлены этому конкретному исполнителю, но еще не были обработаны.

Но мы не хотим терять задачи. Если работник умирает, мы хотим, чтобы задача была доставлена ​​другому работнику.

Подтверждение (подтверждение) отправляется обратно потребителем, чтобы сообщить RabbitMQ, что конкретное сообщение было получено, обработано и что RabbitMQ может его удалить.

Таймаутов сообщений нет; RabbitMQ повторно доставит сообщение, когда потребитель умрет. Это нормально, даже если обработка сообщения занимает очень и очень много времени.

Обработчики запуска

Итак, когда приходит новое сообщение, мы создаем функцию ack, которая отмечает сообщение как подтвержденное, используя _.once, чтобы функция выполнялась только один раз. Затем мы перебираем массив обработчиков this.queues[queue].forEach(h => h(msg, ack)), вызывая каждый обработчик с функцией сообщения и подтверждения.

Отправка и получение

Контроллер отправителя:

Здесь мы получаем экземпляр MessageBroker и вызываем метод send с именем очереди, равным test, и сообщением, равным Buffer, со строковым телом.

Принимающий контроллер:

Здесь мы получаем экземпляр MessageBroker и вызываем метод подписки с именем очереди и функцией обработчика. Обработчик будет печатать содержимое входящих сообщений и сразу же подтверждать их.

Давай отправим сообщение.

Напечатано в консоли приложения-приемника:

Message: {“msg”:”Hello”}

Полный код доступен здесь