Spark RDD - как они работают

У меня есть небольшая программа Scala, которая отлично работает на одном узле. Однако я масштабирую его, чтобы он работал на нескольких узлах. Это моя первая такая попытка. Я просто пытаюсь понять, как RDD работают в Spark, поэтому этот вопрос основан на теории и может быть не на 100% правильным.

Допустим, я создаю RDD: val rdd = sc.textFile(file)

Теперь, когда я это сделал, означает ли это, что файл в file теперь разделен по узлам (при условии, что все узлы имеют доступ к пути к файлу)?

Во-вторых, я хочу подсчитать количество объектов в RDD (достаточно просто), однако мне нужно использовать это число в вычислении, которое нужно применить к объектам в RDD — пример псевдокода:

rdd.map(x => x / rdd.size)

Предположим, что в rdd есть 100 объектов, и скажем, что есть 10 узлов, таким образом, количество объектов составляет 10 на узел (при условии, что так работает концепция RDD), теперь, когда я вызываю метод, каждый узел будет выполнять расчет с rdd.size как 10 или 100? Потому что в целом RDD имеет размер 100, но локально на каждом узле он составляет только 10. Должен ли я сделать широковещательную переменную до выполнения расчета? Этот вопрос связан с вопросом ниже.

Наконец, если я сделаю преобразование в RDD, например. rdd.map(_.split("-")), а потом я захотел новый size RDD, нужно ли мне выполнять действие над RDD, например count(), чтобы вся информация отправлялась обратно на узел драйвера?


person monster    schedule 12.12.2014    source источник
comment
«Этот вопрос связан с вопросом ниже». --› ??   -  person gsamaras    schedule 17.08.2016
comment
Я думаю, вы имели в виду rdd.flatMap(_.split("-"))   -  person lovasoa    schedule 19.11.2016


Ответы (2)


Обычно файл (или части файла, если он слишком большой) реплицируется на N узлов в кластере (по умолчанию N=3 в HDFS). Мы не собираемся разделять каждый файл между всеми доступными узлами.

Однако для вас (т.е. клиента) работа с файлом с помощью Spark должна быть прозрачной — вы не должны видеть никакой разницы в rdd.size, независимо от того, на скольких узлах он разбит и/или реплицирован. Существуют методы (по крайней мере, в Hadoop), чтобы узнать, на каких узлах (частях) файла может находиться в данный момент. Однако в простых случаях вам, скорее всего, не понадобится использовать эту функцию.

ОБНОВЛЕНИЕ: статья с описанием внутреннего устройства RDD: https://cs.stanford.edu/~matei/papers/2012/nsdi_spark.pdf

person Ashalynd    schedule 12.12.2014
comment
Спасибо за ответ. Итак, для вычислений типа: rdd.filter(...).map(x => x * rdd.count) выполняется ли шаг filter на каждом узле до того, как какой-либо узел сможет выполнить шаг map? Потому что очевидно, что шаг map зависит от шага filter, уже выполняемого на каждом узле, поскольку map содержит rdd.count. Спасибо еще раз. - person monster; 13.12.2014
comment
Естественно, потому что map построен на filter (о концепции родословной читайте в статье). - person Ashalynd; 13.12.2014
comment
Спасибо за информацию, это хорошее чтение, однако теперь мне интересно, какова цель широковещательной переменной? Еще раз спасибо, оценил! - person monster; 13.12.2014
comment
Ссылка на Беркли уже мертва. - person Don Branson; 20.11.2016

val rdd = sc.textFile(file)

Означает ли это, что файл теперь разделен по узлам?

Файл остается там, где он был. Элементами результирующего RDD[String] являются строки файла. RDD разбит на разделы в соответствии с естественным разделением базовой файловой системы. Количество разделов не зависит от количества имеющихся у вас узлов.

Важно понимать, что когда эта строка выполняется, она не читает файл(ы). RDD — это ленивый объект, который будет делать что-то только тогда, когда это необходимо. Это здорово, потому что позволяет избежать ненужного использования памяти.

Например, если вы напишете val errors = rdd.filter(line => line.startsWith("error")), все равно ничего не произойдет. Если вы затем напишете val errorCount = errors.count, теперь вашу последовательность операций нужно будет выполнить, потому что результат count является целым числом. То, что каждое рабочее ядро ​​(исполнительный поток) будет делать параллельно, будет читать файл (или часть файла), перебирать его строки и подсчитывать строки, начинающиеся с ошибки. Помимо буферизации и сборки мусора, в каждый момент времени в памяти будет находиться только одна строка на ядро. Это позволяет работать с очень большими данными, не используя много памяти.

Я хочу подсчитать количество объектов в RDD, однако мне нужно использовать это число в расчете, который необходимо применить к объектам в RDD - пример псевдокода:

rdd.map(x => x / rdd.size)

rdd.size метода нет. Есть rdd.count, который подсчитывает количество элементов в RDD. rdd.map(x => x / rdd.count) не получится. Код попытается отправить переменную rdd всем рабочим процессам и завершится с ошибкой NotSerializableException. Что вы можете сделать, это:

val count = rdd.count
val normalized = rdd.map(x => x / count)

Это работает, потому что count является Int и может быть сериализовано.

Если я сделаю преобразование в RDD, например. rdd.map(_.split("-")), а потом я захотел новый размер RDD, нужно ли мне выполнять действие над RDD, например count(), чтобы вся информация отправлялась обратно на узел драйвера?

map не меняет количество элементов. Я не знаю, что вы имеете в виду под размером. Но да, вам нужно выполнить действие, например count, чтобы получить что-либо из RDD. Видите ли, никакая работа не выполняется до тех пор, пока вы не выполните действие. (При выполнении count драйверу будет отправлено только количество разделов, конечно, не вся информация.)

person Daniel Darabos    schedule 15.12.2014
comment
Я сделал пример python на основе вашего ответа в документация, если она вам нравится, вы можете включить ее в свой ответ! - person gsamaras; 17.08.2016
comment
Это должен быть принятый ответ. Он отвечает на все части полностью и правильно. - person tejaskhot; 19.11.2016