Как получить ключи и значения из столбца MapType в SparkSQL DataFrame

У меня есть данные в файле паркета, который имеет 2 поля: object_id: String и alpha: Map<>.

Он считывается во фрейм данных в sparkSQL, и схема выглядит так:

scala> alphaDF.printSchema()
root
 |-- object_id: string (nullable = true)
 |-- ALPHA: map (nullable = true)
 |    |-- key: string
 |    |-- value: struct (valueContainsNull = true)

Я использую Spark 2.0 и пытаюсь создать новый фрейм данных, в котором столбцы должны быть object_id плюс ключи карты ALPHA, как в object_id, key1, key2, key2, ...

Сначала я пытался проверить, могу ли я хотя бы получить доступ к карте следующим образом:

scala> alphaDF.map(a => a(0)).collect()
<console>:32: error: Unable to find encoder for type stored in a Dataset.
Primitive types (Int, String, etc) and Product types (case classes) are 
supported by importing spark.implicits._  Support for serializing other
types will be added in future releases.
   alphaDF.map(a => a(0)).collect()

но, к сожалению, я не могу понять, как получить доступ к ключам карты.

Может ли кто-нибудь показать мне способ получить object_id плюс ключи карты в качестве имен столбцов и значения карты в качестве соответствующих значений в новом фрейме данных?


person lloydh    schedule 15.11.2016    source источник
comment
Нет супер чистого способа сделать это, я не думаю: stackoverflow.com/a/33345698/2661491   -  person evan.oman    schedule 15.11.2016


Ответы (2)


Spark> = 2.3

Вы можете упростить процесс с помощью функции map_keys:

import org.apache.spark.sql.functions.map_keys

Также есть map_values функция, но здесь она не будет полезна напрямую.

Spark ‹2.3

Общий метод можно выразить в несколько шагов. Первый требуемый импорт:

import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.Row

и пример данных:

val ds = Seq(
  (1, Map("foo" -> (1, "a"), "bar" -> (2, "b"))),
  (2, Map("foo" -> (3, "c"))),
  (3, Map("bar" -> (4, "d")))
).toDF("id", "alpha")

Для извлечения ключей мы можем использовать UDF (Spark ‹2.3)

val map_keys = udf[Seq[String], Map[String, Row]](_.keys.toSeq)

или встроенные функции

import org.apache.spark.sql.functions.map_keys

val keysDF = df.select(map_keys($"alpha"))

Найдите разные:

val distinctKeys = keysDF.as[Seq[String]].flatMap(identity).distinct
  .collect.sorted

Вы также можете обобщить извлечение keys с помощью explode:

import org.apache.spark.sql.functions.explode

val distinctKeys = df
  // Flatten the column into key, value columns
 .select(explode($"alpha"))
 .select($"key")
 .as[String].distinct
 .collect.sorted

И select:

ds.select($"id" +: distinctKeys.map(x => $"alpha".getItem(x).alias(x)): _*)
person zero323    schedule 15.11.2016
comment
А как это реализовать в PySpaek? - person Hailin FU; 29.04.2019

А если вы работаете в PySpark, я просто нахожу простую реализацию:

from pyspark.sql.functions import map_keys

alphaDF.select(map_keys("ALPHA").alias("keys")).show()

Вы можете проверить подробности в здесь

person Hailin FU    schedule 29.04.2019