Spring интеграция dsl-буфера

У меня есть требование, когда мне нужно хранить / буферизовать сообщения, полученные по каналу, и сохранять в базе данных в зависимости от количества сообщений или тайм-аута, что означает отсутствие сообщений в течение 1 минуты. Есть ли способ добиться этого в весенней интеграции

IntegrationFlows
                .from(Jms.messageDrivenChannelAdapter(connectionFactory)
                    .destination(sourceQueue))
                .transform(someTransform, "transform")
                .handle(someService, "save")
                .get();

person nagendra    schedule 15.11.2018    source источник


Ответы (1)


Существует оператор .aggregate(), основанный на реализации Aggregator EI-шаблона.

Его можно настроить с помощью JdbcMessageStore для буферизации сообщений и сохранения их в БД.

Вы можете удерживать их там до тех пор, пока не возникнет какое-либо состояние через ReleaseStrategy (в зависимости от каждого поступившего сообщения) или освободить их из-за group timeout.

Если вы не заинтересованы в том, чтобы все они впоследствии были единым агрегированным сообщением, вы можете рассмотреть возможность использования SimpleMessageGroupProcessor, которое просто производит Collection<Message<?>> и выполняет итерацию по ним для отправки на выход одно за другим.

Дополнительную информацию об агрегаторе см. В Справочном руководстве: https://docs.spring.io/spring-integration/reference/html/messaging-routing-chapter.html#aggregator.

person Artem Bilan    schedule 15.11.2018
comment
Мы хотим накапливать сообщение без correlationId, мы видим следующую ошибку Caused by: java.lang.IllegalStateException: Null correlation not allowed. Maybe the CorrelationStrategy is failing? с этим изменением .aggregate(aggregatorSpec -> aggregatorSpec.releaseStrategy(group -> group.size() > 2)) - person nagendra; 15.11.2018
comment
Вы можете выполнить статическую корреляциюKey: .correlationStrategy(m -> 1). Также вам необходимо убедиться, что expireGroupsUponCompletion(true). - person Artem Bilan; 15.11.2018
comment
Спасибо .aggregate(aggregatorSpec -> aggregatorSpec.correlationStrategy(m -> 1).expireGroupsUponCompletion(true).releaseStrategy(new TimeoutCountSequenceSizeReleaseStrategy(2, 100))) работал - person nagendra; 15.11.2018
comment
вышеупомянутое не работает без .expireGroupsUponTimeout(true).groupTimeout(2000) Нам действительно нужно снова вызвать groupTimeout, даже если мы используем TimeoutCountSequenceSizeReleaseStrategy - person nagendra; 16.11.2018
comment
TimeoutCountSequenceSizeReleaseStrategy выполняет свою логику только тогда, когда сообщение поступает в агрегатор. Смысл groupTimeout() иметь некоторую фоновую задачу, когда нет входящего сообщения в течение определенного времени. - person Artem Bilan; 16.11.2018