Как использовать набор данных для группировки

У меня есть просьба использовать для этого rdd :

val test = Seq(("New York", "Jack"),
    ("Los Angeles", "Tom"),
    ("Chicago", "David"),
    ("Houston", "John"),
    ("Detroit", "Michael"),
    ("Chicago", "Andrew"),
    ("Detroit", "Peter"),
    ("Detroit", "George")
  )
sc.parallelize(test).groupByKey().mapValues(_.toList).foreach(println)

В результате :

(Нью-Йорк, Лист (Джек))

(Детройт, Лист (Майкл, Питер, Джордж))

(Лос-Анджелес, Список (Том))

(Хьюстон, Лист (Джон))

(Чикаго, Лист (Дэвид, Эндрю))

Как использовать набор данных с spark2.0?

У меня есть способ использовать пользовательскую функцию, но ощущение настолько сложное, что нет простого точечного метода?


person monkeysjourney    schedule 07.06.2017    source источник


Ответы (3)


Я бы посоветовал вам начать с создания case class как

case class Monkey(city: String, firstName: String)

Этот case class должен быть определен вне основного класса. Затем вы можете просто использовать функцию toDS и использовать функцию groupBy и aggregation с именем collect_list, как показано ниже.

import sqlContext.implicits._
import org.apache.spark.sql.functions._

val test = Seq(("New York", "Jack"),
  ("Los Angeles", "Tom"),
  ("Chicago", "David"),
  ("Houston", "John"),
  ("Detroit", "Michael"),
  ("Chicago", "Andrew"),
  ("Detroit", "Peter"),
  ("Detroit", "George")
)
sc.parallelize(test)
  .map(row => Monkey(row._1, row._2))
  .toDS()
  .groupBy("city")
  .agg(collect_list("firstName") as "list")
  .show(false)

У вас будет вывод как

+-----------+------------------------+
|city       |list                    |
+-----------+------------------------+
|Los Angeles|[Tom]                   |
|Detroit    |[Michael, Peter, George]|
|Chicago    |[David, Andrew]         |
|Houston    |[John]                  |
|New York   |[Jack]                  |
+-----------+------------------------+

Вы всегда можете преобразовать обратно в RDD, просто вызвав функцию .rdd

person Ramesh Maharjan    schedule 07.06.2017

Чтобы создать набор данных, сначала определите класс случая вне вашего класса как

case class Employee(city: String, name: String)

Затем вы можете преобразовать список в набор данных как

  val spark =
    SparkSession.builder().master("local").appName("test").getOrCreate()
    import spark.implicits._
    val test = Seq(("New York", "Jack"),
    ("Los Angeles", "Tom"),
    ("Chicago", "David"),
    ("Houston", "John"),
    ("Detroit", "Michael"),
    ("Chicago", "Andrew"),
    ("Detroit", "Peter"),
    ("Detroit", "George")
    ).toDF("city", "name")
    val data = test.as[Employee]

Or

    import spark.implicits._
    val test = Seq(("New York", "Jack"),
      ("Los Angeles", "Tom"),
      ("Chicago", "David"),
      ("Houston", "John"),
      ("Detroit", "Michael"),
      ("Chicago", "Andrew"),
      ("Detroit", "Peter"),
      ("Detroit", "George")
    )

    val data = test.map(r => Employee(r._1, r._2)).toDS()

Теперь вы можете groupby и выполнять любую агрегацию как

data.groupBy("city").count().show

data.groupBy("city").agg(collect_list("name")).show

Надеюсь это поможет!

person koiralo    schedule 07.06.2017

Сначала я бы превратил ваш RDD в DataSet:

val spark: org.apache.spark.sql.SparkSession = ???
import spark.implicits._

val testDs = test.toDS()

Здесь вы получите имена ваших столбцов :) Используйте это с умом!

testDs.schema.fields.foreach(x => println(x))

В конце концов, вам нужно использовать только groupBy:

testDs.groupBy("City?", "Name?")

Я думаю, что RDD - это не совсем версия 2.0. Если у вас есть вопросы, просто задавайте их.

person András Nagy    schedule 07.06.2017
comment
testDs.columns даже быстрее получить имена столбцов (как Array[String]) без типов. - person Garren S; 14.07.2017
comment
Хорошая точка зрения! Действительно - person András Nagy; 19.07.2017