Обработка дубликатов при обработке потоковых данных в таблице Delta Databricks с помощью Spark Structured Streaming?

Я использую Spark Structured Streaming с Azure Databricks Delta, где я пишу в дельта-таблицу (имя дельта-таблицы необработанное). Я читаю из файлов Azure, где я получаю данные не по порядку, и у меня есть 2 столбца в нем «smtUidNr» и «_2 _». Я пытаюсь обработать дубликаты, используя Upsert в моем коде, но когда я запрашиваю свою дельта-таблицу «raw». Я вижу следующие повторяющиеся записи в моей дельта-таблице

    smtUidNr                                 msgTs
    57A94ADA218547DC8AE2F3E7FB14339D    2019-08-26T08:58:46.000+0000
    57A94ADA218547DC8AE2F3E7FB14339D    2019-08-26T08:58:46.000+0000
    57A94ADA218547DC8AE2F3E7FB14339D    2019-08-26T08:58:46.000+0000

Вот мой код:

import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._


// merge duplicates
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {


  microBatchOutputDF.createOrReplaceTempView("updates")


  microBatchOutputDF.sparkSession.sql(s"""
    MERGE INTO raw t
    USING updates s
    ON (s.smtUidNr = t.smtUidNr and s.msgTs>t.msgTs) 
    WHEN MATCHED THEN UPDATE SET * 
    WHEN NOT MATCHED THEN INSERT *
  """)
}


val df=spark.readStream.format("delta").load("abfss://[email protected]/entrypacket/")
df.createOrReplaceTempView("table1")
val entrypacket_DF=spark.sql("""SELECT details as dcl,invdetails as inv,eventdetails as evt,smtdetails as smt,msgHdr.msgTs,msgHdr.msgInfSrcCd FROM table1 LATERAL VIEW explode(dcl) dcl AS details LATERAL VIEW explode(inv) inv AS invdetails LATERAL VIEW explode(evt) evt as eventdetails LATERAL VIEW explode(smt) smt as smtdetails""").dropDuplicates()


entrypacket_DF.createOrReplaceTempView("ucdx")

//Here, we are adding a column date_timestamp which converts msgTs timestamp to YYYYMMDD format in column date_timestamp which eliminates duplicate for today & then we drop this column meaning which we are not tampering with msgTs column
val resultDF=spark.sql("select dcl.smtUidNr,dcl,inv,evt,smt,cast(msgTs as timestamp)msgTs,msgInfSrcCd from ucdx").withColumn("date_timestamp",to_date(col("msgTs"))).dropDuplicates(Seq("smtUidNr","date_timestamp")).drop("date_timestamp")

resultDF.createOrReplaceTempView("final_tab")

val finalDF=spark.sql("select distinct smtUidNr,max(dcl) as dcl,max(inv) as inv,max(evt) as evt,max(smt) as smt,max(msgTs) as msgTs,max(msgInfSrcCd) as msgInfSrcCd from final_tab group by smtUidNr")


finalDF.writeStream.format("delta").foreachBatch(upsertToDelta _).outputMode("update").start()

Структурированная потоковая передача не поддерживает агрегирование, оконную функцию и порядок по пункту? Что я могу сделать, чтобы изменить свой код, чтобы у меня была только одна запись определенного smtUidNr?


person AKSHAY SHINGOTE    schedule 09.10.2019    source источник


Ответы (2)


Что вам нужно сделать, так это выполнить дедупликацию в методе foreachBatch, поэтому вы гарантируете, что каждое пакетное слияние записывает только одно значение для каждого ключа.

В вашем примере вы бы сделали следующее:

def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {

  microBatchOutputDF
    .select('smtUidNr, struct('msgTs, 'dcl, 'inv, 'evt, 'smt, 'msgInfSrcCd).as("cols"))
    .groupBy('smtUidNr)
    .agg(max('cols).as("latest"))
    .select("smtUidNr", "latest.*")
    .createOrReplaceTempView("updates")

  microBatchOutputDF.sparkSession.sql(s"""
    MERGE INTO raw t
    USING updates s
    ON (s.smtUidNr = t.smtUidNr and s.msgTs>t.msgTs) 
    WHEN MATCHED THEN UPDATE SET * 
    WHEN NOT MATCHED THEN INSERT *
  """)
}

finalDF.writeStream.foreachBatch(upsertToDelta _).outputMode("update").start()

Вы можете увидеть еще несколько примеров в документации, здесь

person Silvio    schedule 18.10.2019

Следующий фрагмент кода поможет вам найти последнюю запись, если существует несколько строк с одним и тем же уникальным идентификатором. А также возьмите только одну строку, если несколько строк абсолютно одинаковы.

Пусть уникальный ключ / ключи, с помощью которых вы будете фильтровать свою строку / запись, будут 'id'. У вас есть столбец «отметка времени», чтобы найти последнюю запись для того же идентификатора.

def upsertToDelta(micro_batch_df, batchId) :
   delta_table = DeltaTable.forName(spark, f'{database}.{table_name}')
   df = micro_batch_df.dropDuplicates(['id']) \
       .withColumn("r", rank().over(Window.partitionBy('id') \
       .orderBy(col('timestamp').desc()))).filter("r==1").drop("r")
   delta_table.alias("t") \
      .merge(df.alias("s"), 's.id = t.id') \
      .whenMatchedUpdateAll() \
      .whenNotMatchedInsertAll() \
      .execute()
final_df.writeStream \
  .foreachBatch(upsertToDelta) \
  .option('checkpointLocation', '/mnt/path/checkpoint') \
  .outputMode('update') \
  .start()
person Vignesh G    schedule 02.04.2021