Как распаковать несколько ключей в наборе данных Spark

У меня есть следующий DataSet со следующей структурой.

case class Person(age: Int, gender: String, salary: Double)

Я хочу определить среднюю зарплату по gender и age, поэтому я группирую DS по обоим ключам. Я столкнулся с двумя основными проблемами: во-первых, оба ключа смешаны в одном, но я хочу сохранить их в двух разных столбцах, во-вторых, столбец aggregated получает глупое длинное имя, и я не могу понять как его переименовать (видимо as и alias не подойдут) все это через DS API.

val df = sc.parallelize(List(Person(100000.00, "male", 27), 
  Person(120000.00, "male", 27), 
  Person(95000, "male", 26),
  Person(89000, "female", 31),
  Person(250000, "female", 51),
  Person(120000, "female", 51)
)).toDF.as[Person]

df.groupByKey(p => (p.gender, p.age)).agg(typed.avg(_.salary)).show()

+-----------+------------------------------------------------------------------------------------------------+
|        key| TypedAverage(line2503618a50834b67a4b132d1b8d2310b12.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$Person)|          
+-----------+------------------------------------------------------------------------------------------------+ 
|[female,31]|  89000.0... 
|[female,51]| 185000.0...
|  [male,27]| 110000.0...
|  [male,26]|  95000.0...
+-----------+------------------------------------------------------------------------------------------------+

person Alberto Bonsanto    schedule 19.03.2017    source источник


Ответы (2)


Псевдоним — это нетипизированное действие, поэтому вы должны повторить его после. И единственный способ распаковать ключ - сделать это после, через выбор или что-то в этом роде:

df.groupByKey(p => (p.gender, p.age))
  .agg(typed.avg[Person](_.salary).as("average_salary").as[Double])
  .select($"key._1",$"key._2",$"average_salary").show
person Justin Pihony    schedule 19.03.2017

Самый простой способ достичь обеих целей — снова map() перейти от результата агрегации к экземпляру Person:

.map{case ((gender, age), salary) => Person(gender, age, salary)}

Результат будет выглядеть лучше, если немного изменить порядок аргументов в конструкторе case-класса:

case class Person(gender: String, age: Int, salary: Double)
+------+---+--------+
|gender|age|  salary|
+------+---+--------+
|female| 31| 89000.0|
|female| 51|185000.0|
|  male| 27|110000.0|
|  male| 26| 95000.0|
+------+---+--------+

Полный код:

import session.implicits._
val df = session.sparkContext.parallelize(List(
  Person("male", 27, 100000),
  Person("male", 27, 120000),
  Person("male", 26, 95000),
  Person("female", 31, 89000),
  Person("female", 51, 250000),
  Person("female", 51, 120000)
)).toDS

import org.apache.spark.sql.expressions.scalalang.typed
df.groupByKey(p => (p.gender, p.age))
  .agg(typed.avg(_.salary))
  .map{case ((gender, age), salary) => Person(gender, age, salary)}
  .show()
person Vlad.Bachurin    schedule 10.07.2017