Хотите крутой мерч Future Vision? Посетите наш магазин здесь

Уровень хранения с открытым исходным кодом от Databricks, создателей Spark, для создания более простых и надежных корпоративных озер данных как локально, так и в облаке.

Это был один из самых громких анонсов, сделанных на Spark + AI Summit этого года. Первоначально эта функция была доступна только на платформе Databricks, но теперь она имеет открытый исходный код с разрешающей лицензией Apache License V2.

Что такое Дельта-Лейк?

Delta Lake - это, по сути, уровень вычислений, который будет располагаться поверх существующего кластера On Prem HDFS, вашего любимого облачного хранилища или даже запускать его локально на вашем ноутбуке (лучшая часть)! Данные хранятся в вышеупомянутом хранилище в виде версионных файлов Parquet. Любые данные, считываемые с помощью Spark, можно использовать для чтения и записи с помощью Delta Lake. Delta lakes предоставляет единую платформу для поддержки рабочих нагрузок как пакетной, так и потоковой обработки на единой платформе.

Зачем мне это нужно?

Ниже приведены некоторые из ключевых особенностей, которые мне показались интересными!

  1. ACID-транзакции в Spark: в типичном озере данных много пользователей будет иметь доступ, т.е. считывать и записывать данные в нем, и очень важно, чтобы сохранялась целостность данных. ACID - ключевая особенность большинства баз данных, но когда дело доходит до HDFS или S3, как правило, очень сложно предоставить те же гарантии долговечности, которые предоставляют нам базы данных ACID. Delta Lake хранит журнал транзакций, чтобы отслеживать все коммиты, сделанные в каталоге таблицы для предоставления транзакций ACID. Он обеспечивает уровни изоляции Serializable для обеспечения согласованности данных между несколькими пользователями.
  2. Унифицированная пакетная и потоковая обработка: в озере данных, если у нас есть вариант использования как потоковой обработки, так и пакетной обработки, нормально следовать архитектуре Lamdba. В озере данных данные, поступающие как поток (возможно, из kafka), или любые исторические данные, которые у вас есть (например, HDFS), представляют собой одну и ту же таблицу. Это дает единое представление об обеих этих двух разных парадигмах. прием данных, пакетная историческая засыпка и интерактивные запросы работают сразу после установки без особых дополнительных усилий.
  3. Применение схемы: озеро данных помогает избежать попадания неверных данных в ваши озера данных, предоставляя возможность указывать схему и обеспечивать ее соблюдение. Оно предотвращает повреждение данных, предотвращая попадание неверных данных в систему еще до данные загружаются в озеро данных путем выдачи разумных сообщений об ошибках.
  4. Путешествие во времени: данные в озере данных будут версионироваться, и будут предоставлены моментальные снимки, чтобы вы могли запрашивать их, как если бы этот моментальный снимок был текущим состоянием системы. Это помогает нам вернуться к более старым версиям нашего озера данных для аудита, отката и тому подобного.

Начните с этого:

Я хотел бы поделиться некоторыми примерами на Scala и добавлю несколько примеров на Python в своей следующей статье.

Во-первых, мы должны добавить зависимость библиотеки в наш файл Scala SBT.

"io.delta" %% "delta-core" % "0.1.0",

Если вы используете искровую оболочку:

spark-shell --packages io.delta:delta-core_2.12:0.1.0

Мы можем использовать DataframeReader / Writer для чтения или записи данных в озеро Дельта.

val df = Seq( (1, "IL", "USA"),(2, "IL", "USA"),(3, "MO", "USA"),(4, "IL", "USA"),(5, "KA", "INDIA"),(6, "MEL", "AUS")
).toDF("id", "state", "country")
df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save("delta/myDeltaTable")
val df1 = spark.read.format("delta").load("delta/myDeltaTable")
df1.show()

overwriteSchema пригодится при перезаписи таблицы, поскольку по умолчанию перезапись данных в таблице не перезаписывает схему, даже если установлен режим overwrite.

Чтобы запросить более старый снимок таблицы Delta Lake

Мы можем запросить состояние таблицы на основе числа timestamp или version.

Допустим, мы обновляем вышеуказанный фрейм данных.

val df2 = df1.withColumn("versionedcountry",lower(col("country"))).drop("country")
df2.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save("delta/myDeltaTable")
df2.show()
+---+-----+--------------+
| id|state|versioncountry|
+---+-----+--------------+
|  5|   KA|         india|
|  6|  MEL|           aus|
|  3|   MO|           usa|
|  2|   IL|           usa|
|  4|   IL|           usa|
|  1|   IL|           usa|
+---+-----+--------------+
spark.read.format("delta").option("versionAsOf", 0).load("delta/myDeltaTable").show()
+---+-----+-------+
| id|state|country|
+---+-----+-------+
|  5|   KA|  INDIA|
|  6|  MEL|    AUS|
|  1|   IL|    USA|
|  2|   IL|    USA|
|  3|   MO|    USA|
|  4|   IL|    USA|
+---+-----+-------+
spark.read.format("delta").option("timestampAsOf", "2019-04-27 15:18:37").load("delta/myDeltaTable").show()
+---+-----+--------------+
| id|state|versioncountry|
+---+-----+--------------+
|  5|   KA|         india|
|  6|  MEL|           aus|
|  3|   MO|           usa|
|  2|   IL|           usa|
|  4|   IL|           usa|
|  1|   IL|           usa|
+---+-----+--------------+

Отметка времени может быть строкой даты или отметки времени.

Проверка схемы

Допустим, мы получаем другой похожий df, но с той же другой схемой. Здесь все то же самое, но только тип данных столбца id отличается от id, который мы сохранили ранее, который был IntegerType. Это бросит

val df = Seq( ("1", "IL", "USA")).toDF("id", "state", "country")

При несоответствии схемы он немедленно выдаст ошибку.

df.write.format("delta").mode("append").save("delta/myDeltaTable")
Exception in thread "main" org.apache.spark.sql.AnalysisException: Failed to merge fields 'id' and 'id'. Failed to merge incompatible data types IntegerType and StringType;;
 at org.apache.spark.sql.delta.schema.SchemaUtils$.$anonfun$mergeSchemas$1(SchemaUtils.scala:526)

Когда есть некоторые отсутствующие данные и остальная часть схемы совпадает, это поле устанавливается равным нулю.

val df = Seq( (100, "CA")).toDF("id", "state")
df.write.format("delta").mode("append").save("delta/myDeltaTable")

  spark.read.format("delta").load("delta/myDeltaTable").show()
+---+-----+--------------+
| id|state|versioncountry|
+---+-----+--------------+
|  5|   KA|         india|
|  6|  MEL|           aus|
|  3|   MO|           usa|
|  2|   IL|           usa|
|  4|   IL|           usa|
|  1|   IL|           usa|
|100|   CA|          null|
+---+-----+--------------+

Но если есть какой-либо новый столбец, он немедленно вызовет исключение Analysis.

val df = Seq((100, "CA")).toDF("id", "state","New")
df.write.format("delta").mode("append").save("delta/myDeltaTable")
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: The number of columns doesn't match.
Old column names (2): _1, _2
New column names (3): id, state, New

Функции пока недоступны:

Delta Lake не поддерживает транзакции с несколькими таблицами и внешние ключи.
Delta Lake поддерживает транзакции на уровне таблицы.
Delta не поддерживает DStream API.

Спасибо за прочтение! Пожалуйста, поделитесь статьей, если она вам понравилась. Любые комментарии и предложения приветствуются! Посмотрите другие мои статьи здесь.