Я столкнулся с проблемой при агрегировании окон. Я хочу получить сумму значений для каждого ключа, и результат будет отправлен в тему вывода только после завершения окна. Проблема в том, что каждое событие во «входной» теме будет создавать событие для «выходной» темы. Я хотел бы опубликовать событие в теме вывода только после того, как окно будет завершено. Например, если окно составляет одну минуту, отправляйте одно событие для каждой клавиши в минуту. Пример кода выглядит следующим образом:
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMinutes(2))
.reduce((v1, v2) -> String.valueOf(Integer.parseInt(v1) + Integer.parseInt(v2)))
.suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
.toStream((k,v) -> k.key())
.to("output_topic");
Но я получаю следующее исключение:
Исключение в потоке «learningtime_application-665cd31a-1957-448b-8cf7-779ab359cfd2-StreamThread-1» org.apache.kafka.streams.errors.ProcessorStateException: task [1_0] Не удалось очистить хранилище состояний KSTREAM-REDUCE-0000000003STATE-STATE Вызвано: java.lang.ClassCastException: класс org.apache.kafka.streams.kstream.Windowed не может быть преобразован в класс java.lang.String (org.apache.kafka.streams.kstream.Windowed находится в безымянном модуле загрузчика ' app '; java.lang.String находится в модуле java.base загрузчика' bootstrap ')