Сравните строки в RDD

Как я могу перебирать строки RDD и сравнивать одну строку со следующей в RDD?

Я знаю, что могу использовать цикл for следующим образом: for(x‹-rddItems), есть ли способ сделать что-то вроде x.next() внутри цикла for? или использовать некоторый индекс внутри for?

Благодарность


person Userrrrrrrr    schedule 15.04.2015    source источник
comment
Как вы собираетесь это делать, если строки не упорядочены и результат зависит от запуска к запуску?   -  person Nikita    schedule 15.04.2015
comment
Я создал отсортированный RDD   -  person Userrrrrrrr    schedule 15.04.2015
comment
@ipoteka: RDD упорядочены, и если вы их не перетасуете, порядок будет одинаковым от запуска к запуску.   -  person Daniel Darabos    schedule 15.04.2015
comment
@Lital: Можете ли вы описать, какой результат вы ищете? Простое сравнение элементов не имеет никакого эффекта, поэтому трудно сказать, чего вы пытаетесь достичь.   -  person Daniel Darabos    schedule 15.04.2015
comment
если вы не присоединитесь, не перераспределите их и т. д. @Lital Так что было бы лучше, если бы вы снабдили rdd ключами и объединили\сделали декартово произведение с этими ключами.   -  person Nikita    schedule 15.04.2015
comment
@Даниэль Дарабос. Дополнительная информация: у меня есть параRDD (ключ, значения) для каждого ключа, у меня есть одно или несколько значений. Мне нужно выбрать только одно значение для каждого ключа. Для каждого ключа мне нужно просмотреть все значения (отсортированные по времени) и сравнить их. На выходе должен быть новый RDD (или список), содержащий пары ключ-значение (но только одно значение для каждого ключа).   -  person Userrrrrrrr    schedule 15.04.2015
comment
В этом случае вас лучше обслужат groupByKey или reduceByKey, как предлагает @ipoteka. Это также должно быть быстрее, чем сортировка!   -  person Daniel Darabos    schedule 15.04.2015
comment
@DanielDarabos Я уже использую groupByKey. но теперь я хочу перебрать значения в каждой группе и сравнить их со следующим значением в той же группе. это означает, что я использую for((k,v)‹-myGroupedbyKeyRDD), но внутри цикла for (который повторяет группы) я хотел бы просмотреть значения и сравнить каждое со следующим значением в группе.   -  person Userrrrrrrr    schedule 15.04.2015
comment
О, почему бы тогда не спросить об этом? :D После того, как вы отсортируете группу, вы можете сделать то же самое с циклом for в ответах ниже, или вы можете использовать seq.sliding или seq.zip(seq.tail) для получения пар элементов, следующих друг за другом.   -  person Daniel Darabos    schedule 15.04.2015


Ответы (4)


Вы можете сделать что-то подобное, используя mapPartitions:

rdd.mapPartitions { partition =>
  var previous = partition.next
  for (element <- partition) yield {
    val result = previous == element // Do your comparison.
    previous = element
    result
  }
}

Но это не сравнивает последний элемент раздела N с первым элементом раздела N+1. Это было бы довольно сложно сделать, и это повредило бы производительности. Так что я просто скрещиваю пальцы и надеюсь, что вы не против пропустить некоторые сравнения!

person Daniel Darabos    schedule 15.04.2015
comment
Довольно поздно для вечеринки: не вызовет ли это исключение NullPointerException? - person Vale; 21.07.2016
comment
Если вы думаете, что это было бы, пожалуйста, объясните. Очевидно, я так не думаю, иначе я бы не опубликовал это как ответ :). Спасибо. - person Daniel Darabos; 21.07.2016
comment
Я новичок в Spark, так что это был скорее вопрос, а не замечание. Разве не должна быть проверка на partiton.hasNext? - person Vale; 22.07.2016
comment
for (x <- iterator) проверит hasNext и остановится соответствующим образом. - person Daniel Darabos; 22.07.2016
comment
@DanielDaraos Я тоже новичок в Scala. Спасибо за объяснение - person Vale; 22.07.2016

Вы можете перебирать каждый отдельный раздел RDD, используя mapPartitions, например:

val rdd = sc.parallelize(List(1,73,5,226))
rdd.mapPartitions { iter =>

  var last = 0
  var result = List[Boolean]()
  while (iter.hasNext) {
    val current = iter.next
    result = result ::: List(current > last)
    last = current
  }

  result.iterator
}.collect().foreach(println)

Дает:

правда правда ложь правда

Это делается для каждого раздела, а не для всего RDD.

person dpeacock    schedule 15.04.2015
comment
Вы можете увидеть разницу, играя с разделами (второй аргумент в sc.parallelize). Это дает только тот ответ, который вы говорите, если для разделов установлено значение 1. - person Justin Pihony; 15.04.2015
comment
Да - во многих случаях вы захотите, например. используйте HashPartitioner для некоторого соответствующего параметра, прежде чем делать это, вместо того, чтобы устанавливать разделы в 1. - person dpeacock; 15.04.2015

Вам нужно создать ключ, а затем соединить rdd с самим собой (применив ваше смещение).

person Alister Lee    schedule 16.04.2015
comment
Это кажется элегантным решением, как бы вы это сделали? - person Romain Jouin; 30.05.2017

Я думал об этой возможности, я не уверен, что это действительно хороший вариант?

def diff_timestamp(liste):
    timestamps = liste
    r          = []
    values     = []
    for indice, valeur in enumerate(timestamps):
        values.append(float(valeur))
        if indice>0:
            delta = values[indice] - values[indice-1]
            r.append(delta)
    return r
person Romain Jouin    schedule 30.05.2017
comment
Я думаю, вы неправильно понимаете вопрос, который касается Spark и RDD, которые работают совсем не так, как списки Python. - person puhlen; 30.05.2017