Spark DataFrame объединяет несколько столбцов в один столбец в виде строки

Я хочу преобразовать Spark DataFrame в другой DataFrame следующим образом:

У меня есть Spark DataFrame:

+---------+------------+
|protocol |   count    |
+---------+------------+
|      TCP|    8231    |
|     ICMP|    7314    |
|      UDP|    5523    |
|     IGMP|    4423    |
|      EGP|    2331    |
+---------+------------+

И я хочу превратить это в:

+----------------------------------------------------------+
|Aggregated                                                |
+----------------------------------------------------------+
|{tcp: 8231, icmp: 7314, udp: 5523, igmp: 4423, egp: 2331} |
+----------------------------------------------------------+

Агрегированный столбец может быть списком, картой или строкой. Возможно ли это с помощью функций DataFrame или мне нужно создать свой собственный udf для агрегирования этого?


person Dimas Rizky    schedule 17.05.2018    source источник
comment
Вы хотите использовать при этом все строки в фрейме данных? Поскольку их не так много, вероятно, будет проще собрать данные и использовать чистый Scala для их преобразования.   -  person Shaido    schedule 17.05.2018
comment
@Shaido Да, все строки, но приведенный пример - не единственные существующие строки. Строки будут увеличиваться со временем   -  person Dimas Rizky    schedule 17.05.2018


Ответы (3)


pivot и toJSON предоставят вам то, что вам нужно

import org.apache.spark.sql.functions.first

df.groupBy().pivot("protocol").agg(first("count")).toJSON.show(false)
// +----------------------------------------------------------+                    
// |value                                                     |
// +----------------------------------------------------------+
// |{"EGP":2331,"ICMP":7314,"IGMP":4423,"TCP":8321,"UDP":5523}|
// +----------------------------------------------------------+
person Alper t. Turker    schedule 17.05.2018

Поскольку вы хотите преобразовать все столбцы в один, а столбцов не так много, вы можете collect передать фрейм данных драйверу и использовать чистый код Scala для преобразования его в нужный формат.

Следующее даст вам Array[String]:

val res = df.as[(String, Int)].collect.map{case(protocol, count) => protocol + ": " + count}

Чтобы преобразовать его в одну строку, просто выполните:

val str = res.mkString("{", ", ", "}")
person Shaido    schedule 17.05.2018

Сконфигурируйте столбцы в фрейме данных и создайте новый столбец:

var new_df = df.withColumn("concat", concat($"protocol", lit(" : "), $"count"))

Чтобы объединить его в одну строку в виде списка, вы можете сделать это.

var new_df = new_df.groupBy().agg(collect_list("concat").as("aggregated"))
new_df.show

Если вы хотите получить данные в виде строки, а не фрейма данных, вы можете собрать их следующим образом.

new_df.select("concat").collect.map(x=> x.get(0)).mkString("{", ",", "}")
person Mann    schedule 17.05.2018