Сэкономьте 37% на Функциональном параллелизме в .NET с кодом fccterrell.

Сверните PLINQ: агрегатные функции

В функциональном программировании сворачивание - это функция высшего порядка, также известная как сокращение и накопление, цель которой - уменьшить заданную структуру данных, обычно последовательность элементов, в одно значение. Например, сокращение может возвращать среднее значение для ряда чисел, вычисляя суммирование, максимальное значение или минимальное значение.

Функция сворачивания принимает начальное значение, обычно называемое аккумулятором, которое используется как контейнер для промежуточных результатов. Затем в качестве второго аргумента он принимает двоичное выражение, которое действует как функция сокращения, чтобы применить его к каждому элементу в последовательности, чтобы наконец вернуть новое значение для аккумулятора. В целом редукция работает следующим образом. Вы берете бинарный оператор - то есть функцию с двумя аргументами - и вычисляете ее по вектору или набору элементов размера n, обычно слева направо. Иногда для первой операции с первым элементом используется специальное начальное значение, поскольку предыдущего значения для использования нет. Во время каждого шага итерации двоичное выражение принимает текущий элемент из последовательности и значение аккумулятора в качестве входных данных и возвращает значение, которое перезаписывает аккумулятор. Конечным результатом является значение последнего аккумулятора, как показано на рисунке 1.

Рис. 1. Функция сворачивания сокращает последовательность до одного значения. Функция (f) в данном случае является умножением и принимает начальный аккумулятор со значением 1. Затем для каждой итерации в последовательности (5, 7, 9) функция применяет вычисление к текущему элементу и аккумулятору, результат затем используется для обновления аккумулятора до необходимого значения.

Те же концепции, которые вы, возможно, узнали о функции сворачивания, можно применить к PLINQ как в F #, так и в C #. Фактически, PLINQ имеет эквивалент функции сворачивания под названием Aggregate. Агрегат PLINQ является правым. Например, вот одна из его перегруженных подписей:

public static TAccumulate Aggregate<TSource, TAccumulate>(
this IEnumerable<TSource> source,TAccumulate seed,Func<TAccumulate, TSource, TAccumulate> func);

Вышеупомянутая функция принимает три аргумента, которые сопоставляются с источником последовательности: начальное значение аккумулятора и функция свертывания func, которая обновляет аккумулятор для каждого элемента.

Лучший способ понять, как работает Aggregate, - это на примере. В этом примере мы распараллелим алгоритм кластеризации k-средних с помощью PLINQ и функции Aggregate. Цель примера - показать, насколько простой и производительной становится программа при использовании этой конструкции. Следующая реализация алгоритма кластеризации k-средних использует функциональное программирование, выражения последовательности с PLINQ и некоторые из многих встроенных функций для управления данными.

k-означает кластеризацию

k-средство, также называемое алгоритмом Ллойда, представляет собой алгоритм машинного обучения без учителя, который классифицирует набор точек данных на кластеры, каждый из которых сосредоточен на своем собственном центроиде. Центроид кластера - это сумма точек в кластере, деленная на количество точек. Он представляет собой центр масс геометрической формы, имеющей однородную плотность. Алгоритм кластеризации k-средних принимает входные данные и значение k, которое указывает количество кластеров, которые нужно установить, затем случайным образом размещает центроиды в этих кластерах. Идея этого алгоритма состоит в том, чтобы сгенерировать несколько центроидов, которые образуют центры кластеров. Каждая точка данных связана со своим ближайшим центроидом. Расстояние рассчитывается с использованием простой функции евклидова расстояния [1]. Затем каждый центроид перемещается к среднему значению положений связанных с ним точек.

Центроид вычисляется как сумма его точек, а затем эта сумма делится на размер кластера. Итерация включает следующие шаги:

  • Сумма или сокращение, которое вычисляет сумму баллов в каждом кластере.
  • Разделите сумму каждого кластера на количество точек в этом кластере.
  • Переназначить или сопоставить точки кластера с ближайшим центроидом

Процесс является итеративным, что означает, что он повторяется до тех пор, пока алгоритм не достигнет окончательного результата или не превысит максимальное количество шагов. Когда алгоритм работает, он непрерывно корректирует и обновляет центроиды на каждой итерации, чтобы лучше кластеризовать входные данные.

Для источника данных, используемого в качестве входных данных в алгоритме кластеризации k-средних, мы будем использовать общедоступные записи качества белого вина (рисунок 2), которые доступны для загрузки в Интернете.

Рис. 2 Результат выполнения алгоритмов k-средних с использованием C # LINQ для последовательной версии кода и C # PLINQ для распараллеленной версии. Центроиды - это красные точки в обоих кластерах. Каждое изображение представляет одну итерацию алгоритма k-средних с 11 центроидами в кластере. Каждая итерация алгоритма вычисляет центроид (выделенный красным) каждого кластера, а затем присваивает каждую точку кластеру с ближайшим центроидом.

Полная реализация программы k-means опущена из-за длины кода, и только соответствующий фрагмент кода показан в примере в листинге 1.

Необходимо рассмотреть две основные функции: GetNearestCentroid и UpdateCentroids. Во-первых, функция GetNearestCentroid используется для обновления кластеров. Для каждого ввода данных эта функция находит ближайший центроид, назначенный кластеру, которому принадлежит ввод.

Листинг 1: функция для поиска ближайшего центроида (используется для обновления кластеров)

double[] GetNearestCentroid(double[][] centroids, double[] center){
           return centroids.Aggregate((centroid1, centroid2) => //#A
              Dist(center, centroid2) < Dist(center, centroid1)
              ? centroid2
              : centroid1);
     }
}

#A Агрегатная функция LINQ для поиска ближайшего центроида

Реализация GetNearestCentroid использует функцию Aggregate для сравнения расстояний между центроидами, чтобы найти ближайший из них. На этом этапе, если входные данные в каком-либо из кластеров не обновляются, потому что ближайший центроид не найден, алгоритм завершается и возвращает результат.

Следующим шагом, показанным в листинге 2, после обновления кластеров является обновление местоположений центроидов. Функция UpdateCentroids вычисляет центр для каждого кластера и сдвигает центроиды в эту точку. Затем с обновленными значениями центроидов алгоритм повторяет предыдущий шаг, выполняя функцию GetNearestCentroid, пока не найдет результат закрытия. Эти операции продолжаются до тех пор, пока не будет выполнено условие сходимости и положения центров кластера не станут стабильными.

Листинг 2: обновите расположение центроидов в соответствии с центром кластера.

double[][] UpdateCentroids(double[][] centroids)
{
    var partitioner = Partitioner.Create(data, true); //#A
    var result = partitioner.AsParallel() //#B
     .WithExecutionMode(ParallelExecutionMode.ForceParallelism) //#C
     .GroupBy(u => GetNearestCentroid(centroids, u))
     .Select(points =>
       points
       .Aggregate(new double[N], //#D
       (acc, item) => acc.Zip(item, (a, b) => a + b).ToArray()) //#E
       .Select(items => items / points.Count())
       .ToArray());
     return result.ToArray();
}

#A Специальный разделитель для максимальной производительности

#B Параллельное выполнение запроса из модуля разделения

#C Принудительное параллелизм

#D Используйте агрегатную функцию, чтобы найти центр центроидов в кластере; начальное значение начального числа - это двойной массив размером N (размерность данных)

#E Используйте функцию Zip, чтобы связать последовательность расположения центроидов и последовательность аккумуляторов

С функцией UpdateCentroids требуется много вычислений, поэтому использование PLINQ может эффективно распараллелить код, тем самым увеличивая скорость.

ПРИМЕЧАНИЕ. Даже если центроиды не перемещаются по плоскости, они могут изменить свои индексы в массиве результатов из-за характера GroupBy и AsParallel.

Запрос PLINQ в теле функции UpdateCentroids выполняет агрегирование в два этапа. На первом этапе используется функция GroupBy, которая принимает в качестве аргумента функцию, предоставляющую ключ, используемый для агрегирования. В этом случае ключ вычисляется предыдущей функцией GetNearestCentroid. На втором этапе сопоставления, который запускает функцию Select, вычисляются центры новых кластеров для каждой заданной точки. Это вычисление выполняется функцией Aggregate, которая принимает список точек в качестве входных данных (координаты местоположения каждого центроида) и вычисляет их центры, сопоставленные с тем же кластером, с использованием локального аккумулятора acc, как показано в листинге 3. Аккумулятор представляет собой массив удваивается с размером N, который представляет собой размерность данных для обработки (размерность данных также известна как количество характеристик / измерений). Значение N определяется как константа в верхнем классе, потому что оно никогда не изменяется и может безопасно использоваться совместно. Функция Zip используется для объединения ближайших центроидов (точек) и аккумуляторных последовательностей. Затем центр этого кластера пересчитывается путем усреднения положения точек в кластере. Детали реализации алгоритма не имеют решающего значения, ключевым моментом является то, что описание алгоритма достаточно точно и напрямую транслируется в PLINQ с помощью Aggregate. Если вы попытаетесь повторно реализовать ту же функциональность без функции Aggregate, программа будет работать в уродливых и трудных для понимания циклах с изменяемыми общими переменными. Например, в листинге 3 показан эквивалент функции UpdateCentroids без помощи функции Aggregate.

Листинг 3: функция UpdateCentroids, реализованная без Aggregate.

double[][] UpdateCentroidsWithMutableState(double[][] centroids)
{
    var result = data.AsParallel()
      .GroupBy(u => GetNearestCentroid(centroids, u))
      .Select(points => {
          var res = new double[N];
          foreach (var x in points) //#A
             for (var i = 0; i < N; i++)
                  res[i] += x[i]; //#B
          var count = points.Count();
          for (var i = 0; i < N; i++)
              res[i] /= count; //#B
       return res;
      });
    return result.ToArray();
}

# Императивный цикл для вычисления центра центроидов в кластере

#B Использование изменяемого состояния

На рисунке 3 показаны результаты тестирования алгоритма кластеризации k-средних. Тест был выполнен на четырехъядерном компьютере с 8 ГБ оперативной памяти. Тестируемые алгоритмы - это последовательный LINQ, параллельный PLINQ и параллельный PLINQ с использованием специального разделителя.

Рис. 3 Бенчмарк, выполняющий алгоритм k-средних с использованием четырехъядерного компьютера с 8 ГБ ОЗУ. Тестируемые алгоритмы - это последовательный LINQ и параллельный PLINQ с вариантом специализированного разделителя. Параллельный PLINQ выполняется за 0,481 секунды, что в три раза быстрее, чем последовательная версия LINQ, которая выполняется за 1,316 секунды. Небольшое улучшение - это PLINQ со специализированным разделителем, который работает за 0,426 секунды, что на 12% быстрее, чем исходная версия PLINQ.

ПРИМЕЧАНИЕ. Когда в мультипроцессоре используется несколько потоков, для выполнения задачи может использоваться более одного процессора. В этом случае процессорное время может быть больше прошедшего времени.

Результаты тестов впечатляют. Параллельная версия алгоритма k-средних с использованием PLINQ работает в три раза быстрее, чем последовательная версия на четырехъядерном компьютере. Версия модуля разделения PLINQ, показанная в листинге 3, на 10% быстрее, чем версия PLINQ. В функции UpdateCentroids используются два интересных расширения PLINQ. Расширение WithExecutionMode (ParallelExecution Mode.ForceParallelism) используется для уведомления планировщика TPL о том, что запрос должен выполняться одновременно.

Два варианта настройки ParallelExecutionMode - ForceParallelism и Default. Перечисление ForceParallelism принудительно выполняет параллельное выполнение. Значение по умолчанию относится к запросу PLINQ для принятия соответствующего решения о выполнении.

Как правило, параллельное выполнение запроса PLINQ не гарантируется. Планировщик TPL может решить, основываясь на таких факторах, как размер, сложность операций и текущее состояние доступных компьютерных ресурсов, что запрос должен выполняться последовательно. Однако бывают случаи, когда вы хотите принудительно использовать параллелизм.

Другое интересное расширение, используемое в функции UpdateCentroids, - это настраиваемый Partitioner:

var partitioner = Partitioner.Create (данные, истина)

Класс Partitioner ‹T› - это абстрактный класс, допускающий как статическое, так и динамическое разделение. Стандартный TPL Partitioner имеет встроенные стратегии, которые автоматически обрабатывают разбиение, обеспечивая хорошую производительность для широкого диапазона источников данных. Цель TPL Partitioner - найти баланс между слишком большим количеством разделов (что приводит к накладным расходам) и слишком небольшим количеством разделов (что не позволяет использовать доступные ресурсы). Однако бывают ситуации, когда секционирование по умолчанию может быть неподходящим, и вы можете повысить производительность от запроса PLINQ, используя адаптированную стратегию секционирования.

В этом примере настраиваемый разделитель создается просто с помощью перегруженной версии метода Partitioner.Create, который принимает в качестве аргумента источник данных и флаг, указывающий, какую стратегию использовать, динамическую или статическую. Когда флаг установлен в значение true, стратегия секционирования является динамической, в противном случае - статической. Статическое разбиение часто обеспечивает ускорение на многоядерном компьютере с небольшим количеством ядер (два или четыре). Динамическое разбиение направлено на балансировку нагрузки между задачами путем назначения произвольного размера фрагментов и последующего постепенного увеличения длины после каждой итерации. Можно построить очень сложные разделители со сложными стратегиями.

Понимание того, как работает раздел:

В PLINQ существует четыре типа алгоритмов разделения:

  • Разделение по диапазону работает с источником данных с известным размером. Массивы входят в эту категорию:
int[] data = Enumerable.Range(0, 1000).ToArray();
data.AsParallel().Select(n => Compute(n));
  • Разбиение на разделы - противоположность Range. Размер источника данных не определен заранее, поэтому запрос PLINQ выбирает по одному элементу за раз и назначает его задаче, пока источник данных не станет пустым. Основное преимущество этой стратегии состоит в том, что можно распределять нагрузку между задачами.
IEnumerable<int> data = Enumerable.Range(0, 1000);
Data.AsParallel().Select(n => Compute(n));
  • Разделение хэша использует хэш-код значений для назначения элементов с одним и тем же хеш-кодом одной и той же задаче (например, когда запрос PLINQ выполняет GroubBy).
  • Разделение фрагментов работает с увеличивающимся размером фрагмента, когда каждая задача извлекает фрагмент элементов из источника данных, длина которого увеличивается с количеством итераций. Таким образом, на каждой итерации появляются блоки большего размера, чтобы задача была максимально загружена.

Реализация параллельной функции Reduce для PLINQ

Теперь вы узнали о силе агрегированных операций, которые особенно подходят для масштабируемого распараллеливания на многоядерном оборудовании благодаря низкому потреблению памяти и оптимизации обезлесения. Низкая пропускная способность памяти связана с тем, что агрегатные функции производят меньше данных, чем принимают. Например, другие агрегатные функции, такие как Sum () и Average (), сокращают набор элементов до одного значения. Это концепция редукции: требуется функция для уменьшения последовательности элементов до одного значения. Расширения списка PLINQ не имеют специальной функции Reduce, как в понимании списков F # или других языках функционального программирования, таких как Scala и Elixir. Однако после знакомства с функцией агрегирования реализация многоразовой функции сокращения становится легкой задачей. В листинге 4 показана реализация функции Reduce.

Листинг 4. Реализация параллельной функции Reduce с использованием Aggregate.

static TSource Reduce<TSource>(this ParallelQuery<TSource> source, Func<TSource, TSource, TSource> func)
{
   return ParallelEnumerable.Aggregate(source, //#A
         (item1, item2) => func(item1, item2)); //#B
}
int[] source = Enumerable.Range(0, 100000).ToArray();
int result = source.AsParallel()
      .Reduce((value1, value2) => value1 + value2);//#C

#A Используйте функцию агрегирования, чтобы разработать собственный вариант сокращения

#B Для каждой итерации функция func применяется к текущему элементу, а предыдущее значение используется как аккумулятор

#C Используйте функцию Reduce, передающую анонимную лямбду для применения в качестве функции уменьшения

Функция Reduce принимает два аргумента: последовательность, которую нужно уменьшить, и функцию, которую нужно применить для уменьшения. Базовая реализация использует Aggregate, обрабатывая первый элемент исходной последовательности как аккумулятор. Остальная часть кода является примером применения функции Reduce для параллельного вычисления суммы заданной последовательности.

Ассоциативность и коммутативность для детерминированного агрегирования

Порядок вычисления агрегирования, которое выполняется параллельно с использованием PLINQ (или PSeq), применяет функцию сокращения иначе, чем последовательная версия.

В предыдущем примере k-средних последовательный результат вычисляется в другом порядке, чем параллельный результат, но два выхода гарантированно равны. Это связано с тем, что оператор + (плюс), используемый для обновления расстояний до центроидов, обладает особыми свойствами ассоциативности и коммутативности. Это строка кода, используемая для поиска ближайшего центроида:

Расстояние (центр, центроид2) ‹Расстояние (центр, центроид1)

Это строка кода, используемая для поиска обновлений центроидов:

В функциональном программировании математические операторы являются функциями. + (Плюс) - это бинарный оператор, что означает, что он работает с двумя значениями и манипулирует ими, чтобы вернуть результат.

points
    .Aggregate(new double[N],
      (acc, item) => acc.Zip(item, (a, b) => a + b).ToArray())
    .Select(items => items / points.Count())

Функция является ассоциативной, если порядок, в котором она применяется, не меняет результат. Это свойство важно для операций сокращения. Операторы + (плюс) и * (умножение) ассоциативны, потому что:

(a + b) + c = a + (b + c)

(a * b) * c = a * (b * c)

Функция является коммутативной, когда порядок операндов не меняет ее вывод, пока учитывается каждое число. Это свойство важно для операций объединения. Операторы + (плюс) и * (умножение) являются коммутативными, потому что:

a + b + c = b + c + a

a * b * c = b * c * a

Почему это важно?

Это имеет значение, потому что, используя эти свойства, можно разделить данные и иметь несколько потоков, работающих независимо на своих собственных фрагментах, обеспечивая параллелизм и по-прежнему возвращая правильный результат в конце.

Комбинация этих свойств позволяет реализовать параллельные шаблоны, такие как «разделяй и властвуй», «Форк / соединяй» и MapReduce.

Чтобы параллельное агрегирование в PLINQ / PSeq работало правильно, применяемая операция должна быть как ассоциативной, так и коммутативной. Хорошая новость заключается в том, что многие из самых популярных функций редукции являются одновременно ассоциативными и коммутативными.

Надеюсь, вы нашли эту статью информативной. Для получения дополнительной информации о пересечении параллелизма и парадигмы функционального программирования загрузите бесплатную первую главу Функциональный параллелизм в .NET и просмотрите эту презентацию Slideshare для получения дополнительных сведений.