Часть 1: Обоснование и общий дизайн
Часть 2: Сделайте это просто, используя Spring Boot и JPA
Часть 3: Кластеризация и параллелизм
Время от времени у разработчика наступает момент, когда ему нужна очередь. В моем случае это была в основном ситуация, когда мне приходилось выполнять вызов какой-либо удаленной системы надежным образом, то есть с повторными попытками, если это необходимо. Поскольку повторные попытки должны происходить даже в случае перезапуска вызывающего процесса, это означает, что наличие ожидающих запросов вызова является обязательным.
Теперь доступно множество вариантов обмена сообщениями - брокеры сообщений, такие как ActiveMQ, RabbitMQ или даже Kafka (хотя они изначально не разработаны для этого варианта использования), а также более легкие решения с использованием таких библиотек, как Apache Camel или Spring Integration. Конечно, есть много других вариантов, но это те, с которыми я работал.
Конечно, я предполагаю, что многие из вас также реализовали постоянную очередь с комбинацией простой таблицы SQL с периодическим опросом, но если вы посмотрите в Интернете, это не считается хорошим решением для «серьезных» очередей, поэтому можно найти много сообщений с заголовком что-то вроде «Таблица SQL, поскольку очередь сообщений является анти-шаблоном». Проблемы в основном связаны с тем, насколько неэффективна база данных SQL для большой пропускной способности, особенно с гарантиями того, что несколько потребителей не выберут одно и то же сообщение в очереди. Базы данных SQL обычно не работают, когда записи очень недолговечны, как в случае с записями сообщений очереди, а блокировка записей, необходимая для предотвращения состояния гонки, также может значительно снизить производительность.
Но, по моему опыту, многим приложениям не нужна большая пропускная способность, а также, когда мы говорим «постановка в очередь», типичное приложение требует от механизма очередей гораздо большего, чем просто «структура данных FIFO». Обычно мне требуются следующие характеристики:
- базовая обработка FIFO (first in - first out)
- настойчивость - чтобы пережить перезапуск процесса
- создание и потребление транзакционных сообщений - нам нужны гарантии того, что сообщение будет размещено и получено из очереди вместе с другими действиями базы данных по принципу «все или ничего» (без проблемы «двойной записи»)
- повторные попытки с подключаемой политикой
- запоминание причины неудачной обработки
- поиск сообщений очереди по их статусу (ERROR, PENDING) или с помощью различных доменных фильтров (например, поиск всех сообщений очереди, которые имеют
companyId=3345
) - удаление сообщений с помощью различных доменных фильтров - когда недопустимые сообщения попали в очередь
- изменение определенного приоритета сообщения - например. когда конкретное сообщение, запланированное для будущей обработки, хочет быть обработано как можно скорее
- иногда задержка начального потребления
Просматривая все возможные решения для обмена сообщениями, вы часто обнаруживаете, что практически у всех из них есть некоторые из следующих проблем:
- значительно снизить производительность при включении постоянства
- иногда не предлагают создание и использование транзакционных сообщений, или они предлагают, но производительность также значительно снижается
- содержимое очереди в основном непрозрачно - общие решения для обмена сообщениями обычно обрабатывают содержимое сообщения как двоичный массив (сериализованный любым способом, зависящим от приложения), но это обычно означает, что нет способа запросить эти сообщения по каким-либо полям, зависящим от домена
- отложенная повторная попытка требует другого дополнительного механизма, что увеличивает сложность реализации
- изменение приоритета или удаление определенных сообщений из очереди с использованием различных фильтров вообще невозможно (или, по крайней мере, сложно)
- отсрочка первоначального потребления невозможна, потому что очередь обычно работает «обработать как можно скорее»
Принимая во внимание все указанные выше пожелания, а также то, что высокая пропускная способность обычно не вызывает особого беспокойства, я обычно получаю таблицу SQL как лучшее общее решение. А поскольку почти каждое приложение уже использует базу данных SQL, вы не сможете внедрить в свою систему еще одну технологию, что всегда хорошо.
Может быть, вам интересно, что означает «не такая большая пропускная способность»? Что ж, у меня почти всегда была достаточная пропускная способность, поэтому реальной необходимости в тестировании производительности не было, но была одна ситуация, когда мы должны были удовлетворить некоторые справедливые требования к пропускной способности, поэтому мы выбрали установку с одной виртуальной машиной с 8 ядрами, на которой размещались одна такая служба очередей, а на другой машине мы использовали «ненастроенную по-любому» БД PostgreSQL. В течение нескольких дней тестирования мы достигли около 700 мс / сек при использовании транзакции потребления на одно сообщение (обработка «ровно один раз») и около 1400 мсг / сек при наличии транзакции на пакет сообщений (обработка «хотя бы один раз» , поскольку в случае перезапуска некоторые сообщения могут быть повторно обработаны из-за того, что предыдущая транзакция не была зафиксирована). Кстати, потребление было связано не с увеличением некоторого счетчика памяти, а с реальным примером выполнения удаленного вызова внешней системы.
Я знаю, что это описание слишком расплывчато, чтобы его можно было считать точным, но все же эти цифры могут дать вам хотя бы некоторое представление о пропускной способности, о которой мы здесь говорим.
Общий эскиз реализации
Предлагаемый механизм требует 2 компонентов:
- Несколько дополнительных столбцов SQL в таблице сообщений для сохранения состояния очереди
- Потребительская работа, которая периодически опрашивает ожидающие сообщения и обрабатывает их
Состояние очереди состоит из следующих столбцов SQL:
- status (не null) - перечисление значений NOT_ATTEMPTED, SUCCESS, ERROR; Я не очень доволен именем NOT_ATTEMPTED, поэтому, если вы найдете какое-то лучшее, просто используйте его здесь (изначально я думал что-то вроде PENDING, но на самом деле это не отражает того, что ожидающие сообщения также обычно находятся в состоянии ERROR, ожидая повторить попытку)
- next_attempt_time (обнуляемый) - отметка времени следующей попытки обработки; должны быть проиндексированы по соображениям производительности; при заполнении этот столбец в основном отмечает сообщение как находящееся в состоянии ожидания, независимо от его статуса.
- try_count (не ноль) - количество попыток обработки; полезна как сама информация, но также необходима для некоторых типов политик повтора.
- last_attempt_time (обнуляемое) - время последней попытки, независимо от того, успешна она или нет; также необходимо для некоторых типов политик повтора
- last_attempt_error_message (обнуляемый) - текстовая информация о последней неудачной попытке обработки; любой произвольный фрагмент текста, например сообщение об исключении или, возможно, даже дамп трассировки всего стека
Упомянутый next_attempt_time, пожалуй, самый важный столбец, служащий многим целям:
- когда заполнено текущими / прошедшими отметками времени, помечает сообщение как находящееся в состоянии «ожидание», которое должно быть использовано как можно скорее
- при заполнении будущих меток времени задерживает потребление сообщения до желаемого времени («отложенная» очередь)
- если оно пусто, сообщение больше не должно обрабатываться (например, когда оно находится в состоянии УСПЕШНО или когда оно находится в состоянии ОШИБКА, но достигнут предел повторных попыток)
- действует как поле «приоритет», так как задание опроса обычно выбирает сообщения, упорядоченные на основе этого значения - сообщения с более старой меткой времени будут обрабатываться первыми.
- обновление этого столбца может обеспечить:
- повторную попытку в случае сбоя обработки (просто установите желаемое время в будущем);
- повторную отправку успешного сообщения (null - ›через некоторое время)
- остановку обработка ожидающего сообщения (некоторое время - ›null)
- изменение порядка обработки (какое-то время -› какое-то другое время)
Все эти столбцы могут быть помещены в любую существующую таблицу, которая представляет объект для постановки в очередь. В некоторых случаях сущность представляет собой саму обработку и не требуется после успешной обработки в очереди, поэтому в таких случаях задание потребителя может удалить такие записи таблицы после обработки, и статус УСПЕХ вообще не нужен. Иногда успешные сообщения временно хранятся для целей аудита, а затем удаляются с помощью некоторого механизма очистки. Так что не стесняйтесь адаптировать механизм под свои нужды.
Потребительская работа должна периодически выполнять следующие действия:
- опросить ограниченное количество «ожидающих» сообщений путем запроса всех записей с next_attempt_time до текущего времени, упорядоченных по next_attempt_time; SQL-запрос будет выглядеть примерно так (без учета специфики БД):
SELECT * FROM some_table WHERE next_attempt_time < CURRENT_TIMESTAMP() ORDER BY next_attempt_time ASC LIMIT 200
- для каждого опрошенного сообщения…
- открытая транзакция БД
- выполнить логику обработки
- установить last_attempt_time на текущее время и увеличить счет_попыток
- обработать результат обработки:
- в случае успеха - установить для status значение SUCCESS и очистить next_attempt_time
- в случае неудачи - установить status на ERROR, last_attempt_error_message на некоторое описание ошибки и next_attempt_time на некоторое время повторной попытки в будущем, если предел повторной попытки не был достигнут - закрыть транзакцию БД
Механизм планирования, используемый для выполнения этого задания потребителя, должен заботиться о том, чтобы задание не выполнялось одновременно с точки зрения всей системы.
В следующих постах этой мини-серии мы рассмотрим, как реализовать такой механизм на Java. Быть в курсе.