Как управлять номером файла в выводе слияния Delta Lake

Я использую Delta Lake 0.4.0 с Merge, например:

target.alias("t")
          .merge(
            src.as("s"),
            "s.id = t.id 
          )
          .whenMatched().updateAll()
          .whenNotMatched().insertAll()
          .execute()

src читает из папки с тысячами файлов. Результаты слияния также генерируют множество небольших файлов. Есть ли способ контролировать номер файла в результатах слияния, например эффект перераспределения (1) или объединения (1)?

Спасибо


person processadd    schedule 19.11.2019    source источник


Ответы (2)


Невозможно контролировать количество файлов в операции дельта-вывода. Вместо этого используйте OPTIMIZE в нужное время или на таких платформах, как Databricks, воспользуйтесь преимуществом автоматическая оптимизация.

person Sim    schedule 19.11.2019
comment
хм, мы используем дельта-озеро с открытым исходным кодом, поэтому мы не можем использовать это, верно? Мы загружаем небольшие файлы данных в реальном времени в дельта-лейк. Мы рассчитываем избежать повторного сжатия. Возможно, нам придется разделить слияние, чтобы обновить и вставить его самостоятельно. - person processadd; 19.11.2019
comment
Если вы разделите его, вы потеряете атомарность, которая является ключевой особенностью Delta. Я бы настоятельно рекомендовал против этого. Вы можете запускать OPTIMIZE по расписанию. - person Sim; 19.11.2019
comment
Спасибо. Я думал об атомарности. Скажем, мы сначала обновляем, а затем вставляем. Если обновление выполнено успешно, но вставка не удалась, в следующий раз он снова обновит и вставит те же данные. Он должен быть идемпотентным (нужно полностью проверить это). Дело в том, что при слиянии для 500 файлов (около 1 ГБ) создается много маленьких файлов ~ 4 МБ. Не уверен, что оптимизирует работы для дельта-озера с открытым исходным кодом. - person processadd; 19.11.2019
comment
Проблема с атомарностью заключается в том, что между обновлением и вставкой таблица находится в несогласованном состоянии и, если вы не предпримете явных шагов для предотвращения этого, из-за характера того, как вы решаете проблему, через сервер рабочего процесса или (межкластерный ) система блокировки, операция может использовать это противоречивое состояние. Другими словами, без атомарности идемпотентность гарантируется только при определенном порядке операций, который вы должны обеспечить. Когда у вас есть атомарность, вам не нужно беспокоиться о порядке операций. (Идемпотентность — это всегда путь.) - person Sim; 20.11.2019
comment
Привет, Сим, автооптимизация, о которой ты упомянул, еще не включена в дельта-лейк с открытым исходным кодом, верно? Я попытался добавить spark.databricks.delta.optimizeWrite.enabled и spark.databricks.delta.autoCompact.enabled, но, похоже, это не сработало. - person processadd; 25.11.2019
comment
Ага, видел это. Если вы можете приостановить запись потока, вы можете перезаписать таблицу, используя df.repartition() и ACID I/O Delta. Это ручное уплотнение. - person Sim; 27.11.2019

Согласно https://docs.delta.io/latest/delta-update.html#performance-tuning теперь вы можете установить для spark.delta.merge.repartitionBeforeWrite значение true, чтобы этого избежать.

person Danilo Brambila    schedule 15.05.2020