Как отправить результат оконной агрегации в тему вывода только после того, как окно будет завершено?

Я столкнулся с проблемой при агрегировании окон. Я хочу получить сумму значений для каждого ключа, и результат будет отправлен в тему вывода только после завершения окна. Проблема в том, что каждое событие во «входной» теме будет создавать событие для «выходной» темы. Я хотел бы опубликовать событие в теме вывода только после того, как окно будет завершено. Например, если окно составляет одну минуту, отправляйте одно событие для каждой клавиши в минуту. Пример кода выглядит следующим образом:

.groupByKey()  
.windowedBy(TimeWindows.of(Duration.ofMinutes(2))
.reduce((v1, v2) -> String.valueOf(Integer.parseInt(v1) + Integer.parseInt(v2)))
.suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
.toStream((k,v) -> k.key())
.to("output_topic");

Но я получаю следующее исключение:

Исключение в потоке «learningtime_application-665cd31a-1957-448b-8cf7-779ab359cfd2-StreamThread-1» org.apache.kafka.streams.errors.ProcessorStateException: task [1_0] Не удалось очистить хранилище состояний KSTREAM-REDUCE-0000000003STATE-STATE Вызвано: java.lang.ClassCastException: класс org.apache.kafka.streams.kstream.Windowed не может быть преобразован в класс java.lang.String (org.apache.kafka.streams.kstream.Windowed находится в безымянном модуле загрузчика ' app '; java.lang.String находится в модуле java.base загрузчика' bootstrap ')


person Swati Bhatia    schedule 10.12.2019    source источник


Ответы (1)


Вы столкнулись с известной ошибкой: https://issues.apache.org/jira/browse/KAFKA-9259

Оператор suppress неправильно выбирает serdes по умолчанию из конфигурации, т. Е. Он использует ключ serde без преобразования его в оконный-key-serdes.

В качестве обходного пути вам необходимо явно указать serdes в reduce() через Materialized.with(...). Вы передаете простые ключи и значения serdes, и reduce преобразует key-serde в оконный key-serde, который затем также будет передан в suppress().

person Matthias J. Sax    schedule 12.12.2019