Я пытаюсь выполнить шаги, здесь, чтобы создать базовую UDF-функцию Flink Aggregate. Я добавил зависимости () и реализовал
public class MyAggregate extends AggregateFunction<Long, TestAgg> {..}
Я реализовал обязательные методы, а также несколько других: accumulate, merge, etc
. Все это строится без ошибок. Теперь, согласно документам, я могу зарегистрировать это как
StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment sTableEnv = StreamTableEnvironment.getTableEnvironment(sEnv);
sTableEnv.registerFunction("MyMin", new MyAggregate());
Но registerFucntion
, похоже, хочет ScalarFunction
только в качестве входных данных. Я получаю сообщение об ошибке несовместимого типа: The method registerFunction(String, ScalarFunction) in the type TableEnvironment is not applicable for the arguments (String, MyAggregate)
Любая помощь была бы замечательной.