Сравните данные в двух RDD в искре

Я могу печатать данные в двух RDD с помощью приведенного ниже кода.

usersRDD.foreach(println)
empRDD.foreach(println)

Мне нужно сравнить данные в двух СДР. Как я могу повторять и сравнивать данные поля в одном RDD с данными поля в другом RDD. Например: повторите записи и проверьте, имеют ли имя и возраст в userRDD совпадающую запись в empRDD, если они не помещены в отдельный RDD.

Я пробовал с userRDD.substract(empRDD), но он сравнивал все поля.


person Ramakrishna    schedule 05.01.2015    source источник


Ответы (2)


Вам нужно ввести данные в каждом RDD, чтобы было что-то для соединения записей. Взгляните, например, на groupBy. Затем вы join получаете RDD. Для каждого ключа вы получаете совпадающие значения в обоих. Если вы заинтересованы в поиске несовпадающих ключей, используйте leftOuterJoin, например:

// Returns the entries in userRDD that have no corresponding key in empRDD.
def nonEmp(userRDD: RDD[(String, String)], empRDD: RDD[(String, String)]) = {
  userRDD.leftOuterJoin(empRDD).collect {
    case (name, (age, None)) => name -> age
  }
}
person Sean Owen    schedule 05.01.2015
comment
Похоже, OP может быть заинтересован в поиске ключей, которые присутствуют в одном RDD и отсутствуют в другом. Для этого вам нужно leftOuterJoin вместо join. Возможно, стоит упомянуть в ответе. - person Daniel Darabos; 05.01.2015
comment
Спасибо, Шон, но мне нужно найти несовпадающие данные с двух СДР. будет здорово, если вы предоставите пример кода. - person Ramakrishna; 06.01.2015
comment
А, спасибо @DanielDarabos, это правильный ответ. Я неправильно прочитал исходный текст. - person Sean Owen; 06.01.2015
comment
@Ramakrishna, я добавил пример с leftOuterJoin. Надеюсь, поможет. - person Daniel Darabos; 06.01.2015

Конечно, приведенные выше решения являются полными и правильными! Только одно предложение, если и только если RDD синхронизированы (одинаковые строки имеют одинаковые ключи). Вы можете использовать распределенное решение и использовать параллелизм, используя только искровые преобразования с помощью следующего проверенного решения:

def distrCompare(left: RDD[(Int,Int)], right: RDD[(Int,Int)]): Boolean = {
  val rdd1 = left.join(right).map{case(k, (lv,rv)) => (k,lv-rv)}
  val rdd2 = rdd1.filter{case(k,v)=>(v!=0)}
  var equal = true;
  rdd2.map{
    case(k,v)=> if(v!=0) equal = false
  }
  return equal
}

Вы можете выбрать количество разделов в «объединении».

person P. Str    schedule 26.05.2016