Использование TestChannelBinderConfiguration, но два обработчика регистрируются

У меня есть приложение Spring Cloud Stream, которое обрабатывает события (кредитные карты), некоторые из которых обрабатываются синхронно, а некоторые асинхронно. В Котлине я придумал примерно следующее:

весеннее облако-стартер-ручей-кролик = 3.1.0

@Service
class CardEventProcessor(
    private val streamBridge: StreamBridge,
) : Consumer<AsyncCardEvent> {

    fun process(cardEvent: SyncCardEvent): Result { return businessLogic() }

    fun processAsynchronously(cardEvent: AsyncCardEvent) {
        streamBridge.send("cardEventProcessor-out-0", cardEvent)
    }

    override fun accept(cardEvent: AsyncCardEvent) { businessLogic() }
}

и настроили его так:

  rabbitmq: ...
  cloud:
    stream:
      bindings:
        cardEventProcessor-in-0:
          destination: cardevents
          group: CardEventProcessor
        cardEventProcessor-out-0:
          destination: cardevents

Кажется, все работает нормально, за исключением интеграционных тестов, обработка которых завершается сбоем после второго события асинхронной карты. Мне удалось отладить / уменьшить проблему до двух обработчиков, зарегистрированных в UnicastingDispatcher, который имеет циклическую стратегию: один для TestChannelBinder, а другой для OutputDestination $ lamdba.

Вот как выглядит мой класс интеграционного теста:

@SpringBootTest
@Transactional
@AutoConfigureMockMvc
@AutoConfigureEmbeddedDatabase
@Import(TestChannelBinderConfiguration::class)
class IntegrationTests {

    @Test
    fun `Use case one`() {
        sendFirstAsyncRequest()  // processed correctly in CardEventProcessor.accept()
        sendSecondAsyncRequest() // message never arrives in CardEventProcessor.accept()
    }
}

Я следил за разделом «Тестирование» в документации Spring Cloud Stream и не могу понять, что мне не хватает, чтобы это работало. В примере есть функция ‹›, а не потребитель ‹›, и я создаю в том же классе @Service, что и потребитель (потому что очередь - это просто деталь реализации для решения асинхронности, а не типичный вариант использования очередей между микросервисами) но насколько я понимаю, это должно работать, и действительно работает, когда не выполняется в качестве интеграционного теста.

Я видел Отключить Spring Cloud Stream Rabbit для тестов, но не хотел зависеть от устаревшего spring-cloud-stream-test-support, и два других предложения тоже не сработали. Любые идеи?


person Gergely Nagy    schedule 02.02.2021    source источник
comment
Насколько я понимаю, ваш CardEventProcessor не соответствует концепции FunctionalInterface, поскольку у него есть два общедоступных метода для обработки сообщений (что может привести к двум привязкам), поэтому я подозреваю, что это связано с вашей проблемой. Тот факт, что это даже работает частично, немного озадачивает меня, так что я исследую и продолжу.   -  person Oleg Zhurakousky    schedule 03.02.2021
comment
Спасибо Олег. CardEventProcessor реализует Consumer ‹› и в соответствии с документами. И в этом контексте bean-компоненты типа Supplier, Function или Consumer рассматриваются как де-факто обработчики сообщений .... Это также единственный такой bean-компонент в контексте, не уверен, влияет ли это на привязку / проводка. Возможно, это отдельный вопрос, но пример в документации показывает value.toUpperCase() подобные функции без внешних зависимостей, а старые привязки на основе аннотаций теперь помечены как устаревшие. Итак, что делать, если потребитель, например, нужно писать в базу данных? Как это соотносится с функциональной концепцией?   -  person Gergely Nagy    schedule 04.02.2021
comment
Извините, я это пропустил, не так часто работаю с Kotlin;). Итак, если это Потребитель, будет создана только одна привязка - ввод. Это не означает, что вы не можете использовать StreamBridge для отправки в cardEventProcessor-out-0, но это также означает, что привязка cardEventProcessor-out-0 будет создаваться динамически. Итак, вопрос, почему бы просто не реализовать функцию CardEventProcessor, которая привела бы к двум привязкам, и вам не нужно беспокоиться о ручной отправке, поскольку фреймворк позаботится об этом?   -  person Oleg Zhurakousky    schedule 04.02.2021
comment
Кроме того, не могли бы вы создать образец проекта с минимальным минимумом и упомянутым вами тестом, который воспроизводит проблему, чтобы мы могли ее рассмотреть?   -  person Oleg Zhurakousky    schedule 04.02.2021
comment
Функция здесь не будет работать, потому что цепочка - производитель - ›потребитель, а не потребитель -› производитель. В потребителе нет возвращаемого значения, обработанное сообщение записывается в БД, поэтому, по сути, потребитель является недействительной функцией - следовательно, потребитель. Производитель запускается при получении сообщения SOAP по HTTP, а не из входящей очереди. Причина, по которой process () находится в CardEventProcessor, - это согласованность: не хотелось передавать детали очереди в конечную точку SOAP, это деталь реализации CardEventProcessor.   -  person Gergely Nagy    schedule 04.02.2021
comment
Вот пример проекта для воспроизведения: github.com/gnagy/test-channel-binder-triage   -  person Gergely Nagy    schedule 04.02.2021
comment
Возникла проблема - github.com/spring-cloud/spring-cloud- stream / issues / 2106   -  person Oleg Zhurakousky    schedule 04.02.2021
comment
Я столкнулся с той же проблемой, реагируя на свой код из StreamListener / Bindings в StreamBridge и обработчики функционального стиля. Я потратил около 8 часов, чтобы разобраться в проблеме, но безуспешно. Кажется, что очень уродливым решением является @DirtiesContext для обеспечения чистого контекста приложения для каждого метода тестирования, но это не будет работать для всех сценариев и работает медленно ...   -  person agassner    schedule 24.02.2021