Я пытаюсь добавить новый столбец к данным, хранящимся в виде дельта-таблицы в хранилище BLOB-объектов Azure. Большинство действий, выполняемых с данными, - это обновления, с множеством обновлений и несколькими новыми вставками. Мой код для записи данных в настоящее время выглядит так:
DeltaTable.forPath(spark, deltaPath)
.as("dest_table")
.merge(myDF.as("source_table"),
"dest_table.id = source_table.id")
.whenNotMatched()
.insertAll()
.whenMatched(upsertCond)
.updateExpr(upsertStat)
.execute()
Из этих документов, похоже, что Delta Lake поддерживает добавление новых столбцов только для вызовов insertAll()
и updateAll()
. Однако я обновляю только тогда, когда выполняются определенные условия и хочу, чтобы новый столбец был добавлен ко всем существующим данным (со значением по умолчанию null
).
Я придумал решение, которое кажется чрезвычайно неуклюжим, и мне интересно, есть ли более элегантный подход. Вот мое текущее предлагаемое решение:
// Read in existing data
val myData = spark.read.format("delta").load(deltaPath)
// Register table with Hive metastore
myData.write.format("delta").saveAsTable("input_data")
// Add new column
spark.sql("ALTER TABLE input_data ADD COLUMNS (new_col string)")
// Save as DataFrame and overwrite data on disk
val sqlDF = spark.sql("SELECT * FROM input_data")
sqlDF.write.format("delta").option("mergeSchema", "true").mode("overwrite").save(deltaPath)