У меня есть вопрос относительно поведения Flink. Ниже мой фрагмент кода. Как видите, некая служба предоставляет список критериев sql (скажем, около 10 тыс. Sql), которые Flink будет выполнять один за другим. Моя проблема в том, что всякий раз, когда sql обновляется, как мне указать, что flink работает с новым sql? Один из способов, который я вижу, - это остановить и запустить службу flink, которой я хочу избежать, поскольку другие критерии sql должны работать все время, и только тот, который обновляется, должен быть остановлен / запущен / или обновлен динамически. Кроме того, я не хочу отправлять 10 тыс. Sqls как 10 тыс. Разных заданий. Итак, поведение, которое я ищу, возможно с версией Flink 1.11?
env is StreamExecutionEnvironment...
Psudo-code:
List<String> allConditionsSqls = get_SQL_FROM_some_Service();
for(String sql : allConditionsSqls)
{
Table table = env.sqlQuery(sql);
env.toRetractStream(table, Row.class)
.process(new ProcessFunction <Tuple2<Boolean, Row>, Object>() {
@Override
public void processElement(Tuple2<Boolean, Row> value, Context ctx,Collector<Object> out) throws Exception {
Row ev = value.f1;
log.info(ev);
// more code here
}
});
}