Повторное использование объекта SparkContext в UDAF

Я пытаюсь реализовать агрегированную версию org.apache.spark.mllib.stat.KernelDensity для одновременной оценки вероятностной функции плотности нескольких распределений.

Идея состоит в том, чтобы иметь фрейм данных, скажем, с двумя столбцами: один для имени группы, второй, содержащий одномерные значения наблюдения (будет 1000 групп, следовательно, потребуется параллельная обработка).

Что я имею в виду примерно так (столбец pdf будет содержать массив со значениями PDF):

> val getPdf = new PDFGetter(sparkContext)
> df_with_group_and_observation_columns.groupBy("group").agg(getPdf(col("observations")).as("pdf")).show()

Я реализовал User-Defined-Aggrgated-Function, чтобы (надеюсь) сделать это. У меня есть 2 проблемы с текущей реализацией, и я ищу вашего совета:

  1. По-видимому, невозможно повторно использовать объект sparkContext в функции evaluate() UDAF. В настоящее время я получаю java.io.NotSerializableException, как только UDAF пытается получить доступ к объекту sparkContext (подробности см. ниже). ==> Это нормально? Есть идеи, как это можно исправить?
  2. Текущая реализация UDAF, которая у меня есть, будет получать все наблюдения для каждой группы из фрейма данных (который распространяется), помещать их в Seq() (WrappedArray), а затем пытаться запустить parallelize() для Seq() каждой группы для повторного распределения. наблюдения. Это кажется довольно неэффективным. ==> Есть ли способ для UDAF напрямую "дать" "суб-RDD" каждой группы каждой из своих evaluate() функций во время выполнения?

Ниже приведен подробный пример того, что у меня есть до сих пор (не обращайте внимания на возвращаемое значение в виде строки вместо массива, я просто хочу посмотреть, можно ли пока заставить плотность ядра работать в UDAF):

Spark context available as 'sc' (master = local[*], app id = local-1514639826952).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/

scala> sc.toString
res27: String = org.apache.spark.SparkContext@2a96ed1b

scala> val df = Seq(("a", 1.0), ("a", 1.5), ("a", 2.0), ("a", 1.8), ("a", 1.1), ("a", 1.2), ("a", 1.9), ("a", 1.3), ("a", 1.2), ("a", 1.9), ("b", 10.0), ("b", 20.0), ("b", 11.0), ("b", 18.0), ("b", 13.0), ("b", 16.0), ("b", 15.0), ("b", 12.0), ("b", 18.0), ("b", 11.0)).toDF("group", "val")

scala> val getPdf = new PDFGetter(sc)

scala> df.groupBy("group").agg(getPdf(col("val")).as("pdf")).show()
org.apache.spark.SparkException: Task not serializable
...
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
    - object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@2a96ed1b)
    - field (class: PDFGetter, name: sc, type: class org.apache.spark.SparkContext)
    - object (class PDFGetter, PDFGetter@38649ca3)
...

См. определение UDAF ниже (в остальном оно работает хорошо):

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.Row
import scala.collection.mutable.WrappedArray
import scala.collection.mutable.{ListBuffer, ArrayBuffer}
import org.apache.spark.mllib.stat.KernelDensity


class PDFGetter(var sc: org.apache.spark.SparkContext) extends UserDefinedAggregateFunction {

  // Define the schema of the input data, 
  // intermediate processing (deals with each individual observation within each group) 
  // and return type of the UDAF
  override def inputSchema: StructType = StructType(StructField("result_dbl", DoubleType) :: Nil)

  override def bufferSchema: StructType = StructType(StructField("observations", ArrayType(DoubleType)) :: Nil)

  override def dataType: DataType = StringType


  // The UDAF will always return the same results
  // given the same inputs
  override def deterministic: Boolean = true


  // How to initialize the intermediate processing buffer
  // for each group
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = Array.emptyDoubleArray
  }

  // What to do with each new row within the group
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    var values = new ListBuffer[Double]()
    values.appendAll(buffer.getAs[List[Double]](0))
    val newValue = input.getDouble(0)
    values.append(newValue)
    buffer.update(0, values)
  }

  // How to merge 2 buffers located on 2 separate
  // executor hosts or JVMs
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    var values = new ListBuffer[Double]()
    values ++= buffer1.getAs[List[Double]](0)
    values ++= buffer2.getAs[List[Double]](0)
    buffer1.update(0, values)
  }


  // What to do with the data once intermediate processing
  // is completed
  override def evaluate(buffer: Row): String = {
    // Get the observations
    val observations = buffer.getSeq[Double](0)     // Or val observations = buffer.getAs[Seq[Double]](0)   // Returns a WrappedArray either way
    //observations.toString

    // Calculate the bandwidth
    val nObs = observations.size.toDouble
    val mean = observations.sum / nObs
    val stdDev = Math.sqrt(observations.map(x => Math.pow(x - mean, 2.0) ).sum / nObs)
    val bandwidth = stdDev / 2.5
    //bandwidth.toString


    // Kernel Density
    // From the example at http://spark.apache.org/docs/latest/api/java/index.html#org.apache.spark.sql.Dataset
    // val sample = sc.parallelize(Seq(0.0, 1.0, 4.0, 4.0))
    // val kd = new KernelDensity()
    //      .setSample(sample)
    //        .setBandwidth(3.0)
    // val densities = kd.estimate(Array(-1.0, 2.0, 5.0))

    // Get the observations as an rdd (required by KernelDensity.setSample)
    sc.toString     // <====   This fails
    val observationsRDD = sc.parallelize(observations)

    // Create a new Kernel density object
    // for these observations
    val kd = new KernelDensity()
    kd.setSample(observationsRDD)
    kd.setBandwidth(bandwidth)

    // Create the points at which
    // the PDF will be estimated
    val minObs = observations.min
    val maxObs = observations.max
    val nPoints = Math.min(nObs/2, 1000.0).toInt
    val increment = (maxObs - minObs) / nPoints.toDouble
    val points = new Array[Double](nPoints)
    for( i <- 0 until nPoints){
      points(i) = minObs + i.toDouble * increment;
    }

    // Estimate the PDF and return
    val pdf = kd.estimate(points)
    pdf.toString
  }
}

Приношу свои извинения за длинный пост, но он кажется довольно сложным, поэтому я решил, что все подробности будут полезны любому помощнику.

Спасибо!


person Raphvanns    schedule 10.01.2018    source источник


Ответы (2)


Это не сработает. Вы не можете:

  • Доступ к SparkContext, SparkSession, SQLContext на исполнителе (где evaluate вызывается).
  • Получите доступ или создайте распределенную структуру данных на исполнителе.

Чтобы ответить на возможные последующие вопросы - обходного пути нет. Это ключевое дизайнерское решение, лежащее в основе дизайна Spark.

person user9196334    schedule 10.01.2018
comment
Да, этого я и боялся... Спасибо! Я ищу Java-реализацию плотности ядра, которая не требовала бы распределения данных, чтобы каждый исполнитель мог заниматься своими делами без необходимости перераспределения своих собственных данных по всему кластеру. Это выглядит многообещающе: haifengl.github.io/smile/ api/java/smile/stat/distribution/ - person Raphvanns; 10.01.2018

Мне удалось устранить необходимость в sc.parallelize(observations), используя другую реализацию плотности ядра, которая не требует предоставления наблюдений через RDD, а вместо этого простое Array[Double] (поэтому не распространяется).

Подробности смотрите по следующим ссылкам:

http://haifengl.github.io/smile/index.html

https://github.com/haifengl/smile

Примечание: приведенный выше код по-прежнему является достойным примером для тех из вас, кто хочет познакомиться с UDAF — просто удалите аргумент sc в конструкторе и убедитесь, что не пытаетесь использовать SparkContext ни в одной из функций UDAF.

Ваше здоровье!

person Raphvanns    schedule 10.01.2018