Я не могу настроить пользовательские обработчики для моего потребителя потока с помощью StreamsBuilderFactoryBeanCustomizer.
@Bean
public StreamsBuilderFactoryBeanCustomizer customizer() {
return fb -> {
fb.getStreamsConfiguration().put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, CustomDeserializationHandler.class);
fb.getStreamsConfiguration().put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, CustomEventErrorHandler.class);
fb.getStreamsConfiguration().forEach((k,v) -> System.err.println("Key , Value "+ k + " , " + v));
};
}
После установки вышеуказанного в моем классе Spring boot @Configuration и запуске приложения я все еще вижу значения по умолчанию в SteamConfig. Согласно документам
Цитата
настройщик будет вызываться связывателем прямо перед запуском фабричного компонента.
Цитата
Но похоже, что связыватель не вызывает StreamsBuilderFactoryBeanCustomizer Это известная проблема? Я использую spring-cloud-stream-binder-kafka-streams с 2020.0.1 (spring-cloud.version) 2.4.2 (весенний ботинок)
021-03-11 11:52:04.386 [main] INFO o.apache.kafka.streams.StreamsConfig - StreamsConfig values:
acceptable.recovery.lag = 10000
application.id = service-stream
application.server =
bootstrap.servers = [localhost:9092]
buffered.records.per.partition = 1000
built.in.metrics.version = latest
cache.max.bytes.buffering = 10485760
client.id =
commit.interval.ms = 1000
connections.max.idle.ms = 540000
default.deserialization.exception.handler = class org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
default.key.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
default.production.exception.handler = class org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
default.timestamp.extractor = class org.apache.kafka.streams.processor.FailOnInvalidTimestamp
default.value.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
max.task.idle.ms = 0
max.warmup.replicas = 2