Spring Integration Flow теряет подписчика

У меня есть поток SI, который потребляет org.springframework.web.reactive.socket.WebSocketMessage, выполняет некоторую работу с ним, включая обработку его полезной нагрузки с помощью Netty ByteBuf. В какой-то момент в моем потоке произошло исключение:

org.springframework.messaging.MessageHandlingException: error occurred in message handler [_org.springframework.integration.errorLogger.handler]; nested exception is io.netty.util.IllegalReferenceCountException: refCnt: 0
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:184) ~[spring-integration-core-5.0.5.RELEASE.jar!/:5.0.5.RELEASE]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:175) ~[spring-integration-core-5.0.5.RELEASE.jar!/:5.0.5.RELEASE]
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:224) ~[spring-integration-core-5.0.5.RELEASE.jar!/:5.0.5.RELEASE]
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:180) ~[spring-integration-core-5.0.5.RELEASE.jar!/:5.0.5.RELEASE]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.0.5.RELEASE.jar!/:5.0.5.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445) ~[spring-integration-core-5.0.5.RELEASE.jar!/:5.0.5.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181) ~[spring-messaging-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160) ~[spring-messaging-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108) ~[spring-messaging-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.integration.channel.MessagePublishingErrorHandler.handleError(MessagePublishingErrorHandler.java:93) ~[spring-integration-core-5.0.5.RELEASE.jar!/:5.0.5.RELEASE]
...
Caused by: io.netty.util.IllegalReferenceCountException: refCnt: 0
    at io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1417) ~[netty-buffer-4.1.24.Final.jar!/:4.1.24.Final]
    at io.netty.buffer.AbstractByteBuf.checkIndex(AbstractByteBuf.java:1356) ~[netty-buffer-4.1.24.Final.jar!/:4.1.24.Final]
    at io.netty.buffer.AbstractByteBuf.getInt(AbstractByteBuf.java:417) ~[netty-buffer-4.1.24.Final.jar!/:4.1.24.Final]
    at io.netty.buffer.ByteBufUtil.hashCode(ByteBufUtil.java:175) ~[netty-buffer-4.1.24.Final.jar!/:4.1.24.Final]
    at io.netty.buffer.AbstractByteBuf.hashCode(AbstractByteBuf.java:1315) ~[netty-buffer-4.1.24.Final.jar!/:4.1.24.Final]
    at org.springframework.core.io.buffer.NettyDataBuffer.hashCode(NettyDataBuffer.java:288) ~[spring-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.web.reactive.socket.WebSocketMessage.hashCode(WebSocketMessage.java:134) ~[spring-webflux-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at java.lang.Object.toString(Object.java:236) ~[?:1.8.0_161]
    at java.lang.String.valueOf(String.java:2994) ~[?:1.8.0_161]
    at java.lang.StringBuilder.append(StringBuilder.java:131) ~[?:1.8.0_161]

После этого обработка всех двоичных сообщений веб-сокетов завершается ошибкой со следующим исключением:

2018-11-26T10:38:29,133 ERROR --- [-server-epoll-7] o.s.i.h.LoggingHandler (:) org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'binaryWebSocketMessageChannel'; nested exception is java.lang.IllegalStateException: The [binaryWebSocketMessageChannel] doesn't have subscribers to accept messages, failedMessage=GenericMessage [payload=MyPayload(payload=org.springframework.web.reactive.socket.WebSocketMessage@38552d5, session=ReactorNettyWebSocketSession[id=3e0be929, uri=http://localhost:8080/]), headers={id=b09a89ff-f7be-1b43-6f62-40e5c0b5695a, timestamp=1543225109132}]
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInDeliveryExceptionIfNecessary(IntegrationUtils.java:163)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:475)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
    at org.springframework.integration.router.AbstractMessageRouter.handleMessageInternal(AbstractMessageRouter.java:183)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158)
    at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:205)
    at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:55)
    at org.springframework.integration.endpoint.ReactiveStreamsConsumer$1.hookOnNext(ReactiveStreamsConsumer.java:138)
    at org.springframework.integration.endpoint.ReactiveStreamsConsumer$1.hookOnNext(ReactiveStreamsConsumer.java:127)
    at reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:158)
    at reactor.core.publisher.FluxRetry$RetrySubscriber.onNext(FluxRetry.java:79)
...
Caused by: java.lang.IllegalStateException: The [binaryWebSocketMessageChannel] doesn't have subscribers to accept messages
    at org.springframework.util.Assert.state(Assert.java:94)
    at org.springframework.integration.channel.FluxMessageChannel.doSend(FluxMessageChannel.java:63)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445)
    ... 57 more

Может ли кто-нибудь указать мне любое направление, чтобы попытаться решить проблему? Кроме того, в каких случаях компонент SI EIP (маршрутизатор, преобразователь, фильтр, активатор услуг) откажется от подписки на канал?

Для справки, тип канала - org.springframework.integration.channel.FluxMessageChannel.

Редактировать:

Мой поток выглядит так:

WebSocketMessage -> router: (BINARY)  -> binaryWebSocketMessageChannel -> ...
                            (!BINARY) -> nullChannel

(Я знаю, что фильтр здесь лучше подходит, я планирую провести рефакторинг позже)

@ArtemBilan репо с примером находится здесь: https://github.com/ioreskovic/Spring-Integration-flow-loses-subscriber


person ioreskovic    schedule 28.11.2018    source источник
comment
Нам нужен какой-то простой проект, с которым можно поиграть и воспроизвести. Вы предоставляете слишком мало информации о конфигурации. Спасибо   -  person Artem Bilan    schedule 28.11.2018
comment
Я постараюсь извлечь части к концу этой недели и разместить их на GitHub. Спасибо :)   -  person ioreskovic    schedule 29.11.2018
comment
@ArtemBilan Я обновил вопрос репо, шаги для воспроизведения и журналы   -  person ioreskovic    schedule 29.11.2018
comment
Ваше приложение выглядит неплохо, но его не так просто просмотреть: слишком много нестандартного кода. К тому же предпосылки для репликации слишком сложны. Разве вы не можете просто придумать в проекте тест junit, чтобы изолировать проблему в одном месте? Я не занимаюсь отдельной машиной, чтобы запускать какое-то другое программное обеспечение.   -  person Artem Bilan    schedule 29.11.2018
comment
Конечно, завтра первое. Я только что опубликовал этот пример, потому что те, в которых исходные шаги, которые привели к этой ошибке   -  person ioreskovic    schedule 29.11.2018
comment
@ArtemBilan Я обновил репо с помощью элементарного теста.   -  person ioreskovic    schedule 30.11.2018
comment
Только что обновили свой проект до Boot 2.1.1 - работает хорошо. Тестирование с 2.0.7 ...   -  person Artem Bilan    schedule 30.11.2018
comment
Да ... тоже не получается. К сожалению, onErrorContinue() является особенностью Reactor 3.2, и у нас нет выбора, кроме как обновиться до последней версии Spring Boot 2.1.1.   -  person Artem Bilan    schedule 30.11.2018
comment
Ах, это то, что есть :) Еще раз спасибо за помощь: D Если я когда-нибудь встречусь с вами лично, я куплю вам пиво или любой другой напиток по вашему выбору: D   -  person ioreskovic    schedule 30.11.2018


Ответы (1)


Дело в том, что Publisher в FluxMessageChannel отменяется в этой версии Spring Integration.

Мы начали использовать onErrorContinue() из Reactor 3.2 в версии 5.1. Чтобы решить вашу проблему, было бы лучше рассмотреть возможность обновления вашего приложения до последней версии Spring Boot 2.1.1.

В качестве обходного пути вы можете принять исключение в BinaryWsmToBytesTransformer и не возвращать его обратно в FluxMessageChannel.

person Artem Bilan    schedule 30.11.2018