Parallel.Foreach С# Функция паузы и остановки?

Каков был бы наиболее эффективный способ приостановить и остановить (до того, как он закончится) parallel.foreach?

Parallel.ForEach(list, (item) =>
{
    doStuff(item);
});

person bbrez1    schedule 11.12.2011    source источник
comment
Пауза? Почему вы хотите поставить его на паузу?   -  person Damien_The_Unbeliever    schedule 11.12.2011
comment
Пауза — это просто дополнительная функция, которую я хотел добавить, если это возможно, конечно. Я пишу программу, которая сильно загружает процессор и подключение к Интернету и может быстро ее заполнить, поэтому пауза может быть не такой уж плохой. Я просто реализую функцию остановки и сохраню список незавершенных элементов в другой список, а затем при следующем запуске перепишу этот список. И продолжить с того места, где остановилось. Вроде паузы. Спасибо.   -  person bbrez1    schedule 11.12.2011
comment
Если один из ответов дал вам решение, пометьте этот ответ как принятый.   -  person Scott Chamberlain    schedule 12.12.2011


Ответы (2)


У Damien_The_Unbeliver есть хороший метод, но это только в том случае, если вы хотите, чтобы какой-то внешний процесс остановил цикл. Если вы хотите, чтобы цикл прерывался, как при использовании break в обычном цикле for или foreach, вам нужно будет использовать перегрузка с ParallelLoopState в качестве одного из параметров тела цикла. ParallelLoopState имеет две функции, относящиеся к тому, что вы хотите сделать: Stop() и Break().

Функция Stop() остановит обработку элементов при первой же возможности, что означает, что после вызова Stop() могут быть выполнены дополнительные итерации, и не гарантируется, что элементы, предшествующие элементу, на котором вы остановились, имеют даже начали обрабатывать.

Функция Break() работает точно так же, как Stop(), однако она также оценивает все элементы IEnumerable, которые были до элемента, для которого вы вызвали Break(). Это полезно, когда вам все равно, в каком порядке обрабатываются элементы, но вы должны обработать все элементы до момента остановки.

Проверьте ParallelLoopResult, возвращенный из foreach в посмотрите, остановился ли foreach раньше, и если вы использовали Break(), какой элемент с наименьшим номером он обработал.

Parallel.ForEach(list, (item, loopState) =>
    {
        bool endEarly = doStuff(item);
        if(endEarly)
        {
            loopState.Break();
        }
    }
    );
//Equivalent to the following non parallel version, except that if doStuff ends early
//    it may or may not processed some items in the list after the break.
foreach(var item in list)
{
    bool endEarly = doStuff(item);
    if(endEarly)
    {
        break;
    }
}

Вот более практический пример

static bool[] list = new int[]{false, false, true, false, true, false};

long LowestElementTrue()
{
    ParallelLoopResult result = Parallel.ForEach(list, (element, loopState) =>
    {
        if(element)
            loopState.Break();
    }
    if(result.LowestBreakIteration.IsNull)
        return -1;
    else
        return result.LowestBreakIteration.Value;
}   

Независимо от того, как он разделяет работу, он всегда будет возвращать 2 в качестве ответа.

скажем, процессор отправляет два потока для обработки этого, первый поток обрабатывает элементы 0-2, а второй поток обрабатывает элементы 3-5.

Thread 1:                Thread 2
0, False, continue next  3, False, continue next
1, False, continue next  4, True, Break
2, True, Break           5, Don't process Broke

Теперь самый низкий индекс, из которого вызывался Break, был равен 2, поэтому ParallelLoopResult.LowestBreakIteration каждый раз будет возвращать 2, независимо от того, как разбиты потоки, поскольку он всегда будет обрабатываться до числа 2.

Вот пример того, как можно использовать Stop.

static bool[] list = new int[]{false, false, true,  false, true, false};

long FirstElementFoundTrue()
{
    long currentIndex = -1;
    ParallelLoopResult result = Parallel.ForEach(list, (element, loopState, index) =>
    {
        if(element)
        {
             loopState.Stop();

             //index is a 64 bit number, to make it a atomic write
             // on 32 bit machines you must either:
             //   1. Target 64 bit only and not allow 32 bit machines.
             //   2. Cast the number to 32 bit.
             //   3. Use one of the Interlocked methods.
             Interlocked.Exchange (ref currentIndex , index);
        }
    }
    return currentIndex;
}   

В зависимости от того, как он разделяет работу, он вернет либо 2, либо 4 в качестве ответа.

скажем, процессор отправляет два потока для обработки этого, первый поток обрабатывает элементы 0-2, а второй поток обрабатывает элементы 3-5.

Thread 1:                 Thread 2
0, False, continue next    3, False, continue next
1, False, continue next    4, True, Stop
2, Don't process, Stopped  5, Don't process, Stopped

В этом случае он вернет 4 в качестве ответа. Давайте посмотрим на тот же процесс, но если он обрабатывает каждый второй элемент вместо 0-2 и 3-5.

Thread 1:                   Thread 2
0, False, continue next     1, False, continue next
2, True, Stop               3, False, continue next
4, Don't process, Stopped   5, Don't process, Stopped

На этот раз он вернет 2 вместо 4.

person Scott Chamberlain    schedule 11.12.2011

Чтобы остановить Parallel.ForEach, вы можете использовать одну из перегрузок, которая принимает ссылку ParallelOptions и включите CancellationToken в этих параметрах.

Дополнительные сведения см. в разделе Отмена.

Что касается паузы, я вообще не понимаю, зачем вам это нужно. Возможно, вы ищете Barrier (который используется для координации усилий между несколькими потоками, скажем, если им всем нужно завершить часть A, прежде чем переходить к части B), но я бы не подумал, что вы будете использовать это с Parallel.ForEach, поскольку вы не знаете, сколько участников будет .

person Damien_The_Unbeliever    schedule 11.12.2011