Flink Autojoin со столбцом времени строки

У меня есть таблица Flink со следующей структурой:

Id1, Id2, myTimestamp, value

Где время строки основано на myTimestamp.

У меня хорошо работает следующая обработка:

Table processed = tableEnv.sqlQuery("SELECT " +
                "Id1, " +
                "MAX(myTimestamp) as myTimestamp, " +
                "SUM(value) as value " +
                "FROM MyTable " +
                "GROUP BY Id1, HOP(rowtime, INTERVAL 10 SECOND, INTERVAL 30 SECOND)");

Я хочу адаптировать предыдущий код, например, для каждого окна я использую только последнюю запись для Id2. Итак, я подумал, что изменение кода, как показано ниже, сработает:

Table processed = tableEnv.sqlQuery("SELECT " +
                "Id1, " +
                "MAX(myTimestamp) as myTimestamp, " +
                "SUM(value) as value " +
                "FROM MyTable, " + 
                "(SELECT Id2, MAX(myTimestamp) as latestTimestamp FROM MyTable GROUP BY Id2) as RecordsLatest" +
                "WHERE  MyTable.Id2 = RecordsLatest.Id2 AND MyTable.myTimestamp = RecordsLatest.myTimestamp" +
                "GROUP BY Id1, HOP(rowtime, INTERVAL 10 SECOND, INTERVAL 30 SECOND)");

Но когда я это сделаю, я получаю следующую ошибку:

Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.
Please check the documentation for the set of currently supported SQL features.
    at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:387)
    at org.apache.flink.table.api.TableEnvironment.optimizePhysicalPlan(TableEnvironment.scala:302)
    at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:816)
    at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:351)
    at org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:879)
    at org.apache.flink.table.api.Table.insertInto(table.scala:1126)

Похоже, Флинк не «понимает», что две таблицы, к которым я присоединяюсь, - это одна и та же.

Как я могу делать то, что хочу?


person Nakeuh    schedule 26.08.2019    source источник
comment
возможный дубликат с stackoverflow.com/questions/57181771/   -  person MIkCode    schedule 26.08.2019
comment
Ну, вообще-то это тоже был мой вопрос, и это не одно и то же. По первому вопросу мои данные были статичными, и я смог обойти проблему с помощью TableFunction. Но в моем случае мои данные динамические, поэтому я не могу использовать это решение, и никакого другого рабочего решения не было предоставлено.   -  person Nakeuh    schedule 26.08.2019
comment
Вы пробовали этот подход ci.apache.org/projects/flink/flink-docs-stable/dev/table/   -  person MIkCode    schedule 26.08.2019


Ответы (1)


Ваш запрос не работает по нескольким причинам.

SELECT 
  Id1, MAX(myTimestamp) as myTimestamp, SUM(value) as value 
FROM 
  MyTable, 
  (SELECT Id2, MAX(myTimestamp) as latestTimestamp 
   FROM MyTable 
   GROUP BY Id2
  ) as RecordsLatest
WHERE 
  MyTable.Id2 = RecordsLatest.Id2 
  AND MyTable.myTimestamp = RecordsLatest.myTimestamp
GROUP BY 
  Id1, HOP(rowtime, INTERVAL 10 SECOND, INTERVAL 30 SECOND)

Некоторые из них связаны с ограничениями Flink, другие - более фундаментальными.

  1. latestTimestamp больше не является атрибутом rowtime. Это потому, что это вычислено. Как только вы используете атрибут rowtime в выражении (включая функции агрегирования, такие как MAX), они теряют свое свойство rowtime и становятся обычными атрибутами TIMESTAMP.
  2. Внутренний запрос создает динамическую таблицу, которая обновляет свои результаты. Это не таблица, предназначенная только для добавления. Как только максимальная временная метка Id2 изменится, предыдущую строку результатов необходимо отозвать и вставить новую строку результатов.
  3. Поскольку RecordsLatest является обновляемой таблицей (а не таблицей только для добавления), а latestTimestamp не является атрибутом rowtime, соединение RecordsLatest и MyTable является "обычным соединением" (а не соединением с временным окном), которое также дает результат обновления. и не результат только добавления. Обычное соединение не может создавать какие-либо атрибуты rowtime, потому что нет гарантии относительно порядка вывода строк (что является предварительным условием для атрибутов rowtime, потому что они должны быть выровнены с водяными знаками), и в будущем может потребоваться удалить результаты. Это вызывает сообщение об ошибке, которое вы видите.
  4. Предложение GROUP BY внешнего запроса требует входной таблицы только для добавления с атрибутом rowtime rowtime. Однако выходные данные соединения предназначены не только для добавления, но и для обновления, и атрибут rowtime не может быть атрибутом rowtime, как объяснялось ранее.

К сожалению, решить вашу задачу непросто, но вполне возможно.

Прежде всего, вы должны вернуть запрос, который возвращает для каждого (Id1, Id2) окна значения для строки с максимальной временной меткой:

SELECT 
  Id1, Id2,
  MAX(myTimestamp) AS maxT
  ValOfMaxT(valX, myTimestamp) AS valXOfMaxT,
  HOP_ROWTIME(myTimestamp, INTERVAL '10' SECOND, INTERVAL '30' SECOND) AS rowtime
FROM
  MyTable
GROUP BY
  Id1, Id2, HOP(myTimestamp, INTERVAL '10' SECOND, INTERVAL '30' SECOND)

Функция ValOfMaxT - это определяемая пользователем функция агрегирования, которая определяет значение максимальной отметки времени и возвращает его. rowtime - это новый атрибут rowtime и за 1 мс до конечной отметки времени окна.

Учитывая эту таблицу, назовем ее Temp, вы можете определить следующий запрос как:


SELECT
  Id1, MAX(maxT) as myTimestamp, SUM(valXOfMaxT)
FROM Temp
GROUP BY
  Id1, TUMBLE(rowtime, INTERVAL '10' SECONDS)

Этот запрос группирует только Id1 и TUMBLE окно. Это TUMBLE окно, потому что первое HOP окно уже сгруппировало каждую запись в три окна, и мы не должны делать это снова. Вместо этого мы группируем результат первого запроса в 10 вторых окон, потому что это длина слайда HOP окон в первом запросе.

person Fabian Hueske    schedule 30.08.2019
comment
Спасибо за Ваш ответ. Мой исходный код выполняет некоторую агрегацию по временному окну и полю Id1. В измененном коде я хочу сохранить для каждого Id2 только запись с наивысшим значением myTimestamp, а затем применить исходную агрегацию по Id1 и временному окну. Надеюсь, это яснее. - person Nakeuh; 30.08.2019
comment
Хм, так в каждом окне вы хотите иметь только запись с наивысшим myTimestamp и агрегировать по одной записи? - person Fabian Hueske; 30.08.2019
comment
Не совсем. Есть два разных ключа. Ключ Id2, для которого мне нужна только запись с наивысшим myTimestamp, а затем сгруппировать все эти «последние Id2» по ключу Id1 - person Nakeuh; 30.08.2019
comment
Идеально. Спасибо ! - person Nakeuh; 30.08.2019