выполнение кода внутри Spark foreach

У меня есть два RDD: points и pointsWithinEps. Каждая точка в points представляет x, y координату. pointsWithinEps обозначают две точки и расстояние между ними: ((x, y), distance). Я хочу зациклить все точки и для каждой точки отфильтровать только те элементы, которые находятся в координате pointsWithinEps как x (первой). Итак, я делаю следующее:

    points.foreach(p =>
      val distances = pointsWithinEps.filter{
        case((x, y), distance) => x == p
      }
      if (distances.count() > 3) {
//        do some other actions
      }
    )

Но этот синтаксис недействителен. Насколько я понимаю, внутри Spark foreach нельзя создавать переменные. Должен ли я сделать что-то подобное?

for (i <- 0 to points.count().toInt) {
  val p = points.take(i + 1).drop(i) // take the point
  val distances = pointsWithinEps.filter{
    case((x, y), distance) => x == p
  }
  if (distances.count() > 3) {
    //        do some other actions
  }
}

Или есть лучший способ сделать это? Полный код размещен здесь: https://github.com/timasjov/spark-learning/blob/master/src/DBSCAN.scala

ИЗМЕНИТЬ:

points.foreach({ p =>
  val pointNeighbours = pointsWithinEps.filter {
    case ((x, y), distance) => x == p
  }
  println(pointNeighbours)
})

Прямо сейчас у меня есть следующий код, но он генерирует исключение NullPointerException (pointsWithinEps). Как это можно исправить, почему pointsWithinEps равно null (до foreach в нем есть элементы)?


person Bob    schedule 26.10.2014    source источник
comment
Правильно ли я понимаю, что для каждой точки (x, y) на points вам нужны все ((x, y), расстояние) кортежи из pointsWithinEps, которые происходят из одного и того же (x)?   -  person maasg    schedule 26.10.2014
comment
да. В основном для каждой точки я хочу найти, какие другие точки являются ее соседями (точками, которые находятся в пределах эпсилона). В моем случае это сама точка и x в структуре ((x, y), Distance). Код находится в гитхабе, поэтому, например, вы можете выполнить его и в отладчике найти, что именно является значениями.   -  person Bob    schedule 26.10.2014


Ответы (2)


Чтобы собрать все точки расстояния, которые начинаются с заданной координаты, простой распределенный способ сделать это состоит в том, чтобы пометить точки по этой координате x и сгруппировать их по этому ключу, например:

val pointsWithinEpsByX = pointsWithinEps.map{case ((x,y),distance) => (x,((x,y),distance))}
val xCoordinatesWithDistance = pointsWithinEpsByX.groupByKey

Затем соедините слева СДР точек с результатом предыдущего преобразования:

val pointsWithCoordinatesWithDistance = points.leftOuterJoin(xCoordinatesWithDistance)
person maasg    schedule 26.10.2014
comment
Компилятор показывает, что нет метода groupByKey, только groupBy. То же самое и для метода leftOuterJoin. Я использую искру 1.1.0, предварительно созданную для Hadoop 1.X. - person Bob; 26.10.2014
comment
Кроме того, я должен поместить его в цикл foreach? - person Bob; 26.10.2014
comment
Это функции, доступные для пар RDD (ключ, значение) посредством неявного преобразования. Импортируйте org.apache.spark.SparkContext._ вверху вашей программы, чтобы использовать эти функции. - Кроме того, нет необходимости в петле. Этот функциональный конвейер выполняет свою работу, применяя преобразования и группировку ко всему набору данных. - person maasg; 27.10.2014
comment
Ооо, теперь у меня есть метод groupByKey, но у меня все еще нет метода leftOuterJoin. Я не понимаю, где находится метод leftOuterJoin. Оооо, я понял, точки не имеют типа RDD после чтения из файла и я не могу применить к ним leftOuterJoin. Как их можно преобразовать в RDD? - person Bob; 27.10.2014
comment
Я предполагаю, что точки имеют форму (x,y) ?? Неявное преобразование применяется к RDD типа Tuple2 или паре, подобной (x,y). - person maasg; 27.10.2014
comment
comment
Давайте продолжим обсуждение в чате. - person Bob; 27.10.2014
comment
Да, но я разбираю их на удвоение с помощью метода parseVector (см. код на github). - person Bob; 27.10.2014
comment
@Bob, вы можете извлечь координату x из файла Vector. Глядя на API, это кажется довольно простым. - person maasg; 28.10.2014

Объявление переменных означает, что у вас есть блок, а не просто выражение, поэтому вам нужно использовать фигурные скобки {}, например.

point.foreach({p => ... })
person lmm    schedule 26.10.2014
comment
Спасибо! Но можете ли вы также помочь с тем функционалом, который я описал? - person Bob; 26.10.2014