Я использую flink 1.5.2 для решения проблемы CEP.
мои данные из списка, какой-то другой процесс добавит новый объект Event в этот список во время работы системы. Это не сокет или сетевое сообщение. Читал пример с официального сайта. Вот шаги, которые, как я полагаю, мне следует делать.
- создать DataStream с помощью env.fromCollection (list);
- определить шаблон Pattern
- получить PatternStream, используя CEP.pattern (data_stream, pattern)
- используйте pattern_stream.select (... реализуйте интерфейс выбора ...), чтобы получить результат сложного события в виде DataStream
Но мой входной поток должен быть неограниченным. Я не нашел метода add () в объекте DataStream ‹>. Как мне этого добиться? а также, нужно ли мне сообщать DataStream ‹>, когда очищать устаревшие события?