как выполнять запросы sql во время выполнения из карты потока данных или flatMap во flink

Я новичок в мигании и хочу понять, как запустить мой вариант использования с FLINK: приложение имеет три источника входных данных: а) исторические данные; б) получить все живые события от кафки; в) получить управляющее событие, которое будет иметь условие триггера

поскольку приложение работает с историческими данными, я подумал, что объединю исторические данные и текущие данные и создам таблицу в этом потоке.

Чтобы инициировать событие, мы должны написать SQL-запрос с помощью управляющего события, которое является источником ввода и содержит предложение where.

Моя проблема состоит в том, чтобы создать SQL-запрос, поскольку данные находятся в Stream, и когда я делаю что-то вроде

DataStream<ControlEvent> controlEvent
controlEvent.map(new FlatMapFunction(String, String)
{
   @override
   public String flatMap(String s, Collector<String> coll)
   {
     tableEnv.execute("select * from tableName");   /// throw serialization exception
   }
});

он выдает исключение не сериализации Localexecutionenvironment


person Ashutosh    schedule 01.07.2020    source источник


Ответы (1)


Подобная динамическая инъекция запросов (пока) не поддерживается Flink SQL.

Обновлять:

Учитывая то, что вы сказали о своих требованиях - что варианты запросов будут ограничены - вместо этого вы можете реализовать это с помощью API DataStream, а не SQL. Вероятно, это будет KeyedBroadcastProcessFunction, который будет содержать какое-то состояние с ключом, и вы можете транслировать обновления для запроса / запросов.

Взгляните на демонстрацию обнаружения мошенничества в качестве примера того, как создавать подобные вещи с помощью Flink.

person David Anderson    schedule 01.07.2020
comment
Спасибо, Дэвид, за быстрый ответ. Вы бы хотели предложить какой-либо другой подход для достижения этой цели? - person Ashutosh; 02.07.2020
comment
На что похожи запросы и сколько требуется вариаций? - person David Anderson; 02.07.2020
comment
не ожидая больших изменений, запрос будет выглядеть примерно так: SELECT * FROM EventMessages, где referencData =? и businessDate ‹или›? .. и через некоторое время потребуется самостоятельное присоединение. 1) простой запрос sql с полем предложения 2-3, где 2) запрос sql типа самостоятельного соединения - person Ashutosh; 02.07.2020
comment
И громкость этих controlSet будет примерно 2-5K. - person Ashutosh; 02.07.2020
comment
Дэвид, Итак, вы предлагаете выйти из подхода sql, поскольку BroadCastProcessFunction также не позволит выполнить запрос sql по той же причине (проблема сериализации) .. или я неправильно понял ваше утверждение? - person Ashutosh; 03.07.2020
comment
Правильно: сделать это с помощью SQL невозможно, поэтому я предложил альтернативу. С SQL запрос должен быть исправлен во время компиляции. - person David Anderson; 03.07.2020