Apache Spark: влияние повторного разбиения, сортировки и кэширования на соединение

Я изучаю поведение Spark при присоединении таблицы к самой себе. Я использую Databricks.

Мой фиктивный сценарий:

  1. Считайте внешнюю таблицу как кадр данных A (базовые файлы имеют дельта-формат)

  2. Определите кадр данных B как кадр данных A с выбранными только определенными столбцами

  3. Присоедините кадры данных A и B к столбцу1 и столбцу2

(Да, это не имеет особого смысла, я просто экспериментирую, чтобы понять основную механику Spark)

a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))

b = a.select("column1", "column2", "columnA")

c= a.join(b, how="left", on = ["column1", "column2"])

Моей первой попыткой было запустить код как есть (попытка 1). Затем я попытался переразбить и кэшировать (попытка 2)

a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))
.repartition(col("column1"), col("column2")).cache()

Наконец, я переразбил, отсортировал и кэшировал

 a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))
.repartition(col("column1"), col("column2")).sortWithinPartitions(col("column1"), col("column2")).cache()

Соответствующие сгенерированные даги прилагаются.

Мои вопросы:

  1. Почему при попытке 1 таблица кажется кэшированной, хотя кэширование явно не указано.

  2. Почему за InMemoreTableScan всегда следует другой узел этого типа.

  3. Почему при попытке 3 кэширование происходит в два этапа?

  4. Почему при попытке 3 WholeStageCodegen следует один (и только один) InMemoreTableScan.

попытка 1

попытка 2

введите описание изображения здесь


person Dawid    schedule 03.01.2020    source источник
comment
Я подозреваю, что считыватель DataFrame автоматически кэширует данные, когда источником является внешняя таблица. У меня аналогичная ситуация, когда я читаю данные из таблицы базы данных, в то время как при загрузке вкладка SQL в «интерфейсе сведений о приложении» отображается количество загружаемых строк, но файл еще не сохранен в указанном месте. Я предполагаю, что он знает счет, потому что он где-то кэширует данные, и это то, что отображается в DAG. Если вы читаете данные из текстового файла локально, вы не увидите состояние кеша.   -  person Salim    schedule 08.01.2020


Ответы (1)


То, что вы видите в этих трех планах, представляет собой смесь среды выполнения DataBricks и Spark.

Во-первых, при запуске среды выполнения DataBricks 3.3+ кэширование автоматически включается для всех файлов паркета. Соответствующий конфиг для этого: spark.databricks.io.cache.enabled true

Для вашего второго запроса InMemoryTableScan происходит дважды, потому что прямо при вызове соединения spark пытался вычислить набор данных A и набор данных B параллельно. Предполагая, что вышеуказанные задачи были назначены разным исполнителям, обоим придется сканировать таблицу из кеша (DataBricks).

Что касается третьего, InMemoryTableScan сам по себе не относится к кэшированию. Это просто означает, что любой сформированный катализатор плана включал многократное сканирование кэшированной таблицы.

PS: я не могу визуализировать пункт 4 :)

person Ashvjit Singh    schedule 09.01.2020