Аккумуляторы искры: Правильный аккумулятор иногда много или всегда один?

Я пытаюсь использовать аккумулятор Spark для удаления группы по запросу с низкой производительностью.

import org.apache.spark._
object CountPairsParam extends AccumulatorParam[Map[Int, Set[Int]]] {

  def zero(initialValue: Map[Int, Set[Int]]): Map[Int, Set[Int]] = {
    Map.empty[Int, Set[Int]]
  }

  def addInPlace(m1: Map[Int, Set[Int]], m2: Map[Int, Set[Int]]): Map[Int, Set[Int]] = {
    val keys = m1.keys ++ m2.keys
     keys.map((k: Int) => (k -> (m1.getOrElse(k, Set.empty[Int]) ++ m2.getOrElse(k, Set.empty[Int])))).toMap
  }
}
val accum = sc.accumulator(Map.empty[Int, Set[Int]])(CountPairsParam)
srch_destination_id_distinct.foreach(r => try{accum += Map(r(0).toString.toInt -> Set(r(1).toString.toInt))} catch {case ioe: NumberFormatException =>  Map.empty[Int, Set[Int]]})

В моем аккумуляторе я предполагаю, что m2 не всегда будет одним набором элементов, созданным в моем цикле foreach, и что иногда Spark будет использовать этот метод для добавления двух разных карт с более чем одним ключом. Но из-за этого у меня невысокие показатели. Всегда ли правильная карта попадает в аккумулятор с одним элементом, установленным в моем для каждого цикла, или мне нужно сделать компромисс с производительностью?


person Dan Ciborowski - MSFT    schedule 13.02.2016    source источник


Ответы (1)


Обычно вам следует избегать использования Accumulators для чего-либо, кроме отладки, потому что, насколько мне известно, нет гарантии, что каждая запись RDD будет "добавлена" в Accumulator только один раз.

Может, попробуем что-нибудь вроде этого:

import scala.collection.mutable.HashSet
import scala.util.Try

val result = srch_destination_id_distinct.flatMap(r => 
  Try((r(0).toString.toInt, r(1).toString.toInt)).toOption
).aggregateByKey(HashSet.empty[Int])(
  (set, n) => set += n,
  (set1, set2) => set1 union set2
).mapValues(_.toSet).collectAsMap

Различие между seqOp и combOp аргументами метода aggregate также позволяет нам избежать «оборачивания» каждого элемента RDD в Map[Int, Set[Int]], как вы это сделали с вашим подходом.

person Jason Lenderman    schedule 13.02.2016
comment
Вы почти правы. Spark не гарантирует, что аккумулятор будет обновлен только один раз, когда он используется внутри преобразования. Эту гарантию предоставляют аккумуляторы, используемые внутри акций. Я не повлиял на то, что здесь aggregateByKey намного лучше :) - person zero323; 13.02.2016