Как сравнить две версии дельта-таблицы, чтобы получить изменения, аналогичные CDC?

Если я хочу использовать дельта-путешествие во времени для сравнения двух версий, чтобы получить изменения, аналогичные CDC, как это сделать?

Я вижу два варианта:

  1. в SQL у вас есть запрос EXCEPT / MINUS, в котором вы сравниваете все данные с другой таблицей. Я полагаю, вы тоже можете это использовать, верно? Но достаточно ли этого времени, если сравниваемые вами версии становятся все больше и больше, и вам всегда нужно сравнивать все со всеми строками последней версии?

  2. Делает ли Delta какой-то хэш для каждой строки и может это делать очень быстро, или это занимает очень много времени для дельты?


Найдено на резерве


person Jacek Laskowski    schedule 04.01.2020    source источник


Ответы (2)


Вы можете вычислить разницу двух версий таблицы, но, как вы уже догадались, это дорого. Также сложно вычислить фактическую разницу, когда в дельта-таблице есть изменения, отличные от добавлений.

обычно, когда люди спрашивают об этом, они пытаются разработать свою собственную систему, которая дает им ровно одну обработку данных от дельты до другого места; потоковая передача искры + для этого уже существует источник Delta

если вы хотите написать свой собственный, вы можете напрямую прочитать журнал транзакций (спецификация протокола находится в https://github.com/delta-io/delta/blob/master/PROTOCOL.md) и используйте действия в версиях между двумя вычисляемыми вами, чтобы выяснить, какие файлы изменения читать


Обратите внимание, что версии дельта-таблицы кэшируются (сохраняются Spark), поэтому сравнение разных наборов данных должно быть довольно дешевым.

val v0 = spark.read.format("delta").option("versionAsOf", 0).load("/tmp/delta/t2")
val v1 = spark.read.format("delta").option("versionAsOf", 1).load("/tmp/delta/t2")
// v0 and v1 are persisted - see Storage tab in web UI

Получить версии v0 и v1 не дорого; сравнение двух может быть как дорогостоящим, так и сложным. Если таблица предназначена только для добавления, то это (v1 ​​- v0); если у него есть апсерты, вам также нужно обработать (v0 - v1), а если у него есть метаданные или изменения протокола, это становится еще сложнее.

И когда вы проделываете всю эту логику самостоятельно, это подозрительно похоже на повторную реализацию DeltaSource.


Затем вы можете рассмотреть следующее:

val log = DeltaLog.forTable(spark, "/tmp/delta/t2")
val v0 = log.getSnapshotAt(0)
val actionsAtV0 = v0.state

val v1 = log.getSnapshotAt(1)
val actionsAtV1 = v1.state

actionsAtV0 и actionsAtV1 - все действия, которые привели дельта-таблицу к версиям 0 и 1, соответственно, и могут считаться CDC дельта-таблицы.

Это в основном чтение журнала транзакций, за исключением использования некоторых внутренних API-интерфейсов Delta, чтобы упростить эту задачу.

person Jacek Laskowski    schedule 04.01.2020

Databricks недавно добавили поток данных изменений (ранее известный как сбор данных об изменении дельты), и, похоже, он напрямую обращается к этому варианту использования -

https://docs.databricks.com/release-notes/runtime/8.2.html#incrementally-ingest-updates-and-deletions-in-delta-tables-using-a-change-data-feed-public-preview

Фид данных изменений дельта-таблицы представляет изменения на уровне строк между различными версиями таблицы. Если этот параметр включен, среда выполнения записывает дополнительную информацию об изменениях на уровне строк для каждой операции записи в таблице. Вы можете запросить эти изменения с помощью средств чтения SQL и DataFrame и DataStream. Корм позволяет:

  • Эффективное последующее использование слияния, обновления и удаления. Получение строк, которые были обновлены, вставлены или удалены, значительно улучшает производительность последующего задания, использующего выходные данные слияния, поскольку теперь целые файлы не нужно обрабатывать и дедуплицировать.
  • Поддержание синхронизации между репликами двух разных таблиц, представляющих одни и те же данные. Обычной практикой является поддержание двух версий одной и той же таблицы: одна узкая таблица как источник истины и более широкая таблица с дополнительными данными. Изменения могут быть эффективно применены от узкого стола к более широкому.
person Tagar    schedule 28.04.2021
comment
Это только Databricks Runtime 8.2 (коммерческая Delta Lake) и недоступна в версии Delta Lake OSS. - person Jacek Laskowski; 01.05.2021