Как барьерный компонент работает в весенней интеграции?

Я пытаюсь понять следующий пример: https://github.com/spring-projects/spring-integration-samples/tree/d8e71c687e86e7a7e35d515824832a92df9d4638/basic/barrier

и перепишите его с помощью java DSL.

Прежде всего, я хочу понять, что происходит в этом примере. Конечно, я читал объяснение с github но я все еще не понимаю. Я спрашиваю здесь, потому что знаю, что руководители группы SI (авторы этого примера) отвечают на все вопросы с меткой Spring-integration.

конфигурация сервера:

<int-http:inbound-gateway request-channel="receiveChannel"
                            path="/postGateway"
                            error-channel="errorChannel"
                            supported-methods="POST"/>

    <int-http:inbound-gateway request-channel="createPayload"
                            path="/getGateway"
                            error-channel="errorChannel"
                            supported-methods="GET"/>

    <int:transformer input-channel="createPayload" output-channel="receiveChannel" expression="'A,B,C'" />

    <int:channel id="receiveChannel" />

    <int:header-enricher input-channel="receiveChannel" output-channel="processChannel">
        <int:header name="ackCorrelation" expression="headers['id']" />
    </int:header-enricher>

    <int:publish-subscribe-channel id="processChannel" />

    <int:chain input-channel="processChannel" order="1">
        <int:header-filter header-names="content-type, content-length" />
        <int:splitter delimiters="," />
        <int-amqp:outbound-channel-adapter amqp-template="rabbitTemplate"
            exchange-name="barrier.sample.exchange" routing-key="barrier.sample.key"
            confirm-ack-channel="confirmations"
            confirm-nack-channel="confirmations"
            return-channel="errorChannel"
            confirm-correlation-expression="#this"/>
    </int:chain>

    <!-- Suspend the HTTP thread until the publisher confirms are asynchronously received -->

    <int:barrier id="barrier" input-channel="processChannel" order="2"
        correlation-strategy-expression="headers['ackCorrelation']"
        output-channel="transform" timeout="10000" />

    <int:transformer input-channel="transform" expression="payload[1]" />

    <!-- Aggregate the publisher confirms and send the result to the barrier release channel -->

    <int:chain input-channel="confirmations" output-channel="release">
        <int:header-filter header-names="replyChannel, errorChannel" />
        <int:service-activator expression="payload" /> <!-- INT-3791; use s-a to retain ack header -->
        <int:aggregator>
            <bean class="org.springframework.integration.samples.barrier.AckAggregator" />
        </int:aggregator>
    </int:chain>

    <int:channel id="release" />

    <int:outbound-channel-adapter channel="release" ref="barrier.handler" method="trigger" />

    <!-- Consumer -> nullChannel -->

    <int-amqp:inbound-channel-adapter channel="nullChannel"
        queue-names="barrier.sample.queue"
        connection-factory="rabbitConnectionFactory" />

    <!-- Infrastructure -->

    <rabbit:queue name="barrier.sample.queue" auto-delete="true" />

    <rabbit:direct-exchange name="barrier.sample.exchange" auto-delete="true">
        <rabbit:bindings>
            <rabbit:binding queue="barrier.sample.queue" key="barrier.sample.key" />
        </rabbit:bindings>
    </rabbit:direct-exchange>

Насколько я понимаю, есть 3 стороны:

  • клиент
  • сервер
  • rabbitMq

Клиент отправляет A,B,C на сервер

RequestGateway requestGateway = client.getBean("requestGateway", RequestGateway.class);
String request = "A,B,C";
System.out.println("\n\n++++++++++++ Sending: " + request + " ++++++++++++\n");
String reply = requestGateway.echo(request);

Сервер принимает этот запрос:

<int-http:inbound-gateway request-channel="receiveChannel"
                        path="/postGateway"
                        error-channel="errorChannel"
                        supported-methods="POST"

затем в заголовки сообщений добавляются некоторые значения:

<int:header-enricher input-channel="receiveChannel" output-channel="processChannel">
        <int:header name="ackCorrelation" expression="headers['id']" />
    </int:header-enricher>

некоторая обработка:

<int:chain input-channel="processChannel" order="1">
        <int:header-filter header-names="content-type, content-length" />
        <int:splitter delimiters="," />
        <int-amqp:outbound-channel-adapter amqp-template="rabbitTemplate"
            exchange-name="barrier.sample.exchange" routing-key="barrier.sample.key"
            confirm-ack-channel="confirmations"
            confirm-nack-channel="confirmations"
            return-channel="errorChannel"
            confirm-correlation-expression="#this"/>
    </int:chain>

Если я не ошибаюсь - сообщение разделено на ,, поэтому у нас есть 3 сообщения A B и C и отправляем на обмен rabbitMq barrier.sample.exchange с keybarrier.sample.key

Вот так. Дальнейшие шаги для меня непонятны. Не могли бы вы уточнить?

P.S.

Я понимаю, что мы хотим дождаться всех сообщений ACK от rabbit, чтобы убедиться, что сообщения мы доставили кролику, и когда мы получим все ACK от rabbit, мы хотим ответить клиенту. Но я не понимаю определения компонентов SI, которые приводят к такому результату. Не могли бы вы предоставить какую-либо схему / изображение / объяснение, как это происходит?

Более конкретные вопросы:

Зачем нужны обогащающие заголовки и как мы это делаем?

Какова цель receiveChannel? - глупый вопрос.

Где 2 http сервиса - /postGateway и /getGateway. /postGateway используется для получения сообщения от клиента, разделения на части, отправки кролику, получения подтверждений и последующего ответа клиенту. Но какова цель /getGateway?

P.S.

На данный момент мое объявление потока выглядит так:

    @Bean
public IntegrationFlow integrationFlow() {
    return IntegrationFlows.from(Http.inboundGateway("/spring_integration_post")
            .requestMapping(m -> m.methods(HttpMethod.POST))
            .requestPayloadType(String.class))
            .enrich(enricherSpec -> {
                enricherSpec.header("correlationId", 1); //or ackCorrelationId ?
            })
            .split(s -> s.applySequence(false).get().getT2().setDelimiters(","))
            .log()
            //.barrier(1000L) is it correct place for barrier?
            .log()
            .handle(Amqp.outboundAdapter(amqpTemplate())
                    .exchangeName("barrierExchange")
                    .routingKey("barrierKey"))
            .get();
}

Я не знаю, как использовать здесь барьер.

Со стороны сервера все частично работает - я вижу сообщения в очереди (в веб-интерфейсе rabbitMq), но со стороны клиента я начал видеть следующую трассировку стека:

2019-08-28 22:38:43.432 ERROR 12936 --- [ask-scheduler-8] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageHandlingException: HTTP request execution failed for URI [http://localhost:8080/spring_integration_post]; nested exception is org.springframework.web.client.HttpServerErrorException$InternalServerError: 500 null, failedMessage=GenericMessage [payload=6, headers={id=36781a7f-3d4f-e17d-60e6-33450c9307e4, timestamp=1567021122424}]
    at org.springframework.integration.http.outbound.HttpRequestExecutingMessageHandler.exchange(HttpRequestExecutingMessageHandler.java:171)
    at org.springframework.integration.http.outbound.AbstractHttpRequestExecutingMessageHandler.handleRequestMessage(AbstractHttpRequestExecutingMessageHandler.java:289)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:169)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:234)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:390)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:329)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null$1(AbstractPollingEndpoint.java:277)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$2(AbstractPollingEndpoint.java:274)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.web.client.HttpServerErrorException$InternalServerError: 500 null
    at org.springframework.web.client.HttpServerErrorException.create(HttpServerErrorException.java:79)
    at org.springframework.web.client.DefaultResponseErrorHandler.handleError(DefaultResponseErrorHandler.java:124)
    at org.springframework.web.client.DefaultResponseErrorHandler.handleError(DefaultResponseErrorHandler.java:102)
    at org.springframework.web.client.ResponseErrorHandler.handleError(ResponseErrorHandler.java:63)
    at org.springframework.web.client.RestTemplate.handleResponse(RestTemplate.java:778)
    at org.springframework.web.client.RestTemplate.doExecute(RestTemplate.java:736)
    at org.springframework.web.client.RestTemplate.execute(RestTemplate.java:710)
    at org.springframework.web.client.RestTemplate.exchange(RestTemplate.java:598)
    at org.springframework.integration.http.outbound.HttpRequestExecutingMessageHandler.exchange(HttpRequestExecutingMessageHandler.java:165)
    ... 30 more

person gstackoverflow    schedule 28.08.2019    source источник


Ответы (1)


Это довольно просто; непонятно, чего ты не понимаешь.

  • получаем запрос от http
  • мы добавляем корреляционный заголовок к сообщению (заголовок обогащен)
  • мы отправляем 3 сообщения rabbitmq
  • нам нужно дождаться подтверждений - поэтому поток HTTP приостановлен в барьере
  • асинхронно возвращаются 3 подтверждения
  • мы объединяем ответы в одно сообщение (используя корреляционный заголовок)
  • мы отправляем это сообщение на барьер, который освобождает поток HTTP

receiveChannel - это входной канал для расширения заголовка.

Вам нужно добавить поток, чтобы получать подтверждения, агрегировать и запускать барьер.

person Gary Russell    schedule 28.08.2019
comment
Вам нужно добавить еще один поток, чтобы получать подтверждения, агрегировать и запускать барьер. - person Gary Russell; 28.08.2019
comment
Ok. спасибо - изображение высокого уровня четкое, но все еще есть несколько вопросов. 1. Начнем с обогащения. Как видите, мой enricher жестко запрограммирован и его нужно как-то переписать. Верно ли, что сообщения A, B и C должны иметь одинаковый идентификатор correlationId (или ackCorrelationId?). 2. Насколько я понимаю, мне нужно каким-то образом инициализировать барьер с количеством разделенных сообщений. Я понятия не имею, как это сделать - person gstackoverflow; 28.08.2019
comment
Я также обновил последнюю часть темы - посмотрите, пожалуйста. - person gstackoverflow; 28.08.2019
comment
Идентификатор корреляции должен быть уникальным - поэтому я использовал заголовок id для идентификатора корреляции в сообщении до разделения. Да, заголовок будет скопирован в разделы. Нет; агрегатор обрабатывает все, что разделитель добавляет информацию о последовательности в разделы, и стратегия выпуска по умолчанию использует ее, чтобы решить, когда группа будет завершена. - person Gary Russell; 28.08.2019
comment
>I also updated last part of the topic - Зачем вообще нужен барьер? Ваш обработчик, похоже, не делает что-то, что генерирует асинхронные сообщения, которых вам нужно ждать. - person Gary Russell; 28.08.2019
comment
Мне нужен барьер, потому что я хочу узнать, как он работает, и пытаюсь повторить ваш образец, но через DSL. Я скоро начну писать второй поток, но сначала хочу разобраться в других вещах. - person gstackoverflow; 28.08.2019
comment
Мне очень жаль, но я не понимаю ваш комментарий, связанный с идентификатором корреляции. Насколько уникальным это должно быть? должно ли оно быть одинаковым для всех частей (A, B, C)?. Перед разделением сообщения вы сохраняете идентификатор в заголовке и называете его ackCorellationId. - person gstackoverflow; 28.08.2019
comment
Барьер не будет работать, пока не появится поток асинхронных ответов; что-то должно вызвать триггер барьера, чтобы освободить поток. Входящее сообщение имеет заголовок id, который всегда уникален; поэтому я использовал его в качестве заголовка, в котором я хочу сопоставить подтверждения, возвращаемые кроликом, для входящего сообщения - мы могли бы иметь дело с несколькими входящими сообщениями одновременно. Предлагаю вам прочитать об агрегаторе (и сплиттер), чтобы понять больше. - person Gary Russell; 28.08.2019
comment
Барьер выдвинут; было бы лучше сначала разобраться в более основных компонентах. - person Gary Russell; 28.08.2019
comment
Я просто исследую все образцы из базовой папки один за другим. Я уже потратил много времени (своего и даже вашего) на то, чтобы разобраться, и мне было бы жаль переходить на другой пример сейчас. Я уверен, что смогу разобраться с вашей помощью) По крайней мере, я знаком с синхронизаторами java. Это должно быть так - person gstackoverflow; 28.08.2019
comment
Я думаю, мне нужно сначала завершить синхронизацию (без нашего барьера) - person gstackoverflow; 28.08.2019
comment
Таким образом, все разделенные сообщения, которые были разделены из одного и того же клиентского сообщения, будут иметь ** одинаковый ** (без каких-либо постфиксов) correlationId? (True или false), как я могу добиться этого с помощью DSl? - person gstackoverflow; 28.08.2019
comment
Правильно - разделитель просто скопирует заголовки из предварительно разделенного сообщения (из обогащающего). Больше нечего делать, кроме как реализовать поток агрегатора - вы не можете заставить работать половину этого приложения без другой половины. - person Gary Russell; 28.08.2019
comment
текущая версия потока ничего не возвращает клиенту - последняя трассировка стека из темы - как это исправить? Как реализовать enricher для копирования заголовка идентификатора с помощью DSL? - person gstackoverflow; 28.08.2019
comment
Более того, строка из xml config ‹int: transformer input-channel = createPayload output-channel = receiveChannel expression = 'A, B, C' /› выглядит подозрительно, поскольку содержит A, B, C - person gstackoverflow; 28.08.2019
comment
Вы должны включить подтверждения в фабрике соединений и адаптере - см. XML. См. connectionFactory.setPublisherConfirms(true); в примере приложения. В современных версиях загрузки это можно сделать в свойствах приложения. - person Gary Russell; 28.08.2019
comment
Это только для второго шлюза (так что вы можете выполнить GET из браузера). Для первого шлюза. A, B, C отправляются в POST со стороны клиента (адаптер HTTP через шлюз обмена сообщениями). Здесь администраторам не нравятся расширенные комментарии к ответам - см. Избегайте расширенных обсуждений в комментариях выше. - person Gary Russell; 29.08.2019
comment
Я уже спрашивал о цели «второй конечной точки» в теме. Можем ли мы продолжить обсуждение в чате? - person gstackoverflow; 29.08.2019
comment
Позвольте нам продолжить это обсуждение в чате. - person Gary Russell; 29.08.2019