Итерационные алгоритмы широко применяются в машинном обучении, связанных компонентах, ранжировании страниц и т. Д. Эти алгоритмы усложняются с итерациями, размер данных на каждой итерации и обеспечение отказоустойчивости на каждой итерации - непростая задача. В этой статье я бы подробно остановился на некоторых соображениях по работе с этими проблемами. Мы использовали Spark для реализации нескольких итеративных алгоритмов, таких как создание подключенных компонентов, обход больших подключенных компонентов и т. Д. Ниже приведены результаты моего опыта работы в лабораториях Walmart по созданию подключенных компонентов для 60 миллиардов узлов идентификации клиентов.

Типы итерационных алгоритмов

Конвергенция данных. Здесь мы видим, что объем данных уменьшается с каждой итерацией, т.е. мы начинаем 1-ю итерацию с огромными наборами данных, а размер данных уменьшается с увеличением количества итераций. Основная проблема будет заключаться в обработке огромных наборов данных на первых нескольких итерациях, и после того, как набор данных значительно уменьшится, будет легко обрабатывать дальнейшие итерации до конечного условия.

Расходящиеся данные. Объем данных увеличивается на каждой итерации, и иногда они могут увеличиваться быстрее, что делает невозможным дальнейшее продвижение. Нам нужны ограничения, такие как количество итераций, начальный размер данных, размер вычислительной мощности и т. Д., Чтобы эти алгоритмы работали.

Подобные данные: на каждой итерации у нас будут более или менее одинаковые данные, и с таким алгоритмом будет очень легко справиться.

Дополнительные данные: на каждой итерации мы можем добавлять новые данные, особенно в машинном обучении, у нас могут быть новые наборы обучающих данных с периодическими интервалами.

Вызовы

  1. Линия RDD: Один из распространенных способов обеспечения отказоустойчивости любой системы - хранить реплики данных в разных местах, чтобы в случае отказа одного узла у нас была бы реплика, которая помогала бы до тех пор, пока узел выздоровел. Но Spark не поддерживает реплики данных, вместо этого он поддерживает граф происхождения преобразований, выполненных с данными в драйвере. Таким образом, этот граф происхождения будет полезен, если какой-либо фрагмент данных отсутствует, он может восстановить его, используя граф происхождения, следовательно, Spark является отказоустойчивым. По мере роста графа Lineage становится трудно восстановить данные по мере увеличения количества итераций.
  2. Память и дисковый ввод-вывод. В Spark RDD неизменяемы, поэтому для каждой итерации мы будем создавать новую копию преобразованных данных (новый RDD), что увеличит использование памяти и диска. По мере увеличения итераций использование диска / памяти исполнителями будет увеличиваться, что может привести к замедлению из-за нехватки памяти и ожидания GC для выполнения очистки. В некоторых случаях памяти кучи будет недостаточно, и задача прервется.
  3. Размер задачи: в некоторых случаях может быть несколько задач, которые не подходят для одного исполнителя, или одна задача занимает гораздо больше времени, чем остальные задачи, что может привести к возникновению узких мест.

Советы по преодолению вышеуказанных проблем

  1. Сохранение большого графа родословной в памяти и в случае отказа узла восстановление утраченных наборов данных потребует много времени. В таких случаях мы можем использовать кеширование или создание контрольных точек наборов данных на каждых N итерациях. Он будет сохранять рассчитанный RDD на каждой N-й итерации (кэширование будет храниться в памяти или на диске исполнителей, контрольная точка использует HDFS, нам нужно решить, исходя из наших потребностей, поскольку скорость будет отличаться для каждого из них). В случае сбоя RDD вычисляется обратно с последней контрольной точки / кэшированной. Вместо использования двух вышеуказанных методов вы также можете создать временную таблицу и сохранить вычисленный набор данных, разделенный по итерациям. Это было бы полезно в случае сбоя искрового задания, мы можем перезапустить с последней N-й итерации, и преимущество сохранения во временную таблицу состоит в том, чтобы избавиться от графа родословной RDD до этой итерации и начать новый граф родословной с этой точки. Поскольку линейный граф RDD становится все более популярным в итеративных алгоритмах, нам необходимо создавать гибридные решения с использованием кэширования, контрольных точек (см. Ссылку [2]) и временных таблиц для различных случаев использования.
  2. То же, что и выше, сохранение во временной таблице и обратное чтение из временной таблицы может избавить от графа происхождения и очистить память и диск от предыдущих RDD. Такая запись и чтение увеличивает накладные расходы, но дает огромное преимущество при работе с большими наборами данных. В частности, при конвергенции наборов данных нам может потребоваться выполнить этот процесс только для первых нескольких итераций и использовать кеширование, когда наборы данных стали маленькими из-за итераций. Сохранение во временную таблицу в качестве контрольной точки выглядит тривиально, но это не просто контрольная точка. Поскольку мы избавляемся от истории линейного графа, делая это с периодическими итерациями. Это снизит риск сбоя задания и сократит время восстановления потерянных данных.
  3. Обработка расходящихся данных является сложной задачей, потому что размер каждой задачи будет увеличиваться с итерациями и отнимать гораздо больше времени для каждого исполнителя. Поэтому нам нужен коэффициент, чтобы вычислить количество задач в (i + 1) итерации по сравнению с i-й итерацией, чтобы размер задачи оставался прежним. Например, предположим, что количество задач в i-й итерации равно 100, и каждая задача обрабатывает около 100 МБ данных. В i + 1 размер итерации каждой задачи увеличен до 150 МБ, мы можем перетасовать эти 100 задач до 150 задач и оставить 100 МБ для каждой задачи. Таким образом, в расходящихся наборах данных нам необходимо увеличить количество задач за счет повторного разделения и изменения случайных разделов на основе итераций.
  4. В случаях, когда размер искровой задачи огромен, попробуйте увеличить память исполнителя, чтобы она соответствовала размеру задачи. И если нам нужно выполнить соединения для искаженных наборов данных, где 10% задач занимают 90% времени выполнения и 90% задач выполняются за 10% времени. Предлагается обрабатывать эти задачи отдельно, выполняя их как два разных запроса. Нам нужно определить причину больших задач и можно ли разделить их на две группы, то есть маленькие и большие задачи. В первом запросе мы обрабатываем 90% задач, поскольку для их обработки нет никаких препятствий, и это займет 10% времени, как и раньше. В другом запросе будут обрабатываться большие задачи (10% задач) с использованием широковещательного соединения, так как таких задач меньше, а также избежать перетасовки данных.
    Пример: Допустим, у нас есть таблица A и таблица B. данные о населении со столбцами user_id, name, city, state. Таблица B - это данные групп WhatsApp со столбцами user_id, group_id. Если мы пытаемся найти топ-5 городов с наибольшим количеством используемых групп WhatsApp. В этом примере могут быть крайние случаи, например, города с большим населением могут быть большой задачей, пользователи с большим количеством групп могут привести к большим задачам. Чтобы решить эти угловые случаи, объединение между этими таблицами может быть выполнено в двух запросах. мы можем отфильтровать крупных пользователей с большим количеством групп (допустим, порог в 1000 групп на пользователя) и рассматривать их как большие задачи. И выполняйте объединения отдельно для крупных пользователей с помощью широковещательного объединения, поскольку количество крупных пользователей будет небольшим по сравнению с общими данными. Аналогичным образом для остальных пользователей выполните случайное объединение и объедините результаты и объедините их по городам, чтобы найти 5 лучших городов.

Пожалуйста, добавляйте любые вопросы в комментарии, вот несколько соответствующих ссылок для более подробной информации

Использованная литература :





[3] An interesting research paper [link]
[4] Refer spark documentation