Я пытаюсь понять следующий пример: https://github.com/spring-projects/spring-integration-samples/tree/d8e71c687e86e7a7e35d515824832a92df9d4638/basic/barrier
и перепишите его с помощью java DSL.
Прежде всего, я хочу понять, что происходит в этом примере. Конечно, я читал объяснение с github но я все еще не понимаю. Я спрашиваю здесь, потому что знаю, что руководители группы SI (авторы этого примера) отвечают на все вопросы с меткой Spring-integration.
конфигурация сервера:
<int-http:inbound-gateway request-channel="receiveChannel"
path="/postGateway"
error-channel="errorChannel"
supported-methods="POST"/>
<int-http:inbound-gateway request-channel="createPayload"
path="/getGateway"
error-channel="errorChannel"
supported-methods="GET"/>
<int:transformer input-channel="createPayload" output-channel="receiveChannel" expression="'A,B,C'" />
<int:channel id="receiveChannel" />
<int:header-enricher input-channel="receiveChannel" output-channel="processChannel">
<int:header name="ackCorrelation" expression="headers['id']" />
</int:header-enricher>
<int:publish-subscribe-channel id="processChannel" />
<int:chain input-channel="processChannel" order="1">
<int:header-filter header-names="content-type, content-length" />
<int:splitter delimiters="," />
<int-amqp:outbound-channel-adapter amqp-template="rabbitTemplate"
exchange-name="barrier.sample.exchange" routing-key="barrier.sample.key"
confirm-ack-channel="confirmations"
confirm-nack-channel="confirmations"
return-channel="errorChannel"
confirm-correlation-expression="#this"/>
</int:chain>
<!-- Suspend the HTTP thread until the publisher confirms are asynchronously received -->
<int:barrier id="barrier" input-channel="processChannel" order="2"
correlation-strategy-expression="headers['ackCorrelation']"
output-channel="transform" timeout="10000" />
<int:transformer input-channel="transform" expression="payload[1]" />
<!-- Aggregate the publisher confirms and send the result to the barrier release channel -->
<int:chain input-channel="confirmations" output-channel="release">
<int:header-filter header-names="replyChannel, errorChannel" />
<int:service-activator expression="payload" /> <!-- INT-3791; use s-a to retain ack header -->
<int:aggregator>
<bean class="org.springframework.integration.samples.barrier.AckAggregator" />
</int:aggregator>
</int:chain>
<int:channel id="release" />
<int:outbound-channel-adapter channel="release" ref="barrier.handler" method="trigger" />
<!-- Consumer -> nullChannel -->
<int-amqp:inbound-channel-adapter channel="nullChannel"
queue-names="barrier.sample.queue"
connection-factory="rabbitConnectionFactory" />
<!-- Infrastructure -->
<rabbit:queue name="barrier.sample.queue" auto-delete="true" />
<rabbit:direct-exchange name="barrier.sample.exchange" auto-delete="true">
<rabbit:bindings>
<rabbit:binding queue="barrier.sample.queue" key="barrier.sample.key" />
</rabbit:bindings>
</rabbit:direct-exchange>
Насколько я понимаю, есть 3 стороны:
- клиент
- сервер
- rabbitMq
Клиент отправляет A,B,C
на сервер
RequestGateway requestGateway = client.getBean("requestGateway", RequestGateway.class);
String request = "A,B,C";
System.out.println("\n\n++++++++++++ Sending: " + request + " ++++++++++++\n");
String reply = requestGateway.echo(request);
Сервер принимает этот запрос:
<int-http:inbound-gateway request-channel="receiveChannel"
path="/postGateway"
error-channel="errorChannel"
supported-methods="POST"
затем в заголовки сообщений добавляются некоторые значения:
<int:header-enricher input-channel="receiveChannel" output-channel="processChannel">
<int:header name="ackCorrelation" expression="headers['id']" />
</int:header-enricher>
некоторая обработка:
<int:chain input-channel="processChannel" order="1">
<int:header-filter header-names="content-type, content-length" />
<int:splitter delimiters="," />
<int-amqp:outbound-channel-adapter amqp-template="rabbitTemplate"
exchange-name="barrier.sample.exchange" routing-key="barrier.sample.key"
confirm-ack-channel="confirmations"
confirm-nack-channel="confirmations"
return-channel="errorChannel"
confirm-correlation-expression="#this"/>
</int:chain>
Если я не ошибаюсь - сообщение разделено на ,
, поэтому у нас есть 3 сообщения A
B
и C
и отправляем на обмен rabbitMq barrier.sample.exchange
с keybarrier.sample.key
Вот так. Дальнейшие шаги для меня непонятны. Не могли бы вы уточнить?
P.S.
Я понимаю, что мы хотим дождаться всех сообщений ACK от rabbit, чтобы убедиться, что сообщения мы доставили кролику, и когда мы получим все ACK от rabbit, мы хотим ответить клиенту. Но я не понимаю определения компонентов SI, которые приводят к такому результату. Не могли бы вы предоставить какую-либо схему / изображение / объяснение, как это происходит?
Более конкретные вопросы:
Зачем нужны обогащающие заголовки и как мы это делаем?
Какова цель receiveChannel? - глупый вопрос.
Где 2 http сервиса - /postGateway
и /getGateway
. /postGateway
используется для получения сообщения от клиента, разделения на части, отправки кролику, получения подтверждений и последующего ответа клиенту. Но какова цель /getGateway
?
P.S.
На данный момент мое объявление потока выглядит так:
@Bean
public IntegrationFlow integrationFlow() {
return IntegrationFlows.from(Http.inboundGateway("/spring_integration_post")
.requestMapping(m -> m.methods(HttpMethod.POST))
.requestPayloadType(String.class))
.enrich(enricherSpec -> {
enricherSpec.header("correlationId", 1); //or ackCorrelationId ?
})
.split(s -> s.applySequence(false).get().getT2().setDelimiters(","))
.log()
//.barrier(1000L) is it correct place for barrier?
.log()
.handle(Amqp.outboundAdapter(amqpTemplate())
.exchangeName("barrierExchange")
.routingKey("barrierKey"))
.get();
}
Я не знаю, как использовать здесь барьер.
Со стороны сервера все частично работает - я вижу сообщения в очереди (в веб-интерфейсе rabbitMq), но со стороны клиента я начал видеть следующую трассировку стека:
2019-08-28 22:38:43.432 ERROR 12936 --- [ask-scheduler-8] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: HTTP request execution failed for URI [http://localhost:8080/spring_integration_post]; nested exception is org.springframework.web.client.HttpServerErrorException$InternalServerError: 500 null, failedMessage=GenericMessage [payload=6, headers={id=36781a7f-3d4f-e17d-60e6-33450c9307e4, timestamp=1567021122424}]
at org.springframework.integration.http.outbound.HttpRequestExecutingMessageHandler.exchange(HttpRequestExecutingMessageHandler.java:171)
at org.springframework.integration.http.outbound.AbstractHttpRequestExecutingMessageHandler.handleRequestMessage(AbstractHttpRequestExecutingMessageHandler.java:289)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:169)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:234)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:390)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:329)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null$1(AbstractPollingEndpoint.java:277)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$2(AbstractPollingEndpoint.java:274)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:264)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.web.client.HttpServerErrorException$InternalServerError: 500 null
at org.springframework.web.client.HttpServerErrorException.create(HttpServerErrorException.java:79)
at org.springframework.web.client.DefaultResponseErrorHandler.handleError(DefaultResponseErrorHandler.java:124)
at org.springframework.web.client.DefaultResponseErrorHandler.handleError(DefaultResponseErrorHandler.java:102)
at org.springframework.web.client.ResponseErrorHandler.handleError(ResponseErrorHandler.java:63)
at org.springframework.web.client.RestTemplate.handleResponse(RestTemplate.java:778)
at org.springframework.web.client.RestTemplate.doExecute(RestTemplate.java:736)
at org.springframework.web.client.RestTemplate.execute(RestTemplate.java:710)
at org.springframework.web.client.RestTemplate.exchange(RestTemplate.java:598)
at org.springframework.integration.http.outbound.HttpRequestExecutingMessageHandler.exchange(HttpRequestExecutingMessageHandler.java:165)
... 30 more