В настоящее время я работаю над проектом, созданным с помощью Spring Integration 4.3.14, и мы решили попробовать использовать DSL, но у меня возникли проблемы с попыткой интегрировать различные подпотоки.
У меня определен следующий IntegrationFlow:
@Bean
public IntegrationFlow mainFlow() {
return IntegrationFlows
.from(
databaseSource(),
c -> c.poller(Pollers.fixedDelay(5000).transactional().get()))
.split()
.log()
.gateway(f -> f
.transform(Transformer::transform)
.transform(AnotherTransformer::transform),
e -> e
.errorChannel("transformErrorChannel"))
.gateway(f -> f
.<MyEntity>handle((p, h) -> this.doSomething(p))
.<MyEntity>handle((p, h) -> this.doOtherThing(p)),
e -> e
.errorChannel("doErrorChannel"))
.channel("nullChannel")
.get();
}
Все вызываемые методы transform
и handle
недействительны и возвращают ненулевые значения. Основная причина, по которой мы выбрали этот подход, - наличие двух разных каналов для обработки ошибок в зависимости от того, в какой части потока они произошли, чтобы мы могли действовать соответствующим образом.
Тем не менее, когда я пытаюсь запустить этот код и вставляю запись в БД, и опросчик берет ее, она никогда не выходит за пределы первого шлюза. У меня есть только эти строки журнала:
2018-06-06 11:43:58.848 INFO 6492 --- [ask-scheduler-1] o.s.i.gateway.GatewayProxyFactoryBean : stopped org.springframework.integration.gateway.GatewayProxyFactoryBean@55d1f065
2018-06-06 11:43:58.848 INFO 6492 --- [ask-scheduler-1] ProxyFactoryBean$MethodInvocationGateway : started org.springframework.integration.gateway.GatewayProxyFactoryBean$MethodInvocationGateway@1863292e
2018-06-06 11:43:58.864 INFO 6492 --- [ask-scheduler-1] c.e.transformation.Transformer : Performing transformation.
2018-06-06 11:43:58.864 INFO 6492 --- [ask-scheduler-1] c.e.transformation.AnotherTransformer : Performing another transformation.
2018-06-06 11:43:58.848 INFO 6492 --- [ask-scheduler-1] o.s.i.gateway.GatewayProxyFactoryBean : started org.springframework.integration.gateway.GatewayProxyFactoryBean@55d1f065
2018-06-06 11:43:58.944 INFO 6492 --- [ask-scheduler-1] o.s.i.gateway.GatewayProxyFactoryBean : stopped org.springframework.integration.gateway.GatewayProxyFactoryBean@f9a5e3f
2018-06-06 11:43:58.944 INFO 6492 --- [ask-scheduler-1] ProxyFactoryBean$MethodInvocationGateway : started org.springframework.integration.gateway.GatewayProxyFactoryBean$MethodInvocationGateway@433a796
2018-06-06 11:43:58.944 INFO 6492 --- [ask-scheduler-1] o.s.i.gateway.GatewayProxyFactoryBean : started org.springframework.integration.gateway.GatewayProxyFactoryBean@f9a5e3f
Кажется очевидным, что сообщение действительно приходит на первый шлюз, но, очевидно, оно не передается на второй шлюз.
Во время запуска я вижу, что SI создает два подпотока (# 0 и # 1) и два канала для каждого (по одному для каждой операции, я думаю) с 1 подписчиком в каждом.
Я также попытался изменить определение на следующее:
@Bean
public IntegrationFlow getRecords() {
return IntegrationFlows
.from(
databaseSource(),
c -> c.poller(Pollers.fixedDelay(5000).transactional().get()))
.split()
.log()
.gateway(f -> f
.transform(Transformer::transform)
.transform(AnotherTransformer::transform),
e -> e
.errorChannel("transformErrorChannel")
.replyChannel("doThingsChannel"))
.get();
}
@Bean
public IntegrationFlow doThings() {
return IntegrationFlows
.from(
"doThingsChannel")
.gateway(f -> f
.<MyEntity>handle((p, h) -> this.doSomehting(p))
.<MyEntity>handle((p, h) -> this.doOtherThing(p)),
e -> e
.errorChannel("doErrorChannel"))
.get();
}
Но в конечном итоге возникла та же проблема, и установка replyChannel
на GatewayEndpointSpec
, и добавление явного потока .channel
в getRecords
после шлюза.