Spark: как разделить RDD[T]` на Seq[RDD[T]] и сохранить порядок

Как я могу эффективно разделить RDD[T] на Seq[RDD[T]]/Iterable[RDD[T]] с элементами n и сохранить исходный порядок?

Я хотел бы иметь возможность написать что-то вроде этого

RDD(1, 2, 3, 4, 5, 6, 7, 8, 9).split(3)

что должно привести к чему-то вроде

Seq(RDD(1, 2, 3), RDD(4, 5, 6), RDD(7, 8, 9))

Искра обеспечивает такую ​​функцию? Если нет, то какой эффективный способ добиться этого?

val parts = rdd.length / n
val rdds = rdd.zipWithIndex().map{ case (t, i) => (i - (i % parts), t)}.groupByKey().values.map(iter => sc.parallelize(iter.toSeq)).collect

Вроде не быстро..


person Robin des Bois    schedule 12.11.2014    source источник
comment
Это не имеет никакого смысла — RDD — это ссылка на что-то, разделенное вокруг кластера. Вы хотите, чтобы куча разных узлов кластера имела... ссылку на что-то, что разделено на все узлы кластера?? Возможно, rdd.mapPartitions(_.grouped(3)) делает то, что вы хотите, но я бы предложил сделать шаг назад и спросить о вашей проблеме на более высоком уровне - чего вы на самом деле пытаетесь достичь здесь?   -  person lmm    schedule 13.11.2014
comment
Да, ты прав. Мне нужно что-то вроде Seq[RDD[T]] или Iterable[RDD[T]]. Я отредактирую вопрос...   -  person Robin des Bois    schedule 13.11.2014
comment
Все еще не имеет большого смысла - лучше иметь RDD на верхнем уровне, чтобы разделы были как можно более грубыми.   -  person lmm    schedule 13.11.2014
comment
Я знаю, что Вы имеете ввиду. Но первоначальный RDD исходит из действительно огромного файла, и в моем случае мне приходится обрабатывать большие пакеты этого файла один за другим в цикле. Поскольку можно обрабатывать каждый из этих больших пакетов распределенным образом, они должны быть типа RDD. Имеет ли это смысл?   -  person Robin des Bois    schedule 13.11.2014
comment
Вы хотите обработать их все, не так ли? Таким образом, если каждый узел кластера выполняет полные пакеты, это будет более эффективно, чем распределение каждого пакета по отдельности (я имею в виду, я предполагаю, что у вас больше пакетов, чем узлов кластера). (Конечно, RDD из 3 элементов глупо - накладные расходы на кластеризацию будут намного больше, чем выигрыш - хотя я полагаю, что это просто пример)   -  person lmm    schedule 13.11.2014
comment
На самом деле более того, RDD из нескольких миллионов целых чисел все еще довольно глупо — смысл RDD в том, что набор данных слишком велик, чтобы поместиться в память на одном узле.   -  person lmm    schedule 13.11.2014
comment
Да. Это был просто символический пример. Я не могу обрабатывать пакеты параллельно, потому что результат одного пакетного вычисления будет аргументом для следующего пакетного вычисления. И эти пакеты слишком велики, чтобы поместиться в основную память одного узла. .. Так что RDD для каждой партии имеет смысл для меня.   -  person Robin des Bois    schedule 13.11.2014
comment
Может быть, вы хотите искровой поток? Это ориентировано на то, чтобы дать вам упорядоченную серию RDD.   -  person lmm    schedule 13.11.2014
comment
да.. Я тоже об этом думал.. Вы знаете, как преобразовать RDD в DStream?   -  person Robin des Bois    schedule 13.11.2014
comment
@RobindeBois вы можете использовать queueStream(), который позволяет вам передать очередь RDD для создания DStream   -  person aaronman    schedule 13.11.2014
comment
Если вам нужен результат пакета для следующего, похоже, что вы хотите, чтобы зациклить количество пакетов, которые вам нужны, и каждый раз загружать определенный набор данных. Будет более эффективно, чем загрузка RDD и фильтрация данных из него.   -  person maasg    schedule 14.11.2014
comment
@maasg звучит хорошо .. но как я могу загрузить только часть (учитывая смещение и длину) файла в rdd?   -  person Robin des Bois    schedule 14.11.2014
comment
вы загружаете один файл из локальной ФС?   -  person maasg    schedule 14.11.2014


Ответы (1)


Технически вы можете делать то, что предлагаете. Однако это действительно не имеет смысла в контексте использования вычислительного кластера для выполнения распределенной обработки больших данных. Прежде всего, это противоречит всей сути Spark. Если вы выполняете groupByKey, а затем пытаетесь извлечь их в отдельные RDD, вы фактически загружаете все данные, распределенные в RDD, в драйвер, а затем перераспределяете их обратно в кластер. Если драйвер не может загрузить весь файл данных, он также не сможет выполнить эту операцию.

Вы не должны загружать большие файлы данных на узел драйвера из локальной файловой системы. Вам следует переместить файл в распределенную файловую систему, такую ​​как HDFS или S3. Затем вы можете загрузить свой единственный файл больших данных в свой кластер с помощью val lines = SparkContext.textFile(...) в RDD строк. При этом каждый рабочий процесс в кластере будет загружать только часть файла, что можно сделать, поскольку данные уже распределены по кластеру в распределенной файловой системе.

Если затем вам нужно организовать данные в «пакеты», которые важны для функциональной обработки данных, вы можете ввести данные с помощью соответствующего идентификатора пакета, например: val batches = lines.keyBy( line => lineBatchID(line) )

Затем каждую партию можно свести к сводке на уровне партии, а эти сводки можно свести к одному общему результату.

В целях тестирования кода Spark можно загрузить небольшой образец файла данных на один компьютер. Но когда дело доходит до полного набора данных, вы должны использовать распределенную файловую систему в сочетании с искровым кластером для обработки этих данных.

person Tristan Nixon    schedule 15.03.2017