После прочтения блога инженера Facebook о сценарии использования Spark 60 ТБ (https://code.facebook.com/posts/1671373793181703/apache-spark-scale-a-60-tb-production-use-case/) стало интересно чтобы посмотреть немного подробнее о найденных ошибках, как они это исправили, а также для целей:

  1. получить некоторый опыт для ошибок, возникающих из-за экстремального состояния времени выполнения
  2. лучше знаком с исходным кодом Spark

Давайте начнем:

Искра-14363

https://issues.apache.org/jira/browse/SPARK-14363
Исправить утечку памяти в сортировщике. Когда UnsafeExternalSorter сбрасывает данные на диск, он не освобождает базовый массив указателей. В результате мы видим много OOM исполнителя, а также используемую память.

Фактически измененные коды находятся в классе ShuffleExternalSorter, который вызывается методом UnsafeShuffleWriter. Когда Executor обрабатывает ShuffleMapTask, он использует ShuffleWriter для записи (сортировки) данных с информацией о разделах в выходные файлы. Базовый массив указателей, упомянутый в их описании, представляет собой массив типа LongArray внутри ShuffleInMemorySorter.Этот массив используется для хранения всех указателей записей. (Подробно о LongArray и длинном 64-битном указателе я рассказал в этом блоге). Преимущество в том, что сортировка по разделам может обрабатываться в этом массиве, а не в записи напрямую. Это улучшение является частью проекта Tungsten.

При каждом вызове Spill() ShuffleExternalSorter вызывает ShuffleInMemorySorter, чтобы выполнить сортировку и записать данные сортировки на диск. Ошибка заключается в том, что ShuffleExternalSorter вызывает метод reset() ShuffleInMemorySorter только при записи ПОСЛЕДНЕГО перенесенного файла. В большинстве случаев это нормально, потому что LongArray предназначен для хранения 64-битных указателей. Но команда FB обнаружила, что если одна задача (ShuffleInMemorySorter) продолжает занимать все больше и больше памяти, например, какая-то другая задача, завершенная в том же Executor, этот LongArray будет занимать значительную часть памяти, и особенно, поскольку эта разлитая задача не полностью завершена, эта память LongArray НИКОГДА не будет освобождена. Это приводит к тому, что другие новые задачи, запущенные на этом исполнителе, имеют OOM. Будет хорошо, если команда FB опубликует полный стек исключения OOM.

Изменение очень простое. Вызывайте метод reset() ShuffleInMemorySorter каждый раз, когда в ShuffleExternalSorter вызывается Spill().

Изучено: когда вы выделяете память кучи JVM или память вне кучи, всегда освобождайте ее как можно скорее. Особенно, когда ваше распределение может динамически расширяться. Например, в ExpandPointerArray() функции ShuffleInMemorySorter мы всегда удваиваем текущий размер ArrayLong. Лучше поставить ограничение. Я нашел поздние коммиты на ShuffleExternalSorter, они добавили numElementsForSpillThreshold для этих указателей ArrayLong. по умолчанию это 8G ArrayLong, после достижения этого порога записи немедленно сбрасываются на диск, что освобождает всю память, выделенную этим массивом указателей сейчас.

Искра-14277

https://issues.apache.org/jira/browse/SPARK-14277
Метод JNI — (Snappy.ArrayCopy) — вызывался для каждой читаемой/записываемой строки. Мы подняли эту проблему, и поведение Snappy было изменено, чтобы использовать вместо него System.ArrayCopy, не основанный на JNI.

UnsafeSorterSpillReader читает разлитый файл запись за записью. И это заканчивается вызовом большого количества Snappy.arrayCopy() (метод JNI), что вызывает проблему накладных расходов ЦП. Полезно знать, что собственный метод Java System.arrayCopy реализован с помощью внутреннего собственного кода VM(небезопасные методы являются встроенными, то есть каждый вызов метода компилируется JIT в одна машинная инструкция.), так что очень-очень быстро и не вызов JNI. Исправление находится на стороне библиотеки snappy-java.

Искра-5581

https://issues.apache.org/jira/browse/SPARK-5581
На стороне карты при записи данных в случайном порядке на диск задача карты открывала и закрывала один и тот же файл для каждого раздел. Мы внесли исправление, позволяющее избежать ненужного открытия/закрытия, и наблюдали повышение производительности ЦП до 50% для заданий, записывающих очень большое количество разделов в случайном порядке.

Хе-хе, похоже, команда Spark давно знает об этой проблеме, просто никто не хочет внедрять это улучшение :p

ShuffleExternalSorter (снова…) используйте DiskBlockObjectWriter для записи нескольких байтов на диск. Важной особенностью этого средства записи является возможность атомарного добавления данных, что используется для записи данных из разных разделов в один отсортированный выходной файл.

Сортировщик должен записывать записи на диск одну за другой, когда происходит сброс (слишком много памяти из-за запущенного Executor) или перед его закрытием. Во время сброса каждый раз, когда запись для записи переключается в раздел, Sorter должен вызывать commitAndClose() Writer. На самом деле записи для записи уже отсортированы по разделам в этот момент, но в случае использования FB некоторые этапы Spark содержат более 100 тыс. разделов, в худшем случае эта commitAndClose() будет стоить много процессорного времени.

В новой реализации Sorter вызывает commit() только в том случае, если ему нужно переключить разделы. Изменения внутри DiskBlockObjectWriter включают в себя добавление флага streamOpen, указателя commitPosition, класса ManualCloseBufferedOutputStream, которые немного взломали close() и flush() универсального OutputStream…
Я не читал и не понимал каждую строку, измененную в Writer. Но, видимо, крупная победа.

Искра-14649

https://issues.apache.org/jira/browse/SPARK-14649
Драйвер Spark повторно отправлял уже запущенные задачи, когда произошел сбой выборки, что привело к снижению производительности. Мы исправили проблему, избегая повторного запуска запущенных задач.

Это спорное исправление, до сих пор PR не включен в мастер Spark. В этом тикете речь идет только о повторном выполнении задач по выборке в случайном порядке. Текущая реализация DAGScheduler заставляет повторно запускать все незавершенные задачи выборки, если произошла одна ошибка выборки. Sital Kedia из FB хочет, чтобы запущенные задачи выборки продолжали работать, даже если произошла одна ошибка выборки. Поскольку эта работающая задача все еще может быть завершена, и ее не нужно отправлять в DAGScheduler для повторного запуска. Но Кей Оустерхаут из Беркли указал: если текущая работающая задача завершается с ошибкой с опозданием, при повторном запуске они больше не могут получить выходные данные старой задачи карты, потому что соответствующая задача карты имеет триггер повторного запуска. из-за неудачной задачи выборки ранее … (но я думаю, что в этом случае эта задача может быть запущена повторно, как и другие неудачные задачи выборки, может быть, это создает незавершенные циклы?)

При разработке программного обеспечения при обработке сбоя нужно быть очень осторожным, в контексте перетасовки это просто очень сложно. Я могу понять боль Ситала Кедиа и согласиться с его идеей, но я не очень хорошо знаю, в каком состоянии невыполненная задача извлечения может повторно запустить задачу карты и запустить ее повторно. Мэй поздно смотрит на источник, чтобы узнать подробности по этому вопросу. Но, по крайней мере, теперь лучше понять, что происходит, когда задача выборки не удалась.

Искра-15074

https://issues.apache.org/jira/browse/SPARK-15074
Мы обнаружили, что служба перемешивания открывает/закрывает файл индекса перемешивания при каждой выборке в случайном порядке. Мы внесли изменения в кэширование информации об индексе, чтобы избежать открытия/закрытия файлов и повторного использования информации об индексе для последующих выборок.

Файлы индекса генерируются задачей преобразователя, чтобы помочь задаче выборки загрузить правильную часть сегмента файла из одного выходного файла преобразователя. Каждый редуктор должен прочитать 8-байтовое значение смещения, сохраненное в индексном файле. Таким образом, даже с 10k редукторами размер индексного файла все еще очень мал. Подходит для кэширования в памяти. Имя файла индекса имеет формат shuffle_ShuffleId_MapId_0.index, а имя файла данных — в формате shuffle_ShuffleId_MapId_0.data. Вся логика чтения файла индекса была реализована в getSortBasedShuffleBlockData()ExternalShuffleBlockResolver.

Перед ExternalShuffleBlockResolver находится ExternalShuffleBlockHandler, который обрабатывает все вызовы RPC для извлечения блока перемешивания или регистрации исполнителя.

Исправление заключается в использовании com.google.common.cache.LoadingCache для кэширования ShuffleIndexInformation для каждого выходного файла сопоставления. Таким образом, каждый редьюсер может извлекать данные блока перетасовки, не открывая и не закрывая индексный файл.

Искра-15958

https://issues.apache.org/jira/browse/SPARK-15958
Исходный размер буфера по умолчанию для сортировщика слишком мал (4 КБ), и мы обнаружили, что он очень мал для больших рабочих нагрузок — и в результате мы тратим значительное количество времени на использование буфера и копирование содержимого.

Они говорят о DEFAULT_INITIAL_SORT_BUFFER_SIZE внутри UnsafeExternalRowSorter. Следуйте исходникам, этот размер буфера, наконец, передается конструктору DiskBlockObjectWriter.

У вас также есть DISK_WRITE_BUFFER_SIZEвShuffleExternalSorter со значением по умолчанию 1024k, которое используется для буферизации байтов, записываемых в память, также используется в методе write() DiskBlockObjectWriter. Обычно кажется, что конфигурации буфера Spark слишком много, и из названия очень сложно понять, для чего он будет использоваться.

Искра-15569

https://issues.apache.org/jira/browse/SPARK-15569
Используя интеграцию Spark Linux Perf, мы обнаружили, что около 20 процентов времени ЦП тратится на проверку и обновление перетасовать байты записанных метрик.

Это легко понять и исправить. Ключ в том, чтобы обнаружить эту плохую строку кода. Просто измените метод DiskObjectWriter.recordWritten() с:

if (numRecordsWritten % 32 == 0) {
  updateBytesWritten()
}

to

if (numRecordsWritten % 16384 == 0) {
  updateBytesWritten()
}

updateBytesWritten() используется только для обновления метрик записи.

Сводка

Я прошел почти все исправления и улучшения оптимизации производительности в блоге. Большинство из них касалось стадии тасования. Производительность тасования является ключом к обработке больших данных, Spark уже проделал хорошую работу. Но на этот раз, в контексте реальных данных 60 ТБ, команда FB обнаружила несколько сложных ошибок и проблем, со всем их пиаром, просто делающих Spark лучше. Но большая часть их исправлений включена только в версию 2.1.0.