Я пытаюсь перейти на новую модель функционального программирования для Spring Cloud Stream, заменив условные аннотации StreamListener, подобные этому
@StreamListener("app-input", condition = "headers['eventName']=='Funded'")
с чем-то вроде
@Bean
fun router() = MessageRoutingCallback {
when (it.headers["eventName"]) {
"Funded" -> "funded"
else -> "ignored"
}
}
@Bean
fun funded() = Consumer { message: Message<Funded> ->
...
}
@Bean
fun ignored() = Consumer { message: Message<*> ->
...
}
со связанными свойствами, связывающими канал с темой
spring.cloud.function.definition=functionRouter
spring.cloud.stream.bindings.functionRouter-in-0.destination=MyTopic
Мне нужен этот уровень косвенности, потому что в MyTopic поступает несколько типов сообщений Avro, которые необходимо десериализовать и маршрутизировать по-разному.
Все это работает довольно успешно, и я могу получать и маршрутизировать сообщения, как и ожидалось. Однако существует неожиданный и нежелательный побочный эффект от использования functionRouter таким образом, который заключается в том, что он пытается также связать functionRouter-out-0 с Kafka, когда нет доступной темы, и поэтому каждые 30 секунд приложение пытается подключиться к тема на брокере называется functionRouter-out-0 и завершается с ошибкой авторизации, как и следовало ожидать.
2021-05-06 12:57:55.654 WARN [screening] o.s.c.s.b.k.p.KafkaTopicProvisioner : No partitions have been retrieved for the topic (functionRouter-out-0). This will affect the health check.
2021-05-06 12:57:56.198 WARN [screening] org.apache.kafka.clients.NetworkClient : [Producer clientId=producer-3] Error while fetching metadata with correlation id 4 : {functionRouter-out-0=TOPIC_AUTHORIZATION_FAILED}
2021-05-06 12:57:56.199 ERROR [screening] org.apache.kafka.clients.Metadata : [Producer clientId=producer-3] Topic authorization failed for topics [functionRouter-out-0]
2021-05-06 12:57:56.199 ERROR [screening] o.s.c.s.b.k.p.KafkaTopicProvisioner : Failed to obtain partition information
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [functionRouter-out-0]
Таким образом, возникает вопрос: а) как я могу остановить канал functionRouter-out-0, пытающийся привязаться к Kafka, или б) как еще я могу добиться этого, не нуждаясь в промежуточном канале?
Функция маршрутизации событий Spring Cloud Stream автоматически создает новую тему похожа, но никогда не получала ответа.