Исключение в Flink TableAPI

Я пытаюсь выполнить простое задание Flink для подсчета слов с помощью TableAPI. Использовал API DataStream для чтения потока данных и использовал API StreamTableEnvironment для создания среды таблиц. Я получаю исключение ниже. Может кто-нибудь помочь мне, что не так в коде? Я использую версию Flink 1.8.

Исключение:

**Exception in thread "main" org.apache.flink.table.api.TableException: Only the first field can reference an atomic type.**
    at org.apache.flink.table.api.TableEnvironment$$anonfun$5.apply(TableEnvironment.scala:1117)
    at org.apache.flink.table.api.TableEnvironment$$anonfun$5.apply(TableEnvironment.scala:1112)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:186)
    at org.apache.flink.table.api.TableEnvironment.getFieldInfo(TableEnvironment.scala:1112)
    at org.apache.flink.table.api.StreamTableEnvironment.registerDataStreamInternal(StreamTableEnvironment.scala:546)
    at org.apache.flink.table.api.java.StreamTableEnvironment.fromDataStream(StreamTableEnvironment.scala:91)
    at Udemy_Course.TableAPIExample.CountWordExample.main(CountWordExample.java:30)

Код

public class CountWordExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment=StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment streamTableEnvironment=StreamTableEnvironment.create(environment);


        DataStream<WC> streamOfWords =
                environment.fromElements(
                        new WC("Hello",1L),
                        new WC("Howdy",1L),
                        new WC("Hello",1L),
                        new WC("Hello",1L));

        Table t1 = streamTableEnvironment.fromDataStream(streamOfWords, "word, count");

        Table result = streamTableEnvironment.sqlQuery("select word, count(word) as wordcount 
        from " + t1 + " group by word");


        streamTableEnvironment.toRetractStream(result, CountWordExample.class ).print();

        environment.execute();
    }

    public static class WC {

        private String word;
        private Long count;

        public WC() {}

        public WC(String word,Long count) {
            this.word=word;
            this.count=count;
        }
    }
}

person Shailendra    schedule 01.09.2020    source источник


Ответы (1)


Все, что нужно для этого, - это изменить закрытые поля word и count в WC, чтобы они были общедоступными, и использовать Row.class вместо CountWordExample.class в toRetractStream.

Эти поля либо должны быть общедоступными, либо иметь общедоступные геттеры и сеттеры, чтобы механизм SQL мог с ними работать.

public class CountWordExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment=StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment streamTableEnvironment=StreamTableEnvironment.create(environment);
        
        DataStream<WC> streamOfWords =
                environment.fromElements(
                        new WC("Hello",1L),
                        new WC("Howdy",1L),
                        new WC("Hello",1L),
                        new WC("Hello",1L));

        Table t1 = streamTableEnvironment.fromDataStream(streamOfWords, "word, count");

        Table result = streamTableEnvironment.sqlQuery("select word, count(word) as wordcount from " + t1 + " group by word");

        streamTableEnvironment.toRetractStream(result, Row.class).print();

        environment.execute();
    }

    public static class WC {

        public String word;
        public Long count;

        public WC() {}

        public WC(String word,Long count) {
            this.word=word;
            this.count=count;
        }
    }
}
person David Anderson    schedule 02.09.2020
comment
Спасибо, сработало. Когда я выполнил указанную выше программу, я получил результат, как показано ниже. (true, Hello, 1) 3 ›(true, Howdy, 1) 4› (false, Hello, 1) 4 ›(true, Hello, 2) 4› (false, Hello, 2) 4 ›(true, Hello, 3) В каждом выводе я вижу истину или ложь. Я не понимал, почему это происходит. Кроме того, как вы предложили использовать Row.class, а не CountWordExample.class. Не могли бы вы объяснить почему? Спасибо! - person Shailendra; 02.09.2020