Потоковые события и триггер на основе правил

Учитывая Event-A, Event-B, Event-C, которые поступают (возможно, не по порядку) в течение нескольких дней друг от друга, я хочу запустить обработку для создания производной Event-ABC, как только я узнаю, что у меня есть все события в наборе.

События сгруппированы по userId/sessionId

В настоящее время я читаю все события из одной очереди, записываю в базу данных и обновляю метаданные, указывая, какие события были записаны. Как только метаданные содержат все события, основанные на правиле, я запускаю обработку агрегации. У этого подхода есть некоторые проблемы с производительностью из-за того, что обработчики очередей потенциально могут использовать один и тот же ключ при обработке событий, принадлежащих одной и той же группе, поэтому я ищу альтернативы.

То, что я хотел бы, - это более мелкозернистая программная маршрутизация и организация очереди событий на основе их userId/sessionId для обработки. Я думаю, что то, что я пытаюсь сделать, чем-то похоже на поиск событий.

Я искал, может ли Akka помочь с проблемой такого типа. С актером на userId/sessionId это уменьшит ненужный параллелизм и будет содержать логику триггера внутри актера. Меня беспокоят потенциально большие требования к памяти при использовании такого количества Актеров.


person smashbourne    schedule 01.08.2016    source источник


Ответы (2)


То, что вы описываете, больше похоже на Saga или Process Manager, чем на Event Sourcing. Вам нужно что-то, что обрабатывает несколько сообщений, а затем реагирует, как только спецификация будет удовлетворена.

Akka, безусловно, может справиться с этим. С Akka вы можете создать актера для каждого ключа, а затем перенаправлять сообщения отдельным актерам, когда вы их получаете. Я бы не слишком беспокоился о проблемах с памятью, поскольку системы Актеров должны справляться с тысячами и тысячами Актеров. Я думаю, вам нужно измерять производительность любого решения, к которому вы пришли.

Вам также необходимо подумать о том, как вы справляетесь со сбоем серверов — если вы храните все в памяти, вы рискуете потерять свои саги при сбое серверов. Это может быть или не быть проблемой в зависимости от ваших требований (т.е. если вы можете восстановиться после этого). Вы можете заглянуть в Akka Persistence, если это важно учитывать.

person tomliversidge    schedule 01.08.2016

У этого подхода есть некоторые проблемы с производительностью из-за того, что обработчики очередей потенциально могут использовать один и тот же ключ при обработке событий, принадлежащих одной и той же группе, поэтому я ищу альтернативы.

Отказ от ответственности: я не уверен, что понимаю, что вы здесь описываете, поэтому приведенное ниже решение может не подойти.

Я думаю, что то, что я пытаюсь сделать, чем-то похоже на поиск событий.

Да, ваше описание очень похоже на источник события process manager.

Обработчик событий (у вас может быть по одному для каждого из типов событий или один обработчик, который подписывается на все три) получает событие.

Из информации userId/userSession он вычисляет уникальный идентификатор для этого экземпляра вашего процесса. Подумайте о хэше или именованном uuid, построенном на основе уникального идентификатора процесса.

Загрузить текущее состояние процесса, соответствующего идентификатору. Это структура данных, отслеживающая, какие события были замечены ранее. Это может быть просто поток событий.

apply текущее событие в состояние процесса. Ожидается, что «применить» не будет работать, если это событие уже было замечено — ваши сообщения о событиях имеют уникальные идентификаторы, верно?

Сохраните обновленное состояние процесса. На этом транзакция заканчивается.

Теперь наблюдайте за состоянием процесса — вы можете сделать это сразу в обработчике событий или в асинхронном процессе. Если процесс "готов", то глагол генерировать Event-ABC.

Вышеприведенная схема соответствует общему шаблону, когда у вас есть диспетчеры процессов, которые отслеживают состояние запущенного процесса, но запускают бизнес-логику, выполняя команды для соответствующего агрегата.

В более простом дизайне вы можете объединить «агрегат» и «процесс». Базовый шаблон тот же — обработчик события вычисляет идентификатор агрегата, загружает его и вызывает команду обработки события. Агрегат обновляет свое состояние информацией, содержащейся в событии, и записывает это изменение состояния в свою историю. Если все необходимые события учтены, агрегат также записывает Event-ABC в свою собственную историю.

person VoiceOfUnreason    schedule 02.08.2016