У меня есть приложение Spring Cloud Stream, которое обрабатывает события (кредитные карты), некоторые из которых обрабатываются синхронно, а некоторые асинхронно. В Котлине я придумал примерно следующее:
весеннее облако-стартер-ручей-кролик = 3.1.0
@Service
class CardEventProcessor(
private val streamBridge: StreamBridge,
) : Consumer<AsyncCardEvent> {
fun process(cardEvent: SyncCardEvent): Result { return businessLogic() }
fun processAsynchronously(cardEvent: AsyncCardEvent) {
streamBridge.send("cardEventProcessor-out-0", cardEvent)
}
override fun accept(cardEvent: AsyncCardEvent) { businessLogic() }
}
и настроили его так:
rabbitmq: ...
cloud:
stream:
bindings:
cardEventProcessor-in-0:
destination: cardevents
group: CardEventProcessor
cardEventProcessor-out-0:
destination: cardevents
Кажется, все работает нормально, за исключением интеграционных тестов, обработка которых завершается сбоем после второго события асинхронной карты. Мне удалось отладить / уменьшить проблему до двух обработчиков, зарегистрированных в UnicastingDispatcher
, который имеет циклическую стратегию: один для TestChannelBinder, а другой для OutputDestination $ lamdba.
Вот как выглядит мой класс интеграционного теста:
@SpringBootTest
@Transactional
@AutoConfigureMockMvc
@AutoConfigureEmbeddedDatabase
@Import(TestChannelBinderConfiguration::class)
class IntegrationTests {
@Test
fun `Use case one`() {
sendFirstAsyncRequest() // processed correctly in CardEventProcessor.accept()
sendSecondAsyncRequest() // message never arrives in CardEventProcessor.accept()
}
}
Я следил за разделом «Тестирование» в документации Spring Cloud Stream и не могу понять, что мне не хватает, чтобы это работало. В примере есть функция ‹›, а не потребитель ‹›, и я создаю в том же классе @Service, что и потребитель (потому что очередь - это просто деталь реализации для решения асинхронности, а не типичный вариант использования очередей между микросервисами) но насколько я понимаю, это должно работать, и действительно работает, когда не выполняется в качестве интеграционного теста.
Я видел Отключить Spring Cloud Stream Rabbit для тестов, но не хотел зависеть от устаревшего spring-cloud-stream-test-support
, и два других предложения тоже не сработали. Любые идеи?
CardEventProcessor
не соответствует концепции FunctionalInterface, поскольку у него есть два общедоступных метода для обработки сообщений (что может привести к двум привязкам), поэтому я подозреваю, что это связано с вашей проблемой. Тот факт, что это даже работает частично, немного озадачивает меня, так что я исследую и продолжу. - person Oleg Zhurakousky   schedule 03.02.2021value.toUpperCase()
подобные функции без внешних зависимостей, а старые привязки на основе аннотаций теперь помечены как устаревшие. Итак, что делать, если потребитель, например, нужно писать в базу данных? Как это соотносится с функциональной концепцией? - person Gergely Nagy   schedule 04.02.2021cardEventProcessor-out-0
, но это также означает, что привязкаcardEventProcessor-out-0
будет создаваться динамически. Итак, вопрос, почему бы просто не реализовать функцию CardEventProcessor, которая привела бы к двум привязкам, и вам не нужно беспокоиться о ручной отправке, поскольку фреймворк позаботится об этом? - person Oleg Zhurakousky   schedule 04.02.2021