Я изучаю поведение Spark при присоединении таблицы к самой себе. Я использую Databricks.
Мой фиктивный сценарий:
Считайте внешнюю таблицу как кадр данных A (базовые файлы имеют дельта-формат)
Определите кадр данных B как кадр данных A с выбранными только определенными столбцами
Присоедините кадры данных A и B к столбцу1 и столбцу2
(Да, это не имеет особого смысла, я просто экспериментирую, чтобы понять основную механику Spark)
a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))
b = a.select("column1", "column2", "columnA")
c= a.join(b, how="left", on = ["column1", "column2"])
Моей первой попыткой было запустить код как есть (попытка 1). Затем я попытался переразбить и кэшировать (попытка 2)
a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))
.repartition(col("column1"), col("column2")).cache()
Наконец, я переразбил, отсортировал и кэшировал
a = spark.read.table("table") \
.select("column1", "column2", "column3", "column4") \
.withColumn("columnA", lower((concat(col("column4"), lit("_"), col("column5")))))
.repartition(col("column1"), col("column2")).sortWithinPartitions(col("column1"), col("column2")).cache()
Соответствующие сгенерированные даги прилагаются.
Мои вопросы:
Почему при попытке 1 таблица кажется кэшированной, хотя кэширование явно не указано.
Почему за InMemoreTableScan всегда следует другой узел этого типа.
Почему при попытке 3 кэширование происходит в два этапа?
Почему при попытке 3 WholeStageCodegen следует один (и только один) InMemoreTableScan.