Архитектура нашей компании основана на шаблоне микросервисов, варианте сервис-ориентированной архитектуры (SOA), которая структурирует приложение как набор слабосвязанных сервисов. Для обмена сообщениями между службами (например, запуска действия) мы используем RabbitMQ в качестве брокера сообщений.

Эта проблема

У нас было несколько случаев, когда потребитель услуги не мог завершить свою логику из-за непредвиденной ошибки или недоступности зависимого ресурса, что вызывало отклонение сообщения, которое автоматически отправляет сообщение в DLX (обмен мертвыми письмами). Нам приходилось обрабатывать отклоненные сообщения вручную.

Если мы решили отклонить сообщение с истинным значением re-queue, оно будет немедленно доставлено потребителю, что приведет к очень высокой нагрузке на систему.

Нам нужно было повторить действие, перенаправив сообщение обратно в исходную очередь, но с задержкой.

Возможные решения

Я взял на себя эту задачу и исследовал, чтобы найти подходящее решение, которое соответствовало бы потоку и потребностям нашей компании, и это были варианты:

  • Сочетание функции TTL сообщения и функции мертвого письма: путем объединения этих двух функций (наличие очереди и мертвой буквы в одном обмене с соответствующими ключами маршрутизации), отклоненные сообщения будут перемещаться из очереди в мертвую букву. и после истечения срока жизни сообщение будет перенаправлено в очередь и так далее, пока оно не будет успешно использовано. Руководство для этого можно найти здесь.
  • Плагин сообщений с задержкой: когда сообщение отклоняется, оно перемещается в отложенный обмен и зависает до тех пор, пока не истечет время задержки, а затем потребитель задержки проверяет количество попыток повторения, и, если оно допустимо, мы возвращаемся в исходную очередь, и если это не так переходит в основную очередь DLX для ручных действий

Выбранный

Плагин RabbitMQ Delayed Message (RabbitMQ 3.5.3 и более поздние версии) добавляет в RabbitMQ новый тип обмена, при котором сообщения, маршрутизируемые этим обменом, могут задерживаться, если издатель добавляет задержку. заголовок сообщения.

шаги по реализации:

  • Определите обмен задержкой кролика ЗАДЕРЖКА, установив и включив плагин « rabbitmq_delayed_message_exchange
  • Определите обмен DELAY как очередь DLX для отклоненных сообщений.
channel.exchangeDeclare("DELAY", "x-delayed-message", {
    autoDelete: false,
    durable: true, 
    arguments: {
        'x-delayed-type':  "fanout", 
        'dead-letter-exchange': "DLX.DEAD.LETTERS"
    }
}))
  • Добавьте следующие заголовки в метаданные отклоненного сообщения:
'x-retry': counter of retries attempts
'x-retry-limit': limit number of retry attempts
'x-delay': number of milliseconds the message should be delayed

Чтобы отложить сообщение, пользователь должен опубликовать сообщение со специальным заголовком, называемым x-delay, который принимает целое число, представляющее количество миллисекунд, на которое RabbitMQ должен задержать сообщение.

  • Потребитель задержки проверяет, находится ли попытка повторения в пределах диапазона повторных попыток, и перенаправляет на исходный обмен с тем же ключом маршрутизации для другой попытки после увеличения счетчика заголовка «x-retry».
  • Если предел повторных попыток превышен, он будет отклонен потребителем задержки и передан в очередь DLX для ручных действий и отладки.
  • Добавить монитор для очереди DLX, чтобы отправлять уведомление, когда сообщение поступает в DLX после превышения количества попыток повторения.

Диаграмма потока:

Пример кода

const RETRY_LIMIT = 5
const DELAY_DEFAULT = 5000
const RETRY_ATTEMPT = 'x-retry'
const DELAY_INTERVAL = 'x-delay'
const X_DEATH = 'x-death'
const EXCHANGE = 'exchange'
const ROUTING_KEY = 'routing-keys'
class RetriesConsumer extends BaseConsumer {
  constructor() {
    super('retries.queue')
    this.setHandler(this.retry.bind(this))
  }
shouldRetry(headers) {
    return headers && RETRY_ATTEMPT in headers && headers[RETRY_ATTEMPT] <= RETRY_LIMIT
  }
async shovelMsg(exchange, routingKey, data, headers) {
    await publisherService.publish(exchange, routingKey, data, headers)
  }
getDelayRetryInterval(retryCount) {
    return Math.pow(2, retryCount - 1) * DELAY_DEFAULT
  }
async retry(msg) {
    try {
      const {metadata, data} = msg
      const {headers} = metadata
      const causeOfDeath = headers[X_DEATH][0]
      const exchange = causeOfDeath[EXCHANGE]
      const routingKey = causeOfDeath[ROUTING_KEY][0]
if (this.shouldRetry(headers)) {
        const retries_attempt = headers[RETRY_ATTEMPT]
        const delayInterval = this.getDelayRetryInterval(retries_attempt)
const retryConfig = {}
        retryConfig[RETRY_ATTEMPT] = Number(retries_attempt || 0) + 1
        retryConfig[DELAY_INTERVAL] = delayInterval
await this.shovelMsg(exchange, routingKey, data, retryConfig)
        return Promise.resolve()
      }
return Promise.reject(e)
    } catch (e) {
      Logger.error(`Failed to re-process operation. Message: ${JSON.stringify(data)} Error: ${err}`)
      return Promise.reject(e)
    }
  }
}

Резюме

Ручная обработка отклоненных сообщений плохо масштабируется. После того, как мы увидели, что это происходит все чаще и чаще, мы решили, что нам нужна автоматизация. Мы решили использовать механизм отложенной очереди, поскольку он предлагает инфраструктурное решение, которое достаточно хорошо отвечает нашим требованиям, и его легко добавлять и поддерживать.

От себя лично: мне нравится работать со службой Rabbitmq, и мне понравилось исследовать и находить подходящее решение для наших нужд.

Как всегда, прокомментируйте, если у вас есть предложения или отзывы.

Спасибо за прочтение!