Вам нужно каким-то образом настоять на том, чтобы он занимал все возможное, а не просто совпадал после первого. Вот один из способов сделать это.
public class CEPExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<String> events = env.fromElements("a", "a", "b", "b", "b", "b", "a", "a", "a", "a", "b", "b", "a", "b", "b", "x");
AfterMatchSkipStrategy skipStrategy = AfterMatchSkipStrategy.skipToFirst("end");
Pattern<String, String> pattern = Pattern.<String>begin("first", skipStrategy)
.where(new SimpleCondition<String>() {
@Override
public boolean filter(String element) throws Exception {
return (element.equals("a"));
}
}).oneOrMore()
.next("second")
.where(new SimpleCondition<String>() {
@Override
public boolean filter(String element) throws Exception {
return (element.equals("b"));
}
}).oneOrMore()
.next("end")
.where(new SimpleCondition<String>() {
@Override
public boolean filter(String element) throws Exception {
return (!element.equals("b"));
}
});
PatternStream<String> patternStream = CEP.pattern(events, pattern);
patternStream.select(new SelectSegment()).print();
env.execute();
}
public static class SelectSegment implements PatternSelectFunction<String, String> {
public String select(Map<String, List<String>> pattern) {
return String.join("", pattern.get("first")) + String.join("", pattern.get("second"));
}
}
}
Если вместо этого вы хотите сопоставить a + b *, хотя я чувствую, что должно быть более простое решение, вот что-то, что работает:
public class CEPExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<String> events = env.fromElements("a", "a", "b", "b", "b", "b", "a", "a", "a", "a", "x");
AfterMatchSkipStrategy skipStrategy = AfterMatchSkipStrategy.skipToFirst("end");
Pattern<String, String> pattern = Pattern.<String>begin("a-or-b", skipStrategy)
.where(new SimpleCondition<String>() {
@Override
public boolean filter(String element) throws Exception {
return element.equals("a") || element.equals("b");
}
}).oneOrMore()
.next("end")
.where(new IterativeCondition<String>() {
@Override
public boolean filter(String element, Context<String> ctx) throws Exception {
List<String> list = new ArrayList<>();
ctx.getEventsForPattern("a-or-b").iterator().forEachRemaining(list::add);
int length = list.size();
if (!element.equals("a") && !element.equals("b")) return true;
return (((length >= 1) && element.equals("a") && list.get(length - 1).equals("b")));
}
});
PatternStream<String> patternStream = CEP.pattern(events, pattern);
patternStream.select(new SelectSegment()).print();
env.execute();
}
public static class SelectSegment implements PatternSelectFunction<String, String> {
public String select(Map<String, List<String>> pattern) {
return String.join("", pattern.get("a-or-b"));
}
}
}
Как бы то ни было, я обычно нахожу match_recognize предлагает более простой DSL для сопоставления шаблонов с Flink.
person
David Anderson
schedule
18.09.2019