Я использую Spark Structured Streaming с Azure Databricks Delta, где я пишу в дельта-таблицу (имя дельта-таблицы необработанное). Я читаю из файлов Azure, где я получаю данные не по порядку, и у меня есть 2 столбца в нем «smtUidNr
» и «_2 _». Я пытаюсь обработать дубликаты, используя Upsert в моем коде, но когда я запрашиваю свою дельта-таблицу «raw
». Я вижу следующие повторяющиеся записи в моей дельта-таблице
smtUidNr msgTs
57A94ADA218547DC8AE2F3E7FB14339D 2019-08-26T08:58:46.000+0000
57A94ADA218547DC8AE2F3E7FB14339D 2019-08-26T08:58:46.000+0000
57A94ADA218547DC8AE2F3E7FB14339D 2019-08-26T08:58:46.000+0000
Вот мой код:
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
// merge duplicates
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {
microBatchOutputDF.createOrReplaceTempView("updates")
microBatchOutputDF.sparkSession.sql(s"""
MERGE INTO raw t
USING updates s
ON (s.smtUidNr = t.smtUidNr and s.msgTs>t.msgTs)
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")
}
val df=spark.readStream.format("delta").load("abfss://[email protected]/entrypacket/")
df.createOrReplaceTempView("table1")
val entrypacket_DF=spark.sql("""SELECT details as dcl,invdetails as inv,eventdetails as evt,smtdetails as smt,msgHdr.msgTs,msgHdr.msgInfSrcCd FROM table1 LATERAL VIEW explode(dcl) dcl AS details LATERAL VIEW explode(inv) inv AS invdetails LATERAL VIEW explode(evt) evt as eventdetails LATERAL VIEW explode(smt) smt as smtdetails""").dropDuplicates()
entrypacket_DF.createOrReplaceTempView("ucdx")
//Here, we are adding a column date_timestamp which converts msgTs timestamp to YYYYMMDD format in column date_timestamp which eliminates duplicate for today & then we drop this column meaning which we are not tampering with msgTs column
val resultDF=spark.sql("select dcl.smtUidNr,dcl,inv,evt,smt,cast(msgTs as timestamp)msgTs,msgInfSrcCd from ucdx").withColumn("date_timestamp",to_date(col("msgTs"))).dropDuplicates(Seq("smtUidNr","date_timestamp")).drop("date_timestamp")
resultDF.createOrReplaceTempView("final_tab")
val finalDF=spark.sql("select distinct smtUidNr,max(dcl) as dcl,max(inv) as inv,max(evt) as evt,max(smt) as smt,max(msgTs) as msgTs,max(msgInfSrcCd) as msgInfSrcCd from final_tab group by smtUidNr")
finalDF.writeStream.format("delta").foreachBatch(upsertToDelta _).outputMode("update").start()
Структурированная потоковая передача не поддерживает агрегирование, оконную функцию и порядок по пункту? Что я могу сделать, чтобы изменить свой код, чтобы у меня была только одна запись определенного smtUidNr?