Я хочу присоединиться к потоковой передаче попыток в статический список заблокированных писем и сгруппировать результат по IP, чтобы позже я мог подсчитать пакет соответствующей статистики. Результат должен отображаться в виде скользящего окна продолжительностью 30 минут через каждые 10 секунд. Ниже приведен один из нескольких способов, которыми я пытался этого добиться:
override fun performQuery(): Table {
val query = "SELECT ip, " +
"COUNT(CASE WHEN success IS false THEN 1 END) AS fails, " +
"COUNT(CASE WHEN success IS true THEN 1 END) AS successes, " +
"COUNT(DISTINCT id) accounts, " +
"COUNT(CASE WHEN id = 0 THEN 1 END) AS non_existing_accounts, " +
"COUNT(CASE WHEN blockedEmail IS NOT NULL THEN 1 END) AS blocked_accounts " +
"FROM Attempts " +
"LEFT JOIN LATERAL TABLE(blockedEmailsList()) AS T(blockedEmail) ON TRUE " +
"WHERE Attempts.email <> '' AND Attempts.createdAt < CURRENT_TIMESTAMP " +
"GROUP BY HOP(Attempts.createdAt, INTERVAL '10' SECOND, INTERVAL '30' MINUTE), ip"
return runQuery(query)
.select("ip, accounts, fails, successes, non_existing_accounts, blocked_accounts")
}
Здесь используется указанная ниже функция таблицы, определяемая пользователем, которая уже зарегистрирована в моем tableEnv
как blockedEmailsList
:
public class BlockedEmailsList extends TableFunction<Row> {
private Collection<String> emails;
public BlockedEmailsList(Collection<String> emails) {
this.emails = emails;
}
public Row read(String email) {
return Row.of(email);
}
public void eval() {
this.emails.forEach(email -> collect(read(email)));
}
}
Однако он возвращает ошибку ниже:
Caused by: org.apache.flink.table.api.TableException: Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.
Если я сделаю так, как он предлагает, и переведу created_at
в TIMESTAMP
, я получу вместо этого следующее:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Window can only be defined over a time attribute column.
Я нашел здесь другие вопросы о переполнении стека, связанные с этими исключениями, но они связаны с потоками и темпоральными таблицами, и ни один из них не решает случай присоединения потока к статическому списку.
Любые идеи?
РЕДАКТИРОВАТЬ: Похоже, для моего варианта использования в проекте Flink есть открытая проблема: https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Сторона+входы+для+DataStream+API
Итак, я также принимаю предложения по обходному пути.