Я новичок в мигании и хочу понять, как запустить мой вариант использования с FLINK: приложение имеет три источника входных данных: а) исторические данные; б) получить все живые события от кафки; в) получить управляющее событие, которое будет иметь условие триггера
поскольку приложение работает с историческими данными, я подумал, что объединю исторические данные и текущие данные и создам таблицу в этом потоке.
Чтобы инициировать событие, мы должны написать SQL-запрос с помощью управляющего события, которое является источником ввода и содержит предложение where.
Моя проблема состоит в том, чтобы создать SQL-запрос, поскольку данные находятся в Stream, и когда я делаю что-то вроде
DataStream<ControlEvent> controlEvent
controlEvent.map(new FlatMapFunction(String, String)
{
@override
public String flatMap(String s, Collector<String> coll)
{
tableEnv.execute("select * from tableName"); /// throw serialization exception
}
});
он выдает исключение не сериализации Localexecutionenvironment