Архитектура нашей компании основана на шаблоне микросервисов, варианте сервис-ориентированной архитектуры (SOA), которая структурирует приложение как набор слабосвязанных сервисов. Для обмена сообщениями между службами (например, запуска действия) мы используем RabbitMQ в качестве брокера сообщений.
Эта проблема
У нас было несколько случаев, когда потребитель услуги не мог завершить свою логику из-за непредвиденной ошибки или недоступности зависимого ресурса, что вызывало отклонение сообщения, которое автоматически отправляет сообщение в DLX (обмен мертвыми письмами). Нам приходилось обрабатывать отклоненные сообщения вручную.
Если мы решили отклонить сообщение с истинным значением re-queue, оно будет немедленно доставлено потребителю, что приведет к очень высокой нагрузке на систему.
Нам нужно было повторить действие, перенаправив сообщение обратно в исходную очередь, но с задержкой.
Возможные решения
Я взял на себя эту задачу и исследовал, чтобы найти подходящее решение, которое соответствовало бы потоку и потребностям нашей компании, и это были варианты:
- Сочетание функции TTL сообщения и функции мертвого письма: путем объединения этих двух функций (наличие очереди и мертвой буквы в одном обмене с соответствующими ключами маршрутизации), отклоненные сообщения будут перемещаться из очереди в мертвую букву. и после истечения срока жизни сообщение будет перенаправлено в очередь и так далее, пока оно не будет успешно использовано. Руководство для этого можно найти здесь.
- Плагин сообщений с задержкой: когда сообщение отклоняется, оно перемещается в отложенный обмен и зависает до тех пор, пока не истечет время задержки, а затем потребитель задержки проверяет количество попыток повторения, и, если оно допустимо, мы возвращаемся в исходную очередь, и если это не так переходит в основную очередь DLX для ручных действий
Выбранный
Плагин RabbitMQ Delayed Message (RabbitMQ 3.5.3 и более поздние версии) добавляет в RabbitMQ новый тип обмена, при котором сообщения, маршрутизируемые этим обменом, могут задерживаться, если издатель добавляет задержку. заголовок сообщения.
шаги по реализации:
- Определите обмен задержкой кролика ЗАДЕРЖКА, установив и включив плагин « rabbitmq_delayed_message_exchange
- Определите обмен DELAY как очередь DLX для отклоненных сообщений.
channel.exchangeDeclare("DELAY", "x-delayed-message", { autoDelete: false, durable: true, arguments: { 'x-delayed-type': "fanout", 'dead-letter-exchange': "DLX.DEAD.LETTERS" } }))
- Добавьте следующие заголовки в метаданные отклоненного сообщения:
'x-retry': counter of retries attempts 'x-retry-limit': limit number of retry attempts 'x-delay': number of milliseconds the message should be delayed
Чтобы отложить сообщение, пользователь должен опубликовать сообщение со специальным заголовком, называемым x-delay, который принимает целое число, представляющее количество миллисекунд, на которое RabbitMQ должен задержать сообщение.
- Потребитель задержки проверяет, находится ли попытка повторения в пределах диапазона повторных попыток, и перенаправляет на исходный обмен с тем же ключом маршрутизации для другой попытки после увеличения счетчика заголовка «x-retry».
- Если предел повторных попыток превышен, он будет отклонен потребителем задержки и передан в очередь DLX для ручных действий и отладки.
- Добавить монитор для очереди DLX, чтобы отправлять уведомление, когда сообщение поступает в DLX после превышения количества попыток повторения.
Диаграмма потока:
Пример кода
const RETRY_LIMIT = 5 const DELAY_DEFAULT = 5000 const RETRY_ATTEMPT = 'x-retry' const DELAY_INTERVAL = 'x-delay' const X_DEATH = 'x-death' const EXCHANGE = 'exchange' const ROUTING_KEY = 'routing-keys' class RetriesConsumer extends BaseConsumer { constructor() { super('retries.queue') this.setHandler(this.retry.bind(this)) } shouldRetry(headers) { return headers && RETRY_ATTEMPT in headers && headers[RETRY_ATTEMPT] <= RETRY_LIMIT } async shovelMsg(exchange, routingKey, data, headers) { await publisherService.publish(exchange, routingKey, data, headers) } getDelayRetryInterval(retryCount) { return Math.pow(2, retryCount - 1) * DELAY_DEFAULT } async retry(msg) { try { const {metadata, data} = msg const {headers} = metadata const causeOfDeath = headers[X_DEATH][0] const exchange = causeOfDeath[EXCHANGE] const routingKey = causeOfDeath[ROUTING_KEY][0] if (this.shouldRetry(headers)) { const retries_attempt = headers[RETRY_ATTEMPT] const delayInterval = this.getDelayRetryInterval(retries_attempt) const retryConfig = {} retryConfig[RETRY_ATTEMPT] = Number(retries_attempt || 0) + 1 retryConfig[DELAY_INTERVAL] = delayInterval await this.shovelMsg(exchange, routingKey, data, retryConfig) return Promise.resolve() } return Promise.reject(e) } catch (e) { Logger.error(`Failed to re-process operation. Message: ${JSON.stringify(data)} Error: ${err}`) return Promise.reject(e) } } }
Резюме
Ручная обработка отклоненных сообщений плохо масштабируется. После того, как мы увидели, что это происходит все чаще и чаще, мы решили, что нам нужна автоматизация. Мы решили использовать механизм отложенной очереди, поскольку он предлагает инфраструктурное решение, которое достаточно хорошо отвечает нашим требованиям, и его легко добавлять и поддерживать.
От себя лично: мне нравится работать со службой Rabbitmq, и мне понравилось исследовать и находить подходящее решение для наших нужд.
Как всегда, прокомментируйте, если у вас есть предложения или отзывы.
Спасибо за прочтение!