Spring Cloud Stream 3.0 StreamsBuilderFactoryBeanCustomizer

Я не могу настроить пользовательские обработчики для моего потребителя потока с помощью StreamsBuilderFactoryBeanCustomizer.

@Bean
    public StreamsBuilderFactoryBeanCustomizer customizer() { 
        return fb -> {          
    fb.getStreamsConfiguration().put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, CustomDeserializationHandler.class);
            fb.getStreamsConfiguration().put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, CustomEventErrorHandler.class);               
            fb.getStreamsConfiguration().forEach((k,v) -> System.err.println("Key , Value "+ k + " , " + v));
        };
    }

После установки вышеуказанного в моем классе Spring boot @Configuration и запуске приложения я все еще вижу значения по умолчанию в SteamConfig. Согласно документам

Цитата

настройщик будет вызываться связывателем прямо перед запуском фабричного компонента.

Цитата

Но похоже, что связыватель не вызывает StreamsBuilderFactoryBeanCustomizer Это известная проблема? Я использую spring-cloud-stream-binder-kafka-streams с 2020.0.1 (spring-cloud.version) 2.4.2 (весенний ботинок)

021-03-11 11:52:04.386 [main] INFO  o.apache.kafka.streams.StreamsConfig - StreamsConfig values: 
    acceptable.recovery.lag = 10000
    application.id = service-stream
    application.server = 
    bootstrap.servers = [localhost:9092]
    buffered.records.per.partition = 1000
    built.in.metrics.version = latest
    cache.max.bytes.buffering = 10485760
    client.id = 
    commit.interval.ms = 1000
    connections.max.idle.ms = 540000
    default.deserialization.exception.handler = class org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
    default.key.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
    default.production.exception.handler = class org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
    default.timestamp.extractor = class org.apache.kafka.streams.processor.FailOnInvalidTimestamp
    default.value.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
    max.task.idle.ms = 0
    max.warmup.replicas = 2

person Andy    schedule 11.03.2021    source источник


Ответы (2)


Похоже, есть два интерфейса с одинаковым именем: Один из Spring Kafka и еще один в Spring Boot. Переплет учитывает только папку из Spring Kafka. Убедитесь, что вы его реализуете. Чтобы устранить это несоответствие, мы зарегистрировали проблему в Boot.

person sobychacko    schedule 11.03.2021
comment
Спасибо @sobychacko, это действительно так. Использование правильного оператора импорта вызывает ожидаемую регистрацию. - person Andy; 12.03.2021
comment
Хорошо знать. Не стесняйтесь принять ответ, на всякий случай, если у кого-то еще возникнет такая же проблема, и зайдите в эту ветку. - person sobychacko; 12.03.2021
comment
Но интерфейс из spring-kafka устарел. - person Bhavesh Shah; 06.07.2021
comment
Spring Kafka устарел StreamsBuilderFactoryBeanCustomizer и вместо этого предоставляет StreamsBuilderFactoryBeanConfigurer, чтобы избежать конфликта имен с Spring Boot. Подробнее см. В этой проблеме: github.com/spring-projects/spring-kafka/ issues / 1736 - person sobychacko; 06.07.2021

Это было обновлено в весенней облачной версии 2020.0.3. Вместо этого можно использовать org.springframework.kafka.config.StreamsBuilderFactoryBeanConfigurer.

person Nipun Agarwal    schedule 24.06.2021