Приложение, основанное на flink-sql или flink-cep, не сможет этого сделать, потому что эти библиотеки могут обрабатывать только те правила, которые определены во время компиляции задания. Вам нужно будет начинать новое задание для каждого нового правила, которое может не соответствовать вашим требованиям.
Если вы хотите иметь одно задание, которое может обрабатывать динамический набор правил, предоставляемых во время выполнения задания, вам придется создать его самостоятельно. Вы можете использовать KeyedBroadcastProcessFunction
для этого (похоже, что вы уже начали экспериментировать).
Вот набросок возможной реализации:
Вы можете использовать ключевое состояние в KeyedBroadcastProcessFunction, чтобы отслеживать текущий счет в каждом окне. Если правила можно охарактеризовать временным интервалом и порогом подсчета, вы можете использовать MapState
, где ключи — это идентификаторы правил, а значения на карте — текущий подсчет для этого правила. У вас может быть таймер для каждого правила, которое срабатывает при закрытии каждого окна.
По мере поступления событий вы перебираете карту на основе правил, увеличивая счетчик для каждого соответствующего правила. И когда срабатывают таймеры, вы находите соответствующие правила, сравниваете счетчики с пороговыми значениями, предпринимаете соответствующие действия и очищаете эти счетчики.
Некоторые возможные осложнения, о которых следует помнить:
- Эта реализация требует, чтобы вы разделили свой поток с помощью
keyBy
, чтобы вы могли использовать MapState и таймеры.
- С широковещательным потоком не могут быть связаны таймеры, поэтому таймерами должен управлять метод
processElement
, обрабатывающий ключевой поток.
- Flink позволяет использовать только один таймер для данного ключа и заданной метки времени. Поэтому будьте осторожны, если вам нужно обрабатывать случай, когда два правила должны запускаться одновременно.
- Если события могут поступать не по порядку, вам нужно либо сначала отсортировать поток по отметке времени, либо разрешить одновременное открытие нескольких окон.
person
David Anderson
schedule
11.11.2019