Я пытаюсь использовать аккумулятор Spark для удаления группы по запросу с низкой производительностью.
import org.apache.spark._
object CountPairsParam extends AccumulatorParam[Map[Int, Set[Int]]] {
def zero(initialValue: Map[Int, Set[Int]]): Map[Int, Set[Int]] = {
Map.empty[Int, Set[Int]]
}
def addInPlace(m1: Map[Int, Set[Int]], m2: Map[Int, Set[Int]]): Map[Int, Set[Int]] = {
val keys = m1.keys ++ m2.keys
keys.map((k: Int) => (k -> (m1.getOrElse(k, Set.empty[Int]) ++ m2.getOrElse(k, Set.empty[Int])))).toMap
}
}
val accum = sc.accumulator(Map.empty[Int, Set[Int]])(CountPairsParam)
srch_destination_id_distinct.foreach(r => try{accum += Map(r(0).toString.toInt -> Set(r(1).toString.toInt))} catch {case ioe: NumberFormatException => Map.empty[Int, Set[Int]]})
В моем аккумуляторе я предполагаю, что m2 не всегда будет одним набором элементов, созданным в моем цикле foreach, и что иногда Spark будет использовать этот метод для добавления двух разных карт с более чем одним ключом. Но из-за этого у меня невысокие показатели. Всегда ли правильная карта попадает в аккумулятор с одним элементом, установленным в моем для каждого цикла, или мне нужно сделать компромисс с производительностью?