Разбиение Spark для записи файлов очень медленное

При записи файла в HDFS с помощью Spark это происходит довольно быстро, если не используется секционирование. Вместо этого, когда я использую разбиение для записи файла, задержка записи увеличивается примерно в 24 раза.

Для того же файла запись без раздела занимает около 600 мс. Запись с разделением по идентификатору (сгенерирует ровно 1000 разделов, так как в файле 1000 идентификаторов) занимает около 14 секунд.

Кто-то из вас сталкивался с тем, что запись файла с разделами занимает очень много времени? В чем основная причина этого, возможно, Spark необходимо создать 1000 папок и файлов для каждого раздела? У вас есть идея, как это можно ускорить?

val myRdd = streamedRdd.map { case ((id, metric, time), value) => Record(id, metric, getEpoch(time), time, value) }

val df = myRdd.toDF

df.write.mode(SaveMode.Append)
.partitionBy("id")
.parquet(path)

person AlexL    schedule 01.04.2016    source источник
comment
Не могли бы вы включить код, который вы используете?   -  person zero323    schedule 01.04.2016


Ответы (1)


Исполнители Spark взаимодействуют с HDFS для записи имеющихся у них данных, это зависит от того, как ваши данные распределяются по кластеру после разделения.

По-видимому, для небольших фрагментов данных время для установления соединений от нескольких узлов-исполнителей к HDFS и записи будет больше по сравнению с последовательной записью всего файла.

Как этого избежать:

По умолчанию spark разделяет данные с помощью Hash partitioner (хеширует ключ, а ключ с таким же хэшем идет к одному и тому же узлу). Попробуйте указать Range partitioner, пожалуйста, найдите примеры фрагментов ниже:

В следующем фрагменте используется разделитель Hash yourRdd.groupByKey().saveAsTextFile("ПУТЬ HDFS");

В следующем фрагменте используется наш настраиваемый разделитель диапазонов. Он создает 8 разделов, как указано в RangePartitioner(8, yourRdd), и запись через 8 соединений будет лучшим выбором, чем запись через 1000 соединений.

val tunedPartitioner = new RangePartitioner(8, yourRdd)
val partitioned = yourRdd.partitionBy(tunedPartitioner).saveAsTextFile("HDFS PATH");

Опять же, это компромисс между записываемыми данными и количеством создаваемых разделов.

person Sai Krishna    schedule 01.04.2016
comment
Идея хорошая, но с фреймами данных работать не будет. Можете ли вы показать пример переразбиения данных и сохранения их в паркет? - person alexeipab; 04.04.2016
comment
@alexeipab В настоящее время вы не можете разбить на разделы, используя собственный разделитель. Единственное, что вы можете сделать, это разделить по столбцам, используя перераспределение. В качестве альтернативы вы можете использовать myDF.rdd.partitionBy() для разделения RDD, лежащего в основе вашего фрейма данных. - person Vektor88; 28.10.2016