Почему spark записывает огромный файл на временный локальный диск даже без сохранения на диске или контрольной точки?

Я выполняю небольшую работу в кластере с 15 ГБ памяти и 8 ГБ диска на машину.

Задание всегда попадает в тупик, где последнее сообщение об ошибке:

java.io.IOException: No space left on device
    at java.io.FileOutputStream.writeBytes(Native Method)
    at java.io.FileOutputStream.write(FileOutputStream.java:345)
    at org.apache.spark.storage.DiskBlockObjectWriter$TimeTrackingOutputStream$$anonfun$write$3.apply$mcV$sp(BlockObjectWriter.scala:86)
    at org.apache.spark.storage.DiskBlockObjectWriter.org$apache$spark$storage$DiskBlockObjectWriter$$callWithTiming(BlockObjectWriter.scala:221)
    at org.apache.spark.storage.DiskBlockObjectWriter$TimeTrackingOutputStream.write(BlockObjectWriter.scala:86)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
    at org.xerial.snappy.SnappyOutputStream.dumpOutput(SnappyOutputStream.java:300)
    at org.xerial.snappy.SnappyOutputStream.rawWrite(SnappyOutputStream.java:247)
    at org.xerial.snappy.SnappyOutputStream.write(SnappyOutputStream.java:107)
    at java.io.ObjectOutputStream$BlockDataOutputStream.drain(ObjectOutputStream.java:1876)
    at java.io.ObjectOutputStream$BlockDataOutputStream.writeByte(ObjectOutputStream.java:1914)
    at java.io.ObjectOutputStream.writeFatalException(ObjectOutputStream.java:1575)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:350)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
    at org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:195)
    at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4$$anonfun$apply$2.apply(ExternalSorter.scala:751)
    at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4$$anonfun$apply$2.apply(ExternalSorter.scala:750)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4.apply(ExternalSorter.scala:750)
    at org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4.apply(ExternalSorter.scala:746)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:746)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:56)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

К тому времени, когда это происходит, размер произвольной записи составляет 0,0 Б, а размер ввода — 3,4 МБ. Интересно, какая операция могла быстро сожрать все свободное место на диске 5G.

Кроме того, уровень хранения всего задания ограничен MEMORY_ONLY_SERIALIZED, а контрольные точки полностью отключены.


person tribbloid    schedule 11.02.2015    source источник
comment
Вероятно, это произойдет, если ваши данные не помещаются в памяти. Кроме того, Spark должен записывать на диск для любых операций перемешивания (например, уменьшения). Чтобы убедиться, что размер входных данных вызывает проблему, я бы посоветовал вам использовать гораздо меньший набор входных данных и посмотреть, получите ли вы эту ошибку или нет.   -  person Soumya Simanta    schedule 11.02.2015
comment
Я уже пробовал, и он проходит тест. Но в любом случае размер моих данных намного меньше общей памяти (300G+, в то время как мой самый большой набор данных составляет около 4G — как вы уже можете видеть в метрике размера в случайном порядке)   -  person tribbloid    schedule 11.02.2015
comment
Spark также сохраняется, чтобы уменьшить необходимость пересчета с нуля в случае потери узла.   -  person Alister Lee    schedule 11.02.2015
comment
только видите, что происходит обратная ситуация: при сохранении в медленной сети он все равно выполняет пересчет, если он намного быстрее. ИМХО, сохранение/контрольная точка может быть вызвано только вручную.   -  person tribbloid    schedule 11.02.2015


Ответы (1)


Если вы знаете, что операции тасования помещаются в памяти, вы можете попробовать установить для spark.shuffle.spill значение false. (иначе вы получите ООМ). На http://spark.apache.org/docs/latest/configuration.html вы можете увидеть параметры, касающиеся поведения в случайном порядке, и другие общедоступные параметры конфигурации.

MEMORY_ONLY_SERIALIZED применяется к RDD.

person Sietse    schedule 11.02.2015
comment
На самом деле, даже если тасование умещается в памяти, оно все равно будет записано после фазы хеширования/сортировки. Spark просто не держит это в памяти, вопреки общеизвестному факту. Параметр spark.shuffle.spill имеет значение только во время (а не после) фазы хеширования/сортировки. На самом деле этот параметр вообще ничего не делает, начиная с spark 1.5.1. Это самая поучительная тема для меня за последний год: mail-archives.apache.org/mod_mbox/spark-dev/201604.mbox/ Я все еще думаю добавить его в официальную документацию, когда буду есть время. - person hbogert; 28.08.2016