Как я могу обнаружить паттерн a + b + с помощью Flink CEP

Flink CEP не работает для моего шаблона. У меня есть последовательность например aabbbbaaaabbabb (a + b +). Я хочу, чтобы процесс функции выводил такой результат: {aabbbb} {aaaabb} {abb}

AfterMatchSkipStrategy skipStrategy = AfterMatchSkipStrategy.skipPastLastEvent();
Pattern<JsonNode, JsonNode> attemptPattern = Pattern.<JsonNode>begin("first", skipStrategy)
        .where(new SPCondition() {
            @Override
            public boolean filter(JsonNode element, Context<JsonNode> context) throws Exception {
                return element.get("endpoint").textvalue().equals("A");
            }
        }).oneOrMore()
        .next("second")
        .where(new SPCondition() {
            @Override
            public boolean filter(JsonNode element, Context<JsonNode> context) throws Exception {
                return element.get("endpoint").textvalue().equals("B");
            }
        }).oneOrMore();

мой результат:

{aab} {aaaab} {ab}


person Mohammad Hossein Gerami    schedule 17.09.2019    source источник


Ответы (1)


Вам нужно каким-то образом настоять на том, чтобы он занимал все возможное, а не просто совпадал после первого. Вот один из способов сделать это.

public class CEPExample {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<String> events = env.fromElements("a", "a", "b", "b", "b", "b", "a", "a", "a", "a", "b", "b", "a", "b", "b", "x");

        AfterMatchSkipStrategy skipStrategy = AfterMatchSkipStrategy.skipToFirst("end");
        Pattern<String, String> pattern = Pattern.<String>begin("first", skipStrategy)
                .where(new SimpleCondition<String>() {
                    @Override
                    public boolean filter(String element) throws Exception {
                        return (element.equals("a"));
                    }
                }).oneOrMore()
                .next("second")
                .where(new SimpleCondition<String>() {
                    @Override
                    public boolean filter(String element) throws Exception {
                        return (element.equals("b"));
                    }
                }).oneOrMore()
                .next("end")
                .where(new SimpleCondition<String>() {
                    @Override
                    public boolean filter(String element) throws Exception {
                        return (!element.equals("b"));
                    }
                });

        PatternStream<String> patternStream = CEP.pattern(events, pattern);
        patternStream.select(new SelectSegment()).print();
        env.execute();
    }

    public static class SelectSegment implements PatternSelectFunction<String, String> {
        public String select(Map<String, List<String>> pattern) {
            return String.join("", pattern.get("first")) + String.join("", pattern.get("second"));
        }
    }

}

Если вместо этого вы хотите сопоставить a + b *, хотя я чувствую, что должно быть более простое решение, вот что-то, что работает:

public class CEPExample {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<String> events = env.fromElements("a", "a", "b", "b", "b", "b", "a", "a", "a", "a", "x");

        AfterMatchSkipStrategy skipStrategy = AfterMatchSkipStrategy.skipToFirst("end");
        Pattern<String, String> pattern = Pattern.<String>begin("a-or-b", skipStrategy)
                .where(new SimpleCondition<String>() {
                    @Override
                    public boolean filter(String element) throws Exception {
                        return element.equals("a") || element.equals("b");
                    }
                }).oneOrMore()
                .next("end")
                .where(new IterativeCondition<String>() {
                    @Override
                    public boolean filter(String element, Context<String> ctx) throws Exception {
                        List<String> list = new ArrayList<>();
                        ctx.getEventsForPattern("a-or-b").iterator().forEachRemaining(list::add);
                        int length = list.size();
                        if (!element.equals("a") && !element.equals("b")) return true;
                        return (((length >= 1) && element.equals("a") && list.get(length - 1).equals("b")));
                    }
                });

        PatternStream<String> patternStream = CEP.pattern(events, pattern);
        patternStream.select(new SelectSegment()).print();
        env.execute();
    }

    public static class SelectSegment implements PatternSelectFunction<String, String> {
        public String select(Map<String, List<String>> pattern) {
            return String.join("", pattern.get("a-or-b"));
        }
    }

}

Как бы то ни было, я обычно нахожу match_recognize предлагает более простой DSL для сопоставления шаблонов с Flink.

person David Anderson    schedule 18.09.2019
comment
Уважаемый Дэвид. Спасибо за ваш ответ. решил мою проблему, но я также хочу шаблон + b *. как я могу обнаружить a + b *? (например, ввод: обнаружен шаблон aaaaaaa: aaaaaaa) - person Mohammad Hossein Gerami; 20.09.2019
comment
См. Мой обновленный ответ, а также мое предложение исследовать использование Flink SQL с match_recognize (который компилируется в CEP ниже). - person David Anderson; 20.09.2019