Добавлять только новые агрегаты на основе групповых ключей

Мне приходится обрабатывать некоторые файлы, которые приходят ко мне ежедневно. Информация имеет первичный ключ (date,client_id,operation_id). Поэтому я создал поток, который добавляет в дельта-таблицу только новые данные:

operations\
        .repartition('date')\
        .writeStream\
        .outputMode('append')\
        .trigger(once=True)\
        .option("checkpointLocation", "/mnt/sandbox/operations/_chk")\
        .format('delta')\
        .partitionBy('date')\
        .start('/mnt/sandbox/operations')

Это работает нормально, но мне нужно обобщить эту информацию, сгруппированную по (date,client_id), поэтому я создал еще один поток из этой таблицы операций в новую таблицу:

summarized= spark.readStream.format('delta').load('/mnt/sandbox/operations')

summarized= summarized.groupBy('client_id','date').agg(<a lot of aggs>)

summarized.repartition('date')\
        .writeStream\
        .outputMode('complete')\
        .trigger(once=True)\
        .option("checkpointLocation", "/mnt/sandbox/summarized/_chk")\
        .format('delta')\
        .partitionBy('date')\
        .start('/mnt/sandbox/summarized')

Это работает, но каждый раз, когда я добавляю новые данные в таблицу operations, spark пересчитывает summarized снова и снова. Я попытался использовать режим добавления во второй потоковой передаче, но для этого нужны водяные знаки, а дата - DateType.

Существует способ рассчитать новые агрегаты только на основе групповых ключей и добавить их в summarized?


person LeandroHumb    schedule 25.09.2019    source источник


Ответы (1)


Вам необходимо использовать Структурированная потоковая передача Spark — оконные операции

Когда вы используете оконные операции, он будет выполнять группировку в соответствии с windowDuration и slideDuration. windowDuration говорит вам, какова длина окна, а slideDuration говорит, на сколько времени вы должны сдвинуть окно.

Если вы группируете с помощью window( ) [docs], вы получите результирующий столбец window вместе с другими столбцами, которые вы группируете, например client_id

Например:

windowDuration = "10 minutes"
slideDuration = "5 minutes"
summarized = before_summary.groupBy(before_summary.client_id,
    window(before_summary.date, windowDuration, slideDuration)
).agg(<a lot of aggs>).orderBy('window')
person pissall    schedule 25.09.2019
comment
Могу ли я использовать дату для указания окна? - person LeandroHumb; 25.09.2019
comment
@LeandroHumb вам нужно будет указать «дни» или около того - person pissall; 25.09.2019
comment
Я надеюсь, что ваш столбец date относится к типу timestamp - person pissall; 25.09.2019
comment
это не так, дата - DateType. - person LeandroHumb; 25.09.2019
comment
Пожалуйста, попробуйте операцию window, если она не работает, преобразуйте столбец в timestamp и попробуйте разницу windowDuration и slideDuration и дайте мне знать, если возникнут какие-либо проблемы. - person pissall; 26.09.2019
comment
попытался преобразовать поле date в метку времени, и теперь потоковый процесс работает, но на сток ничего не пишет - person LeandroHumb; 26.09.2019
comment
я просто сделаю это партиями, больше не могу с этим бороться, но спасибо @pissall за внимание - person LeandroHumb; 26.09.2019
comment
@LeandroHumb, если на ваш вопрос найден подходящий ответ, проголосуйте и выберите его. Если вам нужна помощь с записью его в сток, пожалуйста, обновите свой вопрос или задайте новый. - person pissall; 27.09.2019
comment
хорошая идея, напишу еще - person LeandroHumb; 27.09.2019