Функция Spring Cloud StreamВывод маршрутизатора пытается привязаться к теме Kafka

Я пытаюсь перейти на новую модель функционального программирования для Spring Cloud Stream, заменив условные аннотации StreamListener, подобные этому

@StreamListener("app-input", condition = "headers['eventName']=='Funded'")

с чем-то вроде

@Bean
fun router() = MessageRoutingCallback {
    when (it.headers["eventName"]) {
        "Funded" -> "funded"
        else -> "ignored"
    }
}
@Bean
fun funded() = Consumer { message: Message<Funded> ->
    ...
}

@Bean
fun ignored() = Consumer { message: Message<*> ->
    ...
}

со связанными свойствами, связывающими канал с темой

spring.cloud.function.definition=functionRouter
spring.cloud.stream.bindings.functionRouter-in-0.destination=MyTopic

Мне нужен этот уровень косвенности, потому что в MyTopic поступает несколько типов сообщений Avro, которые необходимо десериализовать и маршрутизировать по-разному.

Все это работает довольно успешно, и я могу получать и маршрутизировать сообщения, как и ожидалось. Однако существует неожиданный и нежелательный побочный эффект от использования functionRouter таким образом, который заключается в том, что он пытается также связать functionRouter-out-0 с Kafka, когда нет доступной темы, и поэтому каждые 30 секунд приложение пытается подключиться к тема на брокере называется functionRouter-out-0 и завершается с ошибкой авторизации, как и следовало ожидать.

2021-05-06 12:57:55.654 WARN  [screening]                            o.s.c.s.b.k.p.KafkaTopicProvisioner      : No partitions have been retrieved for the topic (functionRouter-out-0). This will affect the health check.
2021-05-06 12:57:56.198 WARN  [screening]                            org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-3] Error while fetching metadata with correlation id 4 : {functionRouter-out-0=TOPIC_AUTHORIZATION_FAILED}
2021-05-06 12:57:56.199 ERROR [screening]                            org.apache.kafka.clients.Metadata        : [Producer clientId=producer-3] Topic authorization failed for topics [functionRouter-out-0]
2021-05-06 12:57:56.199 ERROR [screening]                            o.s.c.s.b.k.p.KafkaTopicProvisioner      : Failed to obtain partition information
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [functionRouter-out-0]

Таким образом, возникает вопрос: а) как я могу остановить канал functionRouter-out-0, пытающийся привязаться к Kafka, или б) как еще я могу добиться этого, не нуждаясь в промежуточном канале?

Функция маршрутизации событий Spring Cloud Stream автоматически создает новую тему похожа, но никогда не получала ответа.


person Andy Palmer    schedule 06.05.2021    source источник


Ответы (1)


Я считаю, что это ошибка. Я открыл вопрос, если вы хотите следить за ним:

https://github.com/spring-cloud/spring-cloud-stream/issues/2168

В качестве обходного пути просто укажите его на тот же пункт назначения

spring.cloud.function.definition=functionRouter
spring.cloud.stream.bindings.functionRouter-in-0.destination=so67419839
spring.cloud.stream.bindings.functionRouter-out-0.destination=so67419839
spring.cloud.stream.bindings.functionRouter-in-0.group=so67419839

Поскольку все делегаты Consumer, мы никогда ничего не отправим.

person Gary Russell    schedule 06.05.2021
comment
Я добавил обходной путь. - person Gary Russell; 06.05.2021
comment
Быстрая работа! Спасибо, Гэри. Подтверждено локально и для меня. - person Andy Palmer; 06.05.2021