ActiveMQ написан на Java и имеет полноценный JMS-клиент, но его поддержка STOMP и RESTful API позволяют нам легко взаимодействовать с очередью сообщений с помощью Node.js.
С Node вы обнаружите, что STOMP - отличный выбор протокола для подписки на очереди, в то время как HTTP идеально подходит для отправки сообщений (и особенно хорош для сообщений, которые должны быть доставлены в установленное время из-за большого количества информации. ответ, который мы получаем из очереди при планировании доставки сообщения).
Начнем с основ. Используя очень удобный узел-топ-клиент Рассела Херинга и Easternbloc, мы можем создать класс в Node.js, который позволяет нам подписываться на очередь ActiveMQ по нашему выбору. Вот как это может выглядеть:
'use strict'; var Stomp = require('stomp-client'); var MessageConsumer = function MessageConsumer(){}; MessageConsumer.prototype.init = function init(){ var stompClient = new Stomp('127.0.0.1', 61613, 'user', 'pw'); stompClient.connect(function(sessionId){ stompClient.subscribe('/queue/queue1', function(body, headers){ /* this callback function is invoked whenever our a client receives a message. */ } }); }; module.exports = new MessageConsumer();
Просто вызовите метод init () этого класса, чтобы подписаться на очередь. (Здесь также стоит упомянуть, что способ, которым мы обращаемся к очередям в STOMP, отличается от того, как мы делаем это с JMS - с STOMP нам нужно добавить строку «/ queue /» к имени самой очереди.)
Отправить сообщение с помощью STOMP очень просто. Давайте создадим класс, который можно будет использовать для отправки сообщения ниже:
'use strict'; var Stomp = require(‘stomp-client’); var MessageProducer = function MessageProducer(){ this._stompClient = null; }; MessageProducer.prototype.init = function init(){ this._stompClient = new Stomp(‘127.0.0.1’, 61613, ‘user’, ‘pw’); this._stompClient.connect(function(sessionId){ console.log(“STOMP client connected.”); }); }; MessageProducer.prototype.sendMessage = function sendMessage(messageToPublish){ this._stompClient.publish('/queue/queue1', messageToPublish); }; module.exports = new MessageProducer();
Вы можете заметить серьезное ограничение подхода к публикации сообщений, использованного выше - нет возможности проверить получение сообщения от брокера! STOMP 1.2 действительно дает нам что-то, называемое фреймом квитанция (вот клиент Node.js, который его реализует), и хотя он помогает нам по времени с процессами, которые должны произойти после того, как сообщение было отправлено брокеру и в зависимости от успешного получения сообщения этим брокером (скажем, например, вы хотите записать запись этого сообщения где-нибудь в базу данных), он не отправляет обратно никаких подробных данных, имеющих отношение к самому сообщению.
Представьте себе сценарий, в котором вы планируете доставку сообщения в будущем, но вам нужен какой-то механизм, который позволил бы вам отменить доставку этого сообщения (у ActiveMQ есть отличный планировщик, о котором вы можете прочитать здесь). Проблема в том, что вы можете удалять только отдельные сообщения, используя внутренний идентификатор сообщения, который генерирует планировщик сообщений ActiveMQ, а STOMP не дает вам возможности получить этот идентификатор сразу после планирования сообщения.
По-прежнему можно удалить запланированное сообщение с помощью STOMP. Вот что вам нужно сделать (есть немало обручей, через которые нужно пройти, но я собираюсь сделать тот же базовый метод, который описан здесь, но с внешним клиентом Node.js, а не с JMS):
- Во-первых, обязательно отправьте какой-либо другой вид идентифицирующей информации с заголовками сообщения, которое мы публикуем. Например, можно использовать свойство id-корреляции ActiveMQ и присвоить ему значение UUID, которое может функционировать как первичный ключ для записи запланированного сообщения, которое мы сохраняем в базе данных. Для этого вы можете изменить метод sendMessage в своем классе MessageProducer, чтобы он выглядел следующим образом:
MessageProducer.prototype.sendScheduledMessage = function sendScheduledMessage(messageToPublish, scheduledTime, correlationId){ this._stompClient.publish('/queue/queue1', messageToPublish, { AMQ_SCHEDULED_DELAY : scheduledTime, correlation-id : correlationId }); };
- Как только ActiveMQ поставит в очередь несколько запланированных сообщений для доставки по расписанию, отправьте сообщение, которое инициирует запрос «просмотра». Для этого вам нужно использовать внутренний STOMP-клиент вашего класса MessageProducer для публикации сообщения в теме ActiveMQ.Scheduler.Management (поэтому первым аргументом, который вы передадите в stamppClient.subscribe, будет следующая строка: «/ topic / ActiveMQ .Scheduler.Management »). Сообщение, которое вы отправляете для начала просмотра, должно включать следующие заголовки:
{ reply-to: "/queue/browsedMessages", AMQ_SCHEDULER_ACTION: "AMQ_SCHEDULER_ACTION_BROWSE" }
(Примечание: очередь «browsedMessages» может называться как угодно - вы просто создаете очередь, которая будет получать копии любого сообщения, находящегося в настоящее время в планировщике ActiveMQ.)
- Затем создайте еще одного клиента STOMP и подпишите его на эту очередь «browsedMessages». Он будет потреблять все сообщения, которые попадают в очередь browsedMessages, и как только он потребляет эту копию сообщения, которое вы хотите удалить (вы проверяете идентификатор сообщения, которое хотите удалить, проверяя заголовки для идентификатора корреляции, который вы отправлено вместе с сообщением, когда оно было запланировано), возьмите соответствующее свойство message-id (вы также найдете его в заголовках ответа) и отправьте его вместе как значение для «scheduleJobId» в заголовках нового сообщения. в ActiveMQ, который будет использоваться для удаления нужного сообщения из планировщика. Вот как будут выглядеть заголовки этого последующего запроса:
{ AMQ_SCHEDULER_ACTION : "REMOVE", "scheduledJobId" : scheduledJobId }
- Приведенный ниже пример кода должен расширить ваш класс MessageProducer методами, которые позволят вам это реализовать:
MessageProducer.prototype._sendDeleteMessageRequest = function _sendDeleteMessageRequest(scheduledJobId){ this._stompClient.publish(“/topic/ActiveMQ.Scheduler.Management”, null, { AMQ_SCHEDULER_ACTION : “REMOVE”, “scheduledJobId” : scheduledJobId }); }; MessageProducer.prototype._sendBrowseRequest = function _sendBrowseRequest(){ this._stompClient.publish(“/topic/ActiveMQ.Scheduler.Management”, null, { “reply-to” : “/queue/browsedMessages”, AMQ_SCHEDULER_ACTION : “AMQ_SCHEDULER_ACTION_BROWSE” }); }; MessageProducer.prototype.deleteMessage = function deleteMessage(correlationId){ var browserConsumer = new Stomp(‘127.0.0.1’, 61613, ‘user’, ‘pw’); var self = this; return new Promise(function(resolve, reject){ browserConsumer.connect(function(sessionId){ browserConsumer.subscribe(destination, function(body, headers) { if (headers[‘correlation-id’] === correlationId){ browserConsumer.disconnect(function(){ return resolve(headers[‘message-id’]); }); } }); process.nextTick(function(){ self._sendBrowseRequest(); }); }); }).then(function(headers){ self._sendDeleteMessageRequest(headers.scheduledJobId); }); };
Если весь этот процесс показался вам огромной, неоптимальной головной болью, не волнуйтесь - надежда есть.
Планирование и удаление с помощью HTTP и REST
Вот действительно простой и перспективный способ использовать HTTP для планирования сообщения, если у вас включен ActiveMQ’s REST API:
'use strict'; var request = require('request'); var Promise = require('bluebird'); var MessageScheduler = function MessageScheduler(){}; MessageScheduler.prototype.scheduleMessage = function scheduleMessage(messageToSend, delay){ var promisifiedPostRequest = Promise.promisify(request.post); return promisifiedPostRequest( 'http://localhost:8080/demo/message/queue1?type=queue', { form : { body: JSON.stringify(notification), AMQ_SCHEDULED_DELAY : delay } }) .then(function(response){ return response.headers.messageid; }); };
Видите этот бит в блоке .then ()? Здесь мы можем получить доступ к идентификатору сообщения, который ActiveMQ присваивает запланированному сообщению, которое, кстати, имеет то же значение, что и selectedJobId сообщения.
В этом прелесть HTTP: он дает нам доступ к этому очень важному свойству в ответе на запрос, что означает, что нам не нужно отправлять запрос «просмотра», чтобы откопать его позже.
Теперь, когда у нас есть это значение, использовать его для удаления запланированного сообщения, которое оно идентифицирует, очень просто:
'use strict'; var Promise = require('bluebird'); var request = require('request'); var MessageDeleter = function MessageDeleter(messageID){ var promisifiedPostRequest = Promise.promisify(request.post); return promisifiedPostRequest( 'http://localhost:8080/demo/message/ActiveMQ.Scheduler.Management?type=topic', { form: { AMQ_SCHEDULER_ACTION : 'REMOVE', scheduledJobId : scheduledJobId } }).then(function(response){ //do any post-deletion processes you need to here return response; } }
С REST это все.
Единственное серьезное ограничение на удаление сообщений (и это верно как для STOMP, так и для HTTP) заключается в том, что мы не получаем подтверждения от ActiveMQ, что сообщение было действительно удалено - просто запрос на удаление это сообщение было получено и обработано. Сообщение, предназначенное для удаления, могло быть даже доставлено потребителю до того, как сообщение, содержащее запрос на удаление, даже достигло очереди.
Тем не менее, это значительно приближает нас к наличию надежных, быстрых, основанных на узлах средств взаимодействия с возможностями планирования сообщений ActiveMQ.