Часть 1: Обоснование и общий дизайн
Часть 2: Сделайте это просто, используя Spring Boot и JPA
Часть 3: Кластеризация и параллелизм

Время от времени у разработчика наступает момент, когда ему нужна очередь. В моем случае это была в основном ситуация, когда мне приходилось выполнять вызов какой-либо удаленной системы надежным образом, то есть с повторными попытками, если это необходимо. Поскольку повторные попытки должны происходить даже в случае перезапуска вызывающего процесса, это означает, что наличие ожидающих запросов вызова является обязательным.

Теперь доступно множество вариантов обмена сообщениями - брокеры сообщений, такие как ActiveMQ, RabbitMQ или даже Kafka (хотя они изначально не разработаны для этого варианта использования), а также более легкие решения с использованием таких библиотек, как Apache Camel или Spring Integration. Конечно, есть много других вариантов, но это те, с которыми я работал.

Конечно, я предполагаю, что многие из вас также реализовали постоянную очередь с комбинацией простой таблицы SQL с периодическим опросом, но если вы посмотрите в Интернете, это не считается хорошим решением для «серьезных» очередей, поэтому можно найти много сообщений с заголовком что-то вроде «Таблица SQL, поскольку очередь сообщений является анти-шаблоном». Проблемы в основном связаны с тем, насколько неэффективна база данных SQL для большой пропускной способности, особенно с гарантиями того, что несколько потребителей не выберут одно и то же сообщение в очереди. Базы данных SQL обычно не работают, когда записи очень недолговечны, как в случае с записями сообщений очереди, а блокировка записей, необходимая для предотвращения состояния гонки, также может значительно снизить производительность.

Но, по моему опыту, многим приложениям не нужна большая пропускная способность, а также, когда мы говорим «постановка в очередь», типичное приложение требует от механизма очередей гораздо большего, чем просто «структура данных FIFO». Обычно мне требуются следующие характеристики:

  • базовая обработка FIFO (first in - first out)
  • настойчивость - чтобы пережить перезапуск процесса
  • создание и потребление транзакционных сообщений - нам нужны гарантии того, что сообщение будет размещено и получено из очереди вместе с другими действиями базы данных по принципу «все или ничего» (без проблемы «двойной записи»)
  • повторные попытки с подключаемой политикой
  • запоминание причины неудачной обработки
  • поиск сообщений очереди по их статусу (ERROR, PENDING) или с помощью различных доменных фильтров (например, поиск всех сообщений очереди, которые имеют companyId=3345)
  • удаление сообщений с помощью различных доменных фильтров - когда недопустимые сообщения попали в очередь
  • изменение определенного приоритета сообщения - например. когда конкретное сообщение, запланированное для будущей обработки, хочет быть обработано как можно скорее
  • иногда задержка начального потребления

Просматривая все возможные решения для обмена сообщениями, вы часто обнаруживаете, что практически у всех из них есть некоторые из следующих проблем:

  • значительно снизить производительность при включении постоянства
  • иногда не предлагают создание и использование транзакционных сообщений, или они предлагают, но производительность также значительно снижается
  • содержимое очереди в основном непрозрачно - общие решения для обмена сообщениями обычно обрабатывают содержимое сообщения как двоичный массив (сериализованный любым способом, зависящим от приложения), но это обычно означает, что нет способа запросить эти сообщения по каким-либо полям, зависящим от домена
  • отложенная повторная попытка требует другого дополнительного механизма, что увеличивает сложность реализации
  • изменение приоритета или удаление определенных сообщений из очереди с использованием различных фильтров вообще невозможно (или, по крайней мере, сложно)
  • отсрочка первоначального потребления невозможна, потому что очередь обычно работает «обработать как можно скорее»

Принимая во внимание все указанные выше пожелания, а также то, что высокая пропускная способность обычно не вызывает особого беспокойства, я обычно получаю таблицу SQL как лучшее общее решение. А поскольку почти каждое приложение уже использует базу данных SQL, вы не сможете внедрить в свою систему еще одну технологию, что всегда хорошо.

Может быть, вам интересно, что означает «не такая большая пропускная способность»? Что ж, у меня почти всегда была достаточная пропускная способность, поэтому реальной необходимости в тестировании производительности не было, но была одна ситуация, когда мы должны были удовлетворить некоторые справедливые требования к пропускной способности, поэтому мы выбрали установку с одной виртуальной машиной с 8 ядрами, на которой размещались одна такая служба очередей, а на другой машине мы использовали «ненастроенную по-любому» БД PostgreSQL. В течение нескольких дней тестирования мы достигли около 700 мс / сек при использовании транзакции потребления на одно сообщение (обработка «ровно один раз») и около 1400 мсг / сек при наличии транзакции на пакет сообщений (обработка «хотя бы один раз» , поскольку в случае перезапуска некоторые сообщения могут быть повторно обработаны из-за того, что предыдущая транзакция не была зафиксирована). Кстати, потребление было связано не с увеличением некоторого счетчика памяти, а с реальным примером выполнения удаленного вызова внешней системы.

Я знаю, что это описание слишком расплывчато, чтобы его можно было считать точным, но все же эти цифры могут дать вам хотя бы некоторое представление о пропускной способности, о которой мы здесь говорим.

Общий эскиз реализации

Предлагаемый механизм требует 2 компонентов:

  • Несколько дополнительных столбцов SQL в таблице сообщений для сохранения состояния очереди
  • Потребительская работа, которая периодически опрашивает ожидающие сообщения и обрабатывает их

Состояние очереди состоит из следующих столбцов SQL:

  • status (не null) - перечисление значений NOT_ATTEMPTED, SUCCESS, ERROR; Я не очень доволен именем NOT_ATTEMPTED, поэтому, если вы найдете какое-то лучшее, просто используйте его здесь (изначально я думал что-то вроде PENDING, но на самом деле это не отражает того, что ожидающие сообщения также обычно находятся в состоянии ERROR, ожидая повторить попытку)
  • next_attempt_time (обнуляемый) - отметка времени следующей попытки обработки; должны быть проиндексированы по соображениям производительности; при заполнении этот столбец в основном отмечает сообщение как находящееся в состоянии ожидания, независимо от его статуса.
  • try_count (не ноль) - количество попыток обработки; полезна как сама информация, но также необходима для некоторых типов политик повтора.
  • last_attempt_time (обнуляемое) - время последней попытки, независимо от того, успешна она или нет; также необходимо для некоторых типов политик повтора
  • last_attempt_error_message (обнуляемый) - текстовая информация о последней неудачной попытке обработки; любой произвольный фрагмент текста, например сообщение об исключении или, возможно, даже дамп трассировки всего стека

Упомянутый next_attempt_time, пожалуй, самый важный столбец, служащий многим целям:

  • когда заполнено текущими / прошедшими отметками времени, помечает сообщение как находящееся в состоянии «ожидание», которое должно быть использовано как можно скорее
  • при заполнении будущих меток времени задерживает потребление сообщения до желаемого времени («отложенная» очередь)
  • если оно пусто, сообщение больше не должно обрабатываться (например, когда оно находится в состоянии УСПЕШНО или когда оно находится в состоянии ОШИБКА, но достигнут предел повторных попыток)
  • действует как поле «приоритет», так как задание опроса обычно выбирает сообщения, упорядоченные на основе этого значения - сообщения с более старой меткой времени будут обрабатываться первыми.
  • обновление этого столбца может обеспечить:
    - повторную попытку в случае сбоя обработки (просто установите желаемое время в будущем);
    - повторную отправку успешного сообщения (null - ›через некоторое время)
    - остановку обработка ожидающего сообщения (некоторое время - ›null)
    - изменение порядка обработки (какое-то время -› какое-то другое время)

Все эти столбцы могут быть помещены в любую существующую таблицу, которая представляет объект для постановки в очередь. В некоторых случаях сущность представляет собой саму обработку и не требуется после успешной обработки в очереди, поэтому в таких случаях задание потребителя может удалить такие записи таблицы после обработки, и статус УСПЕХ вообще не нужен. Иногда успешные сообщения временно хранятся для целей аудита, а затем удаляются с помощью некоторого механизма очистки. Так что не стесняйтесь адаптировать механизм под свои нужды.

Потребительская работа должна периодически выполнять следующие действия:

  • опросить ограниченное количество «ожидающих» сообщений путем запроса всех записей с next_attempt_time до текущего времени, упорядоченных по next_attempt_time; SQL-запрос будет выглядеть примерно так (без учета специфики БД):
SELECT * 
FROM some_table 
WHERE next_attempt_time < CURRENT_TIMESTAMP() 
ORDER BY next_attempt_time ASC 
LIMIT 200
  • для каждого опрошенного сообщения…
  • открытая транзакция БД
  • выполнить логику обработки
  • установить last_attempt_time на текущее время и увеличить счет_попыток
  • обработать результат обработки:
    - в случае успеха - установить для status значение SUCCESS и очистить next_attempt_time
    - в случае неудачи - установить status на ERROR, last_attempt_error_message на некоторое описание ошибки и next_attempt_time на некоторое время повторной попытки в будущем, если предел повторной попытки не был достигнут
  • закрыть транзакцию БД

Механизм планирования, используемый для выполнения этого задания потребителя, должен заботиться о том, чтобы задание не выполнялось одновременно с точки зрения всей системы.

В следующих постах этой мини-серии мы рассмотрим, как реализовать такой механизм на Java. Быть в курсе.