Вы можете вычислить разницу двух версий таблицы, но, как вы уже догадались, это дорого. Также сложно вычислить фактическую разницу, когда в дельта-таблице есть изменения, отличные от добавлений.
обычно, когда люди спрашивают об этом, они пытаются разработать свою собственную систему, которая дает им ровно одну обработку данных от дельты до другого места; потоковая передача искры + для этого уже существует источник Delta
если вы хотите написать свой собственный, вы можете напрямую прочитать журнал транзакций (спецификация протокола находится в https://github.com/delta-io/delta/blob/master/PROTOCOL.md) и используйте действия в версиях между двумя вычисляемыми вами, чтобы выяснить, какие файлы изменения читать
Обратите внимание, что версии дельта-таблицы кэшируются (сохраняются Spark), поэтому сравнение разных наборов данных должно быть довольно дешевым.
val v0 = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/delta/t2")
val v1 = spark.read.format("delta").option("versionAsOf", 1).load("/tmp/delta/t2")
// v0 and v1 are persisted - see Storage tab in web UI
Получить версии v0 и v1 не дорого; сравнение двух может быть как дорогостоящим, так и сложным. Если таблица предназначена только для добавления, то это (v1 - v0); если у него есть апсерты, вам также нужно обработать (v0 - v1), а если у него есть метаданные или изменения протокола, это становится еще сложнее.
И когда вы проделываете всю эту логику самостоятельно, это подозрительно похоже на повторную реализацию DeltaSource.
Затем вы можете рассмотреть следующее:
val log = DeltaLog.forTable(spark, "/tmp/delta/t2")
val v0 = log.getSnapshotAt(0)
val actionsAtV0 = v0.state
val v1 = log.getSnapshotAt(1)
val actionsAtV1 = v1.state
actionsAtV0
и actionsAtV1
- все действия, которые привели дельта-таблицу к версиям 0 и 1, соответственно, и могут считаться CDC дельта-таблицы.
Это в основном чтение журнала транзакций, за исключением использования некоторых внутренних API-интерфейсов Delta, чтобы упростить эту задачу.
person
Jacek Laskowski
schedule
04.01.2020