Apache Flink динамически обновляет sql без перезапуска

У меня есть вопрос относительно поведения Flink. Ниже мой фрагмент кода. Как видите, некая служба предоставляет список критериев sql (скажем, около 10 тыс. Sql), которые Flink будет выполнять один за другим. Моя проблема в том, что всякий раз, когда sql обновляется, как мне указать, что flink работает с новым sql? Один из способов, который я вижу, - это остановить и запустить службу flink, которой я хочу избежать, поскольку другие критерии sql должны работать все время, и только тот, который обновляется, должен быть остановлен / запущен / или обновлен динамически. Кроме того, я не хочу отправлять 10 тыс. Sqls как 10 тыс. Разных заданий. Итак, поведение, которое я ищу, возможно с версией Flink 1.11?

env is StreamExecutionEnvironment... 

Psudo-code:

List<String> allConditionsSqls = get_SQL_FROM_some_Service();
for(String sql : allConditionsSqls)
{
    Table table = env.sqlQuery(sql);
    env.toRetractStream(table, Row.class)
     .process(new ProcessFunction <Tuple2<Boolean, Row>, Object>() {
         @Override
         public void processElement(Tuple2<Boolean, Row> value, Context ctx,Collector<Object> out) throws Exception {
             Row ev = value.f1;
             log.info(ev);
             // more code here
         }    
     });
}

person ParagM    schedule 11.11.2020    source источник


Ответы (1)


Нет, единственный способ сделать это - запускать каждый запрос как отдельное задание. (Как бы то ни было, есть люди, динамически генерирующие 10000 заданий Flink ежедневно - это можно сделать.)

person David Anderson    schedule 12.11.2020