Шлюз не устанавливает заголовок replyChannel

В настоящее время я работаю над проектом, созданным с помощью Spring Integration 4.3.14, и мы решили попробовать использовать DSL, но у меня возникли проблемы с попыткой интегрировать различные подпотоки.

У меня определен следующий IntegrationFlow:

@Bean
public IntegrationFlow mainFlow() {
    return IntegrationFlows
            .from(
                    databaseSource(),
                    c -> c.poller(Pollers.fixedDelay(5000).transactional().get()))
            .split()
            .log()
            .gateway(f -> f
                            .transform(Transformer::transform)
                            .transform(AnotherTransformer::transform),
                    e -> e
                            .errorChannel("transformErrorChannel"))
            .gateway(f -> f
                            .<MyEntity>handle((p, h) -> this.doSomething(p))
                            .<MyEntity>handle((p, h) -> this.doOtherThing(p)),
                    e -> e
                            .errorChannel("doErrorChannel"))
            .channel("nullChannel")
            .get();
}

Все вызываемые методы transform и handle недействительны и возвращают ненулевые значения. Основная причина, по которой мы выбрали этот подход, - наличие двух разных каналов для обработки ошибок в зависимости от того, в какой части потока они произошли, чтобы мы могли действовать соответствующим образом.

Тем не менее, когда я пытаюсь запустить этот код и вставляю запись в БД, и опросчик берет ее, она никогда не выходит за пределы первого шлюза. У меня есть только эти строки журнала:

2018-06-06 11:43:58.848  INFO 6492 --- [ask-scheduler-1] o.s.i.gateway.GatewayProxyFactoryBean    : stopped org.springframework.integration.gateway.GatewayProxyFactoryBean@55d1f065
2018-06-06 11:43:58.848  INFO 6492 --- [ask-scheduler-1] ProxyFactoryBean$MethodInvocationGateway : started org.springframework.integration.gateway.GatewayProxyFactoryBean$MethodInvocationGateway@1863292e
2018-06-06 11:43:58.864  INFO 6492 --- [ask-scheduler-1] c.e.transformation.Transformer           : Performing transformation.
2018-06-06 11:43:58.864  INFO 6492 --- [ask-scheduler-1] c.e.transformation.AnotherTransformer    : Performing another transformation. 
2018-06-06 11:43:58.848  INFO 6492 --- [ask-scheduler-1] o.s.i.gateway.GatewayProxyFactoryBean    : started org.springframework.integration.gateway.GatewayProxyFactoryBean@55d1f065
2018-06-06 11:43:58.944  INFO 6492 --- [ask-scheduler-1] o.s.i.gateway.GatewayProxyFactoryBean    : stopped org.springframework.integration.gateway.GatewayProxyFactoryBean@f9a5e3f
2018-06-06 11:43:58.944  INFO 6492 --- [ask-scheduler-1] ProxyFactoryBean$MethodInvocationGateway : started org.springframework.integration.gateway.GatewayProxyFactoryBean$MethodInvocationGateway@433a796
2018-06-06 11:43:58.944  INFO 6492 --- [ask-scheduler-1] o.s.i.gateway.GatewayProxyFactoryBean    : started org.springframework.integration.gateway.GatewayProxyFactoryBean@f9a5e3f

Кажется очевидным, что сообщение действительно приходит на первый шлюз, но, очевидно, оно не передается на второй шлюз.

Во время запуска я вижу, что SI создает два подпотока (# 0 и # 1) и два канала для каждого (по одному для каждой операции, я думаю) с 1 подписчиком в каждом.

Я также попытался изменить определение на следующее:

    @Bean
public IntegrationFlow getRecords() {
    return IntegrationFlows
            .from(
                    databaseSource(),
                    c -> c.poller(Pollers.fixedDelay(5000).transactional().get()))
            .split()
            .log()
            .gateway(f -> f
                            .transform(Transformer::transform)
                            .transform(AnotherTransformer::transform),
                    e -> e
                            .errorChannel("transformErrorChannel")
                            .replyChannel("doThingsChannel"))
            .get();
}

@Bean
public IntegrationFlow doThings() {
    return IntegrationFlows
            .from(
                    "doThingsChannel")
            .gateway(f -> f
                            .<MyEntity>handle((p, h) -> this.doSomehting(p))
                            .<MyEntity>handle((p, h) -> this.doOtherThing(p)),
                    e -> e
                            .errorChannel("doErrorChannel"))
            .get();
}

Но в конечном итоге возникла та же проблема, и установка replyChannel на GatewayEndpointSpec, и добавление явного потока .channel в getRecords после шлюза.


person gnzlrm    schedule 06.06.2018    source источник


Ответы (1)


Я только что выполнил этот тестовый пример в проекте Spring Integration Java DSL:

@Test
public void testGateways() {
    IntegrationFlow flow = f -> f
            .gateway(sf -> sf
                    .transform(p -> "foo#" + p)
                    .transform(p -> "bar#" + p))
            .gateway(sf -> sf
                    .handle((p, h) -> "handle1:" + p)
                    .handle((p, h) -> "handle2:" + p))
            .handle(System.out::println);

    IntegrationFlowRegistration flowRegistration = this.integrationFlowContext.registration(flow).register();

    flowRegistration.getInputChannel()
            .send(new GenericMessage<>("test"));

    flowRegistration.destroy();
}

Мой вывод такой:

GenericMessage [payload=handle2:handle1:bar#foo#test, headers={id=ae09df5c-f63e-4b68-d73c-29b85f3689a8, timestamp=1528314852110}]

Итак, оба шлюза работают должным образом, и применяются все преобразователи и обработчики. Плюс результат последнего шлюза опрашивается в основной поток для последнего System.out шага.

Не уверен, что происходит в вашем случае: только идея, что ваш .transform(AnotherTransformer::transform) не возвращает значение или что-то еще там происходит.

По поводу варианта replyChannel. Это не куда отправлять результат шлюза. Вот где ждать ответа:

/**
 * Specify the channel from which reply messages will be received; overrides the
 * encompassing gateway's default reply channel.
 * @return the channel name.
 */
String replyChannel() default "";
person Artem Bilan    schedule 06.06.2018
comment
Да, ты прав. Фактически, преобразователь действительно возвращает значение; но он не соответствует типу, объявленному в первом .handle методе второго шлюза, поскольку AnotherTransformer возвращал MyEntity.Builder вместо MyEntity, и сообщение автоматически отбрасывалось. Спасибо за вашу помощь! - person gnzlrm; 07.06.2018