Многопоточная альтернатива мьютексу в parallel_for

Я новичок в C++, поэтому, пожалуйста, извините, если это глупый вопрос, но я не нашел хорошего примера того, что ищу в Интернете.

В основном я использую цикл parallel_for, чтобы найти максимум внутри 2D-массива (и кучу других операций между ними). Прежде всего, я даже не знаю, лучший ли это подход, но, учитывая длину этого двумерного массива, я думаю, что разделение вычислений будет быстрее.

Мой код:

vector<vector<double>> InterpU(1801, vector<double>(3601, 0));
Concurrency::parallel_for(0, 1801, [&](int i) {

    long k = 0; long l = 0;
    pair<long, long> Normalized;
    double InterpPointsU[4][4];
    double jRes;
    double iRes = i * 0.1;
    double RelativeY, RelativeX;
    int p, q;

    while (iRes >= (k + 1) * DeltaTheta) k++;
    RelativeX = iRes / DeltaTheta - k;
    for (long j = 0; j < 3600; j++)
    {
        jRes = j * 0.1;
        while (jRes >= (l + 1) * DeltaPhi) l++;
        RelativeY = jRes / DeltaPhi - l;
        p = 0;
        for (long m = k - 1; m < k + 3; m++)
        {
            q = 0;
            for (long n = l - 1; n < l + 3; n++)
            {
                Normalized = Normalize(m, n, PointsTheta, PointsPhi);
                InterpPointsU[p][q] = U[Normalized.first][Normalized.second];
                q++;
            }
            p++;
        }
        InterpU[i][j] = bicubicInterpolate(InterpPointsU, RelativeX, RelativeY);
        if (InterpU[i][j] > MaxU)
        {
            SharedDataLock.lock();
            MaxU = InterpU[i][j];
            SharedDataLock.unlock();
        }
    }
    InterpU[i][3600] = InterpU[i][0];
});

Здесь вы можете видеть, что я использую mutex с именем SharedDataLock для защиты нескольких потоков, обращающихся к одному и тому же ресурсу. MaxU — это переменная, которая должна содержать максимум вектора InterpU. Код работает хорошо, но так как у меня проблемы со скоростью работы, я начал изучать atomic и некоторые другие вещи.

Есть ли хороший пример того, как изменить аналогичный код, чтобы сделать его быстрее?


person Noldor130884    schedule 18.07.2017    source источник
comment
Нет необходимости иметь общую переменную MaxU. Каждый поток должен найти максимум среди своего диапазона векторов, а затем вам нужно найти максимум среди результатов потока.   -  person user7860670    schedule 18.07.2017
comment
посмотри на std::atomic. Это еще один инструмент для синхронизации.   -  person OutOfBound    schedule 18.07.2017
comment
Ваш код не завершен, пожалуйста, опубликуйте полный пример, описывающий вашу проблему... В любом случае, вы можете избежать блокировки, используя атомарные операции.   -  person Martin Hierholzer    schedule 18.07.2017
comment
Рассматривали ли вы возможность использования combinable вместо MaxU? т. е. каждый поток имеет свое максимальное значение, которое позже объединяется с помощью max.   -  person Hasturkun    schedule 18.07.2017
comment
@VTT Ну, в конце концов, мне нужен один вектор, содержащий все данные И максимум для его нормализации.   -  person Noldor130884    schedule 18.07.2017


Ответы (1)


Как упоминалось в VTT, вы можете просто найти локальный максимум каждого потока и затем объединить их с помощью combinable:

Concurrency::combinable<double> CombinableMaxU;
Concurrency::parallel_for(0, 1801, [&](int i) {
    ...
        CombinableMaxU.local() = std::max(CombinableMaxU.local(), InterpU[i][j]);
}
MaxU = std::max(MaxU, CombinableMaxU.combine(std::max<double>));

Обратите внимание, что ваш текущий код на самом деле неправильный (если только MaxU не является атомарным), вы читаете MaxU вне блокировки, в то время как он может быть записан одновременно другими потоками. Как правило, вы не должны читать значение, которое записывается одновременно, если только обе стороны не защищены атомарной семантикой или блокировками и ограничениями памяти. Причина в том, что доступ к переменной вполне может состоять из нескольких обращений к памяти, в зависимости от того, как тип поддерживается аппаратно.

Но в вашем случае у вас даже есть классическое состояние гонки:

MaxU == 1
  Thread a                 |   Thread b
InterpU[i][j] = 3          | InterpU[i][j] = 2
if (3 > MaxU)              |  if (2 > MaxU)
SharedDataLock.lock();     | SharedDataLock.lock();
(gets the lock)            | (waiting for lock)
MaxU = 3                   | ...
SharedDataLock.unlock();   | ...
...                        | (gets the lock)
                           | MaxU = 2
                           | SharedDataLock.unlock();
MaxU == 2

Блокировать сложно.

Вы также можете использовать атомарный и вычислить максимум. Однако я предполагаю1, что он по-прежнему плохо работает внутри цикла2, а вне цикла не имеет значения, используете ли вы атомарные методы или блокировки.

1. Если сомневаетесь, не гадайте — измерьте!

2: То, что что-то атомарно и поддерживается аппаратно, не означает, что оно так же эффективно, как доступ к локальным данным. Во-первых, атомарные инструкции часто намного дороже, чем их неатомарные аналоги, во-вторых, вам приходится иметь дело с очень плохими эффектами кеша, потому что ядра/кэши будут бороться за владение данными. Хотя во многих случаях атомарность может быть более элегантной (не в этом, ИМХО), в большинстве случаев редукция выполняется быстрее.

person Zulan    schedule 18.07.2017
comment
Если я не ошибаюсь, вы должны обращаться и назначать CombinableMaxU.local() в цикле. Кроме того, вы, вероятно, можете закончить только с MaxU = CombinableMaxU.combine(std::max<double>);, предполагая, что его предыдущее значение не имеет значения. - person Hasturkun; 18.07.2017
comment
@Hasturkun забыл про .local(), спасибо. TBH Я не очень хорошо знаком с PPL, хотя он кажется довольно интуитивным. Я хотел, чтобы мой ответ был идиоматичным, а не общим ручным решением, которое я использовал до редактирования. Из вопроса я не знаю, имеет ли значение предыдущее значение MaxU, поэтому я просто использую семантически эквивалентный код. - person Zulan; 18.07.2017
comment
@Zulan, я не понимаю, почему текущий код неверен, поскольку независимо от того, что MaxU содержит абсолютный максимум ... мне на самом деле все равно, в каком порядке максимальное сравнивается с его предыдущим значением ... В любом случае, Ваш ответ очень ценен и выглядит именно так, как я искал. Спасибо, что научили меня чему-то новому! - person Noldor130884; 18.07.2017
comment
@ Noldor130884 Noldor130884 Я добавил в свой ответ более подробное объяснение различных причин, по которым ваш текущий код неверен. - person Zulan; 18.07.2017
comment
@Zulan, извините за беспокойство, но ни MaxU = CombinableMaxU.combine(std::max<double>);, ни MaxU = std::max(MaxU, CombinableMaxU.combine(std::max<double>)); не работают. Я что-то пропустил? Как мне объявить MaxU? - person Noldor130884; 18.07.2017
comment
Каким образом они не работают? Вы не показали, как объявляется MaxU, но я предположил double. - person Zulan; 18.07.2017
comment
Да, это так: забавная вещь, если я пишу плюс вместо макс, это работает. Пишет, что аргументы функции не те, что ожидались - person Noldor130884; 18.07.2017
comment
Есть шанс, что у вас есть лишняя пара ()? Это combine(std::max<double>), но combine(std::min<double>()). Хотя я не могу проверить себя, это по сообщениям работает, хотя иногда рекомендуется combine([](double left, double right){ return std::max<double>left, right);}). - person Zulan; 18.07.2017
comment
Давайте продолжим обсуждение в чате. - person Noldor130884; 18.07.2017
comment
Решил это, используя функцию max шаблона, созданную мной: - person Noldor130884; 18.07.2017