У меня есть таблица Flink со следующей структурой:
Id1, Id2, myTimestamp, value
Где время строки основано на myTimestamp
.
У меня хорошо работает следующая обработка:
Table processed = tableEnv.sqlQuery("SELECT " +
"Id1, " +
"MAX(myTimestamp) as myTimestamp, " +
"SUM(value) as value " +
"FROM MyTable " +
"GROUP BY Id1, HOP(rowtime, INTERVAL 10 SECOND, INTERVAL 30 SECOND)");
Я хочу адаптировать предыдущий код, например, для каждого окна я использую только последнюю запись для Id2
. Итак, я подумал, что изменение кода, как показано ниже, сработает:
Table processed = tableEnv.sqlQuery("SELECT " +
"Id1, " +
"MAX(myTimestamp) as myTimestamp, " +
"SUM(value) as value " +
"FROM MyTable, " +
"(SELECT Id2, MAX(myTimestamp) as latestTimestamp FROM MyTable GROUP BY Id2) as RecordsLatest" +
"WHERE MyTable.Id2 = RecordsLatest.Id2 AND MyTable.myTimestamp = RecordsLatest.myTimestamp" +
"GROUP BY Id1, HOP(rowtime, INTERVAL 10 SECOND, INTERVAL 30 SECOND)");
Но когда я это сделаю, я получаю следующую ошибку:
Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.
Please check the documentation for the set of currently supported SQL features.
at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:387)
at org.apache.flink.table.api.TableEnvironment.optimizePhysicalPlan(TableEnvironment.scala:302)
at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:816)
at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:351)
at org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:879)
at org.apache.flink.table.api.Table.insertInto(table.scala:1126)
Похоже, Флинк не «понимает», что две таблицы, к которым я присоединяюсь, - это одна и та же.
Как я могу делать то, что хочу?