Итерационные алгоритмы широко применяются в машинном обучении, связанных компонентах, ранжировании страниц и т. Д. Эти алгоритмы усложняются с итерациями, размер данных на каждой итерации и обеспечение отказоустойчивости на каждой итерации - непростая задача. В этой статье я бы подробно остановился на некоторых соображениях по работе с этими проблемами. Мы использовали Spark для реализации нескольких итеративных алгоритмов, таких как создание подключенных компонентов, обход больших подключенных компонентов и т. Д. Ниже приведены результаты моего опыта работы в лабораториях Walmart по созданию подключенных компонентов для 60 миллиардов узлов идентификации клиентов.
Типы итерационных алгоритмов
Конвергенция данных. Здесь мы видим, что объем данных уменьшается с каждой итерацией, т.е. мы начинаем 1-ю итерацию с огромными наборами данных, а размер данных уменьшается с увеличением количества итераций. Основная проблема будет заключаться в обработке огромных наборов данных на первых нескольких итерациях, и после того, как набор данных значительно уменьшится, будет легко обрабатывать дальнейшие итерации до конечного условия.
Расходящиеся данные. Объем данных увеличивается на каждой итерации, и иногда они могут увеличиваться быстрее, что делает невозможным дальнейшее продвижение. Нам нужны ограничения, такие как количество итераций, начальный размер данных, размер вычислительной мощности и т. Д., Чтобы эти алгоритмы работали.
Подобные данные: на каждой итерации у нас будут более или менее одинаковые данные, и с таким алгоритмом будет очень легко справиться.
Дополнительные данные: на каждой итерации мы можем добавлять новые данные, особенно в машинном обучении, у нас могут быть новые наборы обучающих данных с периодическими интервалами.
Вызовы
- Линия RDD: Один из распространенных способов обеспечения отказоустойчивости любой системы - хранить реплики данных в разных местах, чтобы в случае отказа одного узла у нас была бы реплика, которая помогала бы до тех пор, пока узел выздоровел. Но Spark не поддерживает реплики данных, вместо этого он поддерживает граф происхождения преобразований, выполненных с данными в драйвере. Таким образом, этот граф происхождения будет полезен, если какой-либо фрагмент данных отсутствует, он может восстановить его, используя граф происхождения, следовательно, Spark является отказоустойчивым. По мере роста графа Lineage становится трудно восстановить данные по мере увеличения количества итераций.
- Память и дисковый ввод-вывод. В Spark RDD неизменяемы, поэтому для каждой итерации мы будем создавать новую копию преобразованных данных (новый RDD), что увеличит использование памяти и диска. По мере увеличения итераций использование диска / памяти исполнителями будет увеличиваться, что может привести к замедлению из-за нехватки памяти и ожидания GC для выполнения очистки. В некоторых случаях памяти кучи будет недостаточно, и задача прервется.
- Размер задачи: в некоторых случаях может быть несколько задач, которые не подходят для одного исполнителя, или одна задача занимает гораздо больше времени, чем остальные задачи, что может привести к возникновению узких мест.
Советы по преодолению вышеуказанных проблем
- Сохранение большого графа родословной в памяти и в случае отказа узла восстановление утраченных наборов данных потребует много времени. В таких случаях мы можем использовать кеширование или создание контрольных точек наборов данных на каждых N итерациях. Он будет сохранять рассчитанный RDD на каждой N-й итерации (кэширование будет храниться в памяти или на диске исполнителей, контрольная точка использует HDFS, нам нужно решить, исходя из наших потребностей, поскольку скорость будет отличаться для каждого из них). В случае сбоя RDD вычисляется обратно с последней контрольной точки / кэшированной. Вместо использования двух вышеуказанных методов вы также можете создать временную таблицу и сохранить вычисленный набор данных, разделенный по итерациям. Это было бы полезно в случае сбоя искрового задания, мы можем перезапустить с последней N-й итерации, и преимущество сохранения во временную таблицу состоит в том, чтобы избавиться от графа родословной RDD до этой итерации и начать новый граф родословной с этой точки. Поскольку линейный граф RDD становится все более популярным в итеративных алгоритмах, нам необходимо создавать гибридные решения с использованием кэширования, контрольных точек (см. Ссылку [2]) и временных таблиц для различных случаев использования.
- То же, что и выше, сохранение во временной таблице и обратное чтение из временной таблицы может избавить от графа происхождения и очистить память и диск от предыдущих RDD. Такая запись и чтение увеличивает накладные расходы, но дает огромное преимущество при работе с большими наборами данных. В частности, при конвергенции наборов данных нам может потребоваться выполнить этот процесс только для первых нескольких итераций и использовать кеширование, когда наборы данных стали маленькими из-за итераций. Сохранение во временную таблицу в качестве контрольной точки выглядит тривиально, но это не просто контрольная точка. Поскольку мы избавляемся от истории линейного графа, делая это с периодическими итерациями. Это снизит риск сбоя задания и сократит время восстановления потерянных данных.
- Обработка расходящихся данных является сложной задачей, потому что размер каждой задачи будет увеличиваться с итерациями и отнимать гораздо больше времени для каждого исполнителя. Поэтому нам нужен коэффициент, чтобы вычислить количество задач в (i + 1) итерации по сравнению с i-й итерацией, чтобы размер задачи оставался прежним. Например, предположим, что количество задач в i-й итерации равно 100, и каждая задача обрабатывает около 100 МБ данных. В i + 1 размер итерации каждой задачи увеличен до 150 МБ, мы можем перетасовать эти 100 задач до 150 задач и оставить 100 МБ для каждой задачи. Таким образом, в расходящихся наборах данных нам необходимо увеличить количество задач за счет повторного разделения и изменения случайных разделов на основе итераций.
- В случаях, когда размер искровой задачи огромен, попробуйте увеличить память исполнителя, чтобы она соответствовала размеру задачи. И если нам нужно выполнить соединения для искаженных наборов данных, где 10% задач занимают 90% времени выполнения и 90% задач выполняются за 10% времени. Предлагается обрабатывать эти задачи отдельно, выполняя их как два разных запроса. Нам нужно определить причину больших задач и можно ли разделить их на две группы, то есть маленькие и большие задачи. В первом запросе мы обрабатываем 90% задач, поскольку для их обработки нет никаких препятствий, и это займет 10% времени, как и раньше. В другом запросе будут обрабатываться большие задачи (10% задач) с использованием широковещательного соединения, так как таких задач меньше, а также избежать перетасовки данных.
Пример: Допустим, у нас есть таблица A и таблица B. данные о населении со столбцами user_id, name, city, state. Таблица B - это данные групп WhatsApp со столбцами user_id, group_id. Если мы пытаемся найти топ-5 городов с наибольшим количеством используемых групп WhatsApp. В этом примере могут быть крайние случаи, например, города с большим населением могут быть большой задачей, пользователи с большим количеством групп могут привести к большим задачам. Чтобы решить эти угловые случаи, объединение между этими таблицами может быть выполнено в двух запросах. мы можем отфильтровать крупных пользователей с большим количеством групп (допустим, порог в 1000 групп на пользователя) и рассматривать их как большие задачи. И выполняйте объединения отдельно для крупных пользователей с помощью широковещательного объединения, поскольку количество крупных пользователей будет небольшим по сравнению с общими данными. Аналогичным образом для остальных пользователей выполните случайное объединение и объедините результаты и объедините их по городам, чтобы найти 5 лучших городов.
Пожалуйста, добавляйте любые вопросы в комментарии, вот несколько соответствующих ссылок для более подробной информации
Использованная литература :
[3] An interesting research paper [link] [4] Refer spark documentation