Flink: Левое присоединение к потоку со статическим списком

Я хочу присоединиться к потоковой передаче попыток в статический список заблокированных писем и сгруппировать результат по IP, чтобы позже я мог подсчитать пакет соответствующей статистики. Результат должен отображаться в виде скользящего окна продолжительностью 30 минут через каждые 10 секунд. Ниже приведен один из нескольких способов, которыми я пытался этого добиться:

override fun performQuery(): Table {
    val query = "SELECT ip, " +
        "COUNT(CASE WHEN success IS false THEN 1 END) AS fails, " +
        "COUNT(CASE WHEN success IS true THEN 1 END) AS successes, " +
        "COUNT(DISTINCT id) accounts, " +
        "COUNT(CASE WHEN id = 0 THEN 1 END) AS non_existing_accounts, " +
        "COUNT(CASE WHEN blockedEmail IS NOT NULL THEN 1 END) AS blocked_accounts " +
        "FROM Attempts " +
        "LEFT JOIN LATERAL TABLE(blockedEmailsList()) AS T(blockedEmail) ON TRUE " +
        "WHERE Attempts.email <> '' AND Attempts.createdAt < CURRENT_TIMESTAMP " +
        "GROUP BY HOP(Attempts.createdAt, INTERVAL '10' SECOND, INTERVAL '30' MINUTE), ip"

    return runQuery(query)
        .select("ip, accounts, fails, successes, non_existing_accounts, blocked_accounts")
}

Здесь используется указанная ниже функция таблицы, определяемая пользователем, которая уже зарегистрирована в моем tableEnv как blockedEmailsList:

public class BlockedEmailsList extends TableFunction<Row> {
    private Collection<String> emails;

    public BlockedEmailsList(Collection<String> emails) {
        this.emails = emails;
    }

    public Row read(String email) {
        return Row.of(email);
    }

    public void eval() {
        this.emails.forEach(email -> collect(read(email)));
    }
}

Однако он возвращает ошибку ниже:

Caused by: org.apache.flink.table.api.TableException: Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.

Если я сделаю так, как он предлагает, и переведу created_at в TIMESTAMP, я получу вместо этого следующее:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Window can only be defined over a time attribute column.

Я нашел здесь другие вопросы о переполнении стека, связанные с этими исключениями, но они связаны с потоками и темпоральными таблицами, и ни один из них не решает случай присоединения потока к статическому списку.

Любые идеи?

РЕДАКТИРОВАТЬ: Похоже, для моего варианта использования в проекте Flink есть открытая проблема: https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Сторона+входы+для+DataStream+API

Итак, я также принимаю предложения по обходному пути.


person rodsoars    schedule 08.04.2020    source источник


Ответы (2)


Caused by: org.apache.flink.table.api.TableException: Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.

Причина в том, что боковая табличная функция - это обычное соединение Flink, а обычное соединение отправит нулевое значение, например

left:(K0, A), right(K1, T1)  => send    (K0, A, NULL, NULL)
left:         , right(K0, T2) => retract (K0, A, NULL, NULL )  
                                   send   (K0, A, K0, T2)

и, таким образом, атрибут времени из входного потока будет потерян после соединения.

В вашем случае вам не нужна TableFunction, вы можете использовать скалярную функцию, например:

 public static class BlockedEmailFunction extends ScalarFunction {
     private static List<String> blockedEmails = ...;
     public Boolean eval(String email) {
        return blockedEmails.contains(attempt.getEmail());
     }
 }


// register function
env.createTemporarySystemFunction("blockedEmailFunction", BlockedEmailFunction.class);

// call registered function in SQL and do window operation as your expected
env.sqlQuery("SELECT blockedEmailFunction(email) as status, ip, createdAt FROM Attempts");
 
person leonard    schedule 26.11.2020

Мне удалось реализовать обходной путь, который решил мою проблему!

Вместо объединения потоковых попыток со статическим списком писем я заранее сопоставил каждую попытку с новой с добавленным атрибутом blockedEmail. Если статический список blockedEmails содержит текущее электронное письмо с попыткой, я устанавливаю для его атрибута blockedEmail значение true.

DataStream<Attempt> attemptsStream = sourceApi.<Attempt>startStream().map(new MapFunction<Attempt, Attempt>() {
    @Override
    public Attempt map(Attempt attempt) throws Exception {
        if (blockedEmails.contains(attempt.getEmail())) {
            attempt.setBlockedEmail(true);
        }
        return attempt;
    }
});

Статический список blockedEmails имеет тип HashSet, поэтому поиск будет O (1).

Наконец, запрос на группировку был изменен на:

override fun performQuery(): Table {
    val query = "SELECT ip, " +
        "COUNT(CASE WHEN success IS false THEN 1 END) AS fails, " +
        "COUNT(CASE WHEN success IS true THEN 1 END) AS successes, " +
        "COUNT(DISTINCT id) accounts, " +
        "COUNT(CASE WHEN id = 0 THEN 1 END) AS non_existing_accounts, " +
        "COUNT(CASE WHEN blockedEmail IS true THEN 1 END) AS blocked_accounts " +
        "FROM Attempts " +
        "WHERE Attempts.email <> '' " +
        "GROUP BY HOP(Attempts.createdAt, INTERVAL '10' SECOND, INTERVAL '30' MINUTE), ip"

    return runQuery(query)
        .select("ip, accounts, fails, successes, non_existing_accounts, blocked_accounts")
}

Пока проблема объединения потоков и статических списков кажется нерешенной, но в моем случае вышеупомянутое временное решение решило ее нормально.

person rodsoars    schedule 14.04.2020