Функциональный подход к последовательной обработке RDD [Apache Spark]

У меня есть RDD, подключенный к таблице HBase. Каждая строка (ключ) представляет местоположение GPS. Теперь я написал функцию для вычисления расстояния между двумя точками. Функция должна вызываться с текущей строкой и ее предшественником [i-1]

Теперь я изо всех сил пытаюсь сделать это функциональным способом с помощью функций RDD, чтобы я мог распараллелить это.

Мой быстрый и грязный подход - сначала создать массив

val rows = rdd.collect()
val rowCount = rdd.count() - 1 //since the first row has no distance
val rowArray = new Array[(String, Point, Point)](rowCount.asInstanceOf[Int])
var i = 0 //can be better solved in scala, I know ;)

rows.foreach(row => {
  if (predecssorPoint == null) {
    predecssorPoint = getPointByRow(row._2)
  }
  else {
    currentPoint = getPointByRow(row._2)
    rowArray(i) = Tuple3(row._1, predecssorPoint, currentPoint)

    i += 1
    predecssorPoint = currentPoint
  }
})

return rowArray

Затем я распараллелю массив и рассчитаю расстояние

  //create a parallel-enabled data set
  val parallelDataSet = sc.parallelize(rows)

  parallelDataSet.foreach(row => {     
  Functions.logDistance(row)
})

Это работает, но это уродливо и, конечно, неэффективно.

Моя идея заключалась в том, чтобы использовать rdd.reduce(), чтобы избавиться от цикла foreach, и это может сработать, если функция расстояния обрабатывает проблему, связанную с тем, что порядок (a+b) не гарантируется.

В любом случае, есть ли лучшее решение? Насколько я понимаю, нет возможности иметь (эффективный) доступ к индексу при работе с RDD.

Спасибо.


person Marco    schedule 30.01.2015    source источник


Ответы (1)


Учитывая, что порядок здесь является ключевым, хорошим способом продолжить может быть сначала проиндексировать RDD. Затем, используя индекс, мы можем имитировать zip и разделить кортежи по кластеру. Что-то вроде этого:

val indexed = rdd.zipWithIndex.map(_.swap) // 
val shifted = indexed.map{case (k,v) => (k-1,v)}
val joined = indexed.join(shifted)
val distanceRDD = joined.map{(k,(v1,v2)) => distanceFunction(v1,v2)}

(*) пример кода - не тестировался

person maasg    schedule 30.01.2015