Регистрация агрегированного UDF в Apache Flink

Я пытаюсь выполнить шаги, здесь, чтобы создать базовую UDF-функцию Flink Aggregate. Я добавил зависимости () и реализовал

public class MyAggregate extends AggregateFunction<Long, TestAgg> {..}

Я реализовал обязательные методы, а также несколько других: accumulate, merge, etc. Все это строится без ошибок. Теперь, согласно документам, я могу зарегистрировать это как

    StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment sTableEnv = StreamTableEnvironment.getTableEnvironment(sEnv);
    sTableEnv.registerFunction("MyMin", new MyAggregate());

Но registerFucntion, похоже, хочет ScalarFunction только в качестве входных данных. Я получаю сообщение об ошибке несовместимого типа: The method registerFunction(String, ScalarFunction) in the type TableEnvironment is not applicable for the arguments (String, MyAggregate)

Любая помощь была бы замечательной.


person Max Arbiter    schedule 15.03.2018    source источник


Ответы (1)


Вам необходимо импортировать StreamTableEnvironment для выбранного вами языка, который в вашем случае org.apache.flink.table.api.java.StreamTableEnvironment.

org.apache.flink.table.api.StreamTableEnvironment - это общий абстрактный класс для вариантов StreamTableEnvironment Java и Scala. Мы заметили, что эта часть API сбивает пользователей с толку, и мы будем улучшать ее в будущем.

person Fabian Hueske    schedule 15.03.2018