как добавить новое событие в поток данных флинка CEP?

Я использую flink 1.5.2 для решения проблемы CEP.

мои данные из списка, какой-то другой процесс добавит новый объект Event в этот список во время работы системы. Это не сокет или сетевое сообщение. Читал пример с официального сайта. Вот шаги, которые, как я полагаю, мне следует делать.

  1. создать DataStream с помощью env.fromCollection (list);
  2. определить шаблон Pattern
  3. получить PatternStream, используя CEP.pattern (data_stream, pattern)
  4. используйте pattern_stream.select (... реализуйте интерфейс выбора ...), чтобы получить результат сложного события в виде DataStream

Но мой входной поток должен быть неограниченным. Я не нашел метода add () в объекте DataStream ‹>. Как мне этого добиться? а также, нужно ли мне сообщать DataStream ‹>, когда очищать устаревшие события?


person Maxi Wu    schedule 10.08.2018    source источник


Ответы (1)


Коллекции подходят в качестве источника ввода для Flink только при работе с ограниченным набором входных данных, который фиксируется заранее, например, при написании теста или просто экспериментировании. Если вам нужен неограниченный поток, вам нужно будет выбрать другой источник, например сокет или систему очередей сообщений, такую ​​как Kafka.

С сокетами легко работать для экспериментов. В системах Linux и MacOS вы можете использовать

nc -lk 9999

чтобы создать сокет, к которому Flink может привязаться к порту 9999, и все, что вы предоставите в качестве входных данных для nc (netcat), будет передаваться в ваше задание Flink по одной строке за раз. Netcat также доступен для Windows, но не установлен заранее.

Однако вы не должны планировать использование сокетов в производственной среде, так как они не могут быть перемотаны (что имеет решающее значение для достижения точных результатов с помощью Flink во время восстановления после сбоя).

person David Anderson    schedule 10.08.2018
comment
Понятно, у меня есть брокер mqtt, но есть SSL. Я попробую использовать flink для подключения к этому брокеру. Спасибо - person Maxi Wu; 13.08.2018
comment
Я реализовал SourceFunction ‹› для тестирования потока. Я добавляю источник с помощью env.addSource (). Метод run () в моей SourceFunction ‹› имеет цикл while, который вызывает context.collect () для выдачи элемента. Наконец, я вызываю env.execute (). Но метод run () никогда никем не вызывается. Я, должно быть, неправильно понял, как работает Flink CEP. Есть ли документ, объясняющий, как использовать библиотеку CEP для мониторинга потока данных? - person Maxi Wu; 13.08.2018
comment
Я нашел проблему. Я использую цикл для генерации тестового события и передачи в свой собственный источник. Но у меня в генераторе есть Thread.sleep (t). Когда вызывается Thread.sleep (t), вся программа зависает. Возможно, Thread.sleep () конфликтует с env.execute (); Есть ли что-то, что мне нужно обрабатывать в многопоточном источнике? - person Maxi Wu; 13.08.2018
comment
О, я был неправ. Новый поток работает правильно, настоящая проблема - объект очереди. SourceFunction.run () получает очередь без элементов []. Но объект очереди, в который я добавляю элементы, не пустой. Я не знал, почему есть две очереди, а у меня только один исходный объект. - person Maxi Wu; 13.08.2018
comment
Я предполагаю, что объект Source необходимо сериализовать, а десериализовать перед запуском (). Вот почему это разные экземпляры. - person Maxi Wu; 14.08.2018