Как добавить новый столбец в таблицу Delta Lake?

Я пытаюсь добавить новый столбец к данным, хранящимся в виде дельта-таблицы в хранилище BLOB-объектов Azure. Большинство действий, выполняемых с данными, - это обновления, с множеством обновлений и несколькими новыми вставками. Мой код для записи данных в настоящее время выглядит так:

DeltaTable.forPath(spark, deltaPath)
      .as("dest_table")
      .merge(myDF.as("source_table"),
             "dest_table.id = source_table.id")
      .whenNotMatched()
      .insertAll()
      .whenMatched(upsertCond)
      .updateExpr(upsertStat)
      .execute()

Из этих документов, похоже, что Delta Lake поддерживает добавление новых столбцов только для вызовов insertAll() и updateAll(). Однако я обновляю только тогда, когда выполняются определенные условия и хочу, чтобы новый столбец был добавлен ко всем существующим данным (со значением по умолчанию null).

Я придумал решение, которое кажется чрезвычайно неуклюжим, и мне интересно, есть ли более элегантный подход. Вот мое текущее предлагаемое решение:

// Read in existing data
val myData = spark.read.format("delta").load(deltaPath)
// Register table with Hive metastore
myData.write.format("delta").saveAsTable("input_data")

// Add new column
spark.sql("ALTER TABLE input_data ADD COLUMNS (new_col string)")

// Save as DataFrame and overwrite data on disk
val sqlDF = spark.sql("SELECT * FROM input_data")
sqlDF.write.format("delta").option("mergeSchema", "true").mode("overwrite").save(deltaPath)

person Comrade_Question    schedule 21.08.2020    source источник
comment
Используйте jdbc, а не искру. Это не для этого.   -  person Lamanus    schedule 22.08.2020
comment
Вы нашли какое-нибудь решение?   -  person sp_user123    schedule 09.10.2020


Ответы (2)


Сначала измените свою дельта-таблицу, а затем выполните операцию слияния:

from pyspark.sql.functions import lit

spark.read.format("delta").load('/mnt/delta/cov')\
  .withColumn("Recovered", lit(''))\
  .write\
  .format("delta")\
  .mode("overwrite")\
  .option("overwriteSchema", "true")\
  .save('/mnt/delta/cov')
person ashok gupta    schedule 10.10.2020
comment
Спасибо! Это то, чем я закончил. Однако это также работает (по крайней мере, в Databricks в Azure): ALTER TABLE delta.wasbs://[email protected]/ ADD COLUMNS (mycol STRING); - person Comrade_Question; 23.10.2020
comment
Но таким образом ... мы не делаем Эволюцию схемы :-( - person Christian Herrera Jiménez; 13.02.2021

Вы пробовали использовать оператор слияния?

https://docs.databricks.com/spark/latest/spark-sql/language-manual/merge-into.html

person Ekapol Uppapansettee    schedule 01.09.2020