ActiveMQ написан на Java и имеет полноценный JMS-клиент, но его поддержка STOMP и RESTful API позволяют нам легко взаимодействовать с очередью сообщений с помощью Node.js.

С Node вы обнаружите, что STOMP - отличный выбор протокола для подписки на очереди, в то время как HTTP идеально подходит для отправки сообщений (и особенно хорош для сообщений, которые должны быть доставлены в установленное время из-за большого количества информации. ответ, который мы получаем из очереди при планировании доставки сообщения).

Начнем с основ. Используя очень удобный узел-топ-клиент Рассела Херинга и Easternbloc, мы можем создать класс в Node.js, который позволяет нам подписываться на очередь ActiveMQ по нашему выбору. Вот как это может выглядеть:

'use strict';
var Stomp = require('stomp-client');
var MessageConsumer = function MessageConsumer(){};
MessageConsumer.prototype.init = function init(){
  var stompClient = new Stomp('127.0.0.1', 61613, 'user', 'pw');
  stompClient.connect(function(sessionId){
    stompClient.subscribe('/queue/queue1', function(body, headers){
      /*
      this callback function is invoked whenever our a client receives a message.
      */
    }
  });
};
module.exports = new MessageConsumer();

Просто вызовите метод init () этого класса, чтобы подписаться на очередь. (Здесь также стоит упомянуть, что способ, которым мы обращаемся к очередям в STOMP, отличается от того, как мы делаем это с JMS - с STOMP нам нужно добавить строку «/ queue /» к имени самой очереди.)

Отправить сообщение с помощью STOMP очень просто. Давайте создадим класс, который можно будет использовать для отправки сообщения ниже:

'use strict';
var Stomp = require(‘stomp-client’);
var MessageProducer = function MessageProducer(){
  this._stompClient = null;
};
MessageProducer.prototype.init = function init(){
  this._stompClient = new Stomp(‘127.0.0.1’, 61613, ‘user’, ‘pw’);
  this._stompClient.connect(function(sessionId){
    console.log(“STOMP client connected.”);
  });
};
MessageProducer.prototype.sendMessage = function sendMessage(messageToPublish){
  this._stompClient.publish('/queue/queue1', messageToPublish);
};
module.exports = new MessageProducer();

Вы можете заметить серьезное ограничение подхода к публикации сообщений, использованного выше - нет возможности проверить получение сообщения от брокера! STOMP 1.2 действительно дает нам что-то, называемое фреймом квитанция (вот клиент Node.js, который его реализует), и хотя он помогает нам по времени с процессами, которые должны произойти после того, как сообщение было отправлено брокеру и в зависимости от успешного получения сообщения этим брокером (скажем, например, вы хотите записать запись этого сообщения где-нибудь в базу данных), он не отправляет обратно никаких подробных данных, имеющих отношение к самому сообщению.

Представьте себе сценарий, в котором вы планируете доставку сообщения в будущем, но вам нужен какой-то механизм, который позволил бы вам отменить доставку этого сообщения (у ActiveMQ есть отличный планировщик, о котором вы можете прочитать здесь). Проблема в том, что вы можете удалять только отдельные сообщения, используя внутренний идентификатор сообщения, который генерирует планировщик сообщений ActiveMQ, а STOMP не дает вам возможности получить этот идентификатор сразу после планирования сообщения.

По-прежнему можно удалить запланированное сообщение с помощью STOMP. Вот что вам нужно сделать (есть немало обручей, через которые нужно пройти, но я собираюсь сделать тот же базовый метод, который описан здесь, но с внешним клиентом Node.js, а не с JMS):

  • Во-первых, обязательно отправьте какой-либо другой вид идентифицирующей информации с заголовками сообщения, которое мы публикуем. Например, можно использовать свойство id-корреляции ActiveMQ и присвоить ему значение UUID, которое может функционировать как первичный ключ для записи запланированного сообщения, которое мы сохраняем в базе данных. Для этого вы можете изменить метод sendMessage в своем классе MessageProducer, чтобы он выглядел следующим образом:
MessageProducer.prototype.sendScheduledMessage = function sendScheduledMessage(messageToPublish, scheduledTime, correlationId){
  this._stompClient.publish('/queue/queue1', messageToPublish, {
    AMQ_SCHEDULED_DELAY : scheduledTime,
    correlation-id : correlationId
  });
};
  • Как только ActiveMQ поставит в очередь несколько запланированных сообщений для доставки по расписанию, отправьте сообщение, которое инициирует запрос «просмотра». Для этого вам нужно использовать внутренний STOMP-клиент вашего класса MessageProducer для публикации сообщения в теме ActiveMQ.Scheduler.Management (поэтому первым аргументом, который вы передадите в stamppClient.subscribe, будет следующая строка: «/ topic / ActiveMQ .Scheduler.Management »). Сообщение, которое вы отправляете для начала просмотра, должно включать следующие заголовки:
{
reply-to: "/queue/browsedMessages",
AMQ_SCHEDULER_ACTION: "AMQ_SCHEDULER_ACTION_BROWSE"
}

(Примечание: очередь «browsedMessages» может называться как угодно - вы просто создаете очередь, которая будет получать копии любого сообщения, находящегося в настоящее время в планировщике ActiveMQ.)

  • Затем создайте еще одного клиента STOMP и подпишите его на эту очередь «browsedMessages». Он будет потреблять все сообщения, которые попадают в очередь browsedMessages, и как только он потребляет эту копию сообщения, которое вы хотите удалить (вы проверяете идентификатор сообщения, которое хотите удалить, проверяя заголовки для идентификатора корреляции, который вы отправлено вместе с сообщением, когда оно было запланировано), возьмите соответствующее свойство message-id (вы также найдете его в заголовках ответа) и отправьте его вместе как значение для «scheduleJobId» в заголовках нового сообщения. в ActiveMQ, который будет использоваться для удаления нужного сообщения из планировщика. Вот как будут выглядеть заголовки этого последующего запроса:
{
AMQ_SCHEDULER_ACTION : "REMOVE",
"scheduledJobId" : scheduledJobId
}
  • Приведенный ниже пример кода должен расширить ваш класс MessageProducer методами, которые позволят вам это реализовать:
MessageProducer.prototype._sendDeleteMessageRequest = function _sendDeleteMessageRequest(scheduledJobId){
  this._stompClient.publish(“/topic/ActiveMQ.Scheduler.Management”, null, {
    AMQ_SCHEDULER_ACTION : “REMOVE”,
    “scheduledJobId” : scheduledJobId
  });
};
MessageProducer.prototype._sendBrowseRequest = function _sendBrowseRequest(){
  this._stompClient.publish(“/topic/ActiveMQ.Scheduler.Management”, null, {
    “reply-to” : “/queue/browsedMessages”,
    AMQ_SCHEDULER_ACTION : “AMQ_SCHEDULER_ACTION_BROWSE”
  });
};
MessageProducer.prototype.deleteMessage = function deleteMessage(correlationId){
  var browserConsumer = new Stomp(‘127.0.0.1’, 61613, ‘user’, ‘pw’);
  var self = this;
  return new Promise(function(resolve, reject){
    browserConsumer.connect(function(sessionId){
    browserConsumer.subscribe(destination, function(body, headers) {
      if (headers[‘correlation-id’] === correlationId){
        browserConsumer.disconnect(function(){
          return resolve(headers[‘message-id’]);
        });
      }
    });
    process.nextTick(function(){
      self._sendBrowseRequest();
    });
  });
}).then(function(headers){
  self._sendDeleteMessageRequest(headers.scheduledJobId);
  });
};

Если весь этот процесс показался вам огромной, неоптимальной головной болью, не волнуйтесь - надежда есть.

Планирование и удаление с помощью HTTP и REST

Вот действительно простой и перспективный способ использовать HTTP для планирования сообщения, если у вас включен ActiveMQ’s REST API:

'use strict';
var request = require('request');
var Promise = require('bluebird');
var MessageScheduler = function MessageScheduler(){};
MessageScheduler.prototype.scheduleMessage = function scheduleMessage(messageToSend, delay){
  var promisifiedPostRequest = Promise.promisify(request.post);
  return promisifiedPostRequest(
    'http://localhost:8080/demo/message/queue1?type=queue', 
    {
      form : {
        body: JSON.stringify(notification),
        AMQ_SCHEDULED_DELAY : delay
    }
  })
  .then(function(response){
    return response.headers.messageid;
  });
};

Видите этот бит в блоке .then ()? Здесь мы можем получить доступ к идентификатору сообщения, который ActiveMQ присваивает запланированному сообщению, которое, кстати, имеет то же значение, что и selectedJobId сообщения.

В этом прелесть HTTP: он дает нам доступ к этому очень важному свойству в ответе на запрос, что означает, что нам не нужно отправлять запрос «просмотра», чтобы откопать его позже.

Теперь, когда у нас есть это значение, использовать его для удаления запланированного сообщения, которое оно идентифицирует, очень просто:

'use strict';
var Promise = require('bluebird');
var request = require('request');
var MessageDeleter = function MessageDeleter(messageID){
  var promisifiedPostRequest = Promise.promisify(request.post);
  return promisifiedPostRequest(
  'http://localhost:8080/demo/message/ActiveMQ.Scheduler.Management?type=topic', 
  {
    form: {
      AMQ_SCHEDULER_ACTION : 'REMOVE',
      scheduledJobId : scheduledJobId
    }
  }).then(function(response){
    //do any post-deletion processes you need to here
    return response;
  }
}

С REST это все.

Единственное серьезное ограничение на удаление сообщений (и это верно как для STOMP, так и для HTTP) заключается в том, что мы не получаем подтверждения от ActiveMQ, что сообщение было действительно удалено - просто запрос на удаление это сообщение было получено и обработано. Сообщение, предназначенное для удаления, могло быть даже доставлено потребителю до того, как сообщение, содержащее запрос на удаление, даже достигло очереди.

Тем не менее, это значительно приближает нас к наличию надежных, быстрых, основанных на узлах средств взаимодействия с возможностями планирования сообщений ActiveMQ.