Время ожидания действия в итерации Parallel.ForEach

У меня есть что-то подобное в моем коде:

Parallel.ForEach(myList, new ParallelOptions { MaxDegreeOfParallelism = 4 }, item =>
{
    Process(item);
});

Дело в том, что я делаю кучу вещей внутри метода Process() (подключаюсь к общей папке, анализирую файл, сохраняю в БД и т. д.), и я беспокоюсь, что во время этого процесса что-то может пойти не так, и итерация никогда не завершится...< strong>Может ли это когда-нибудь произойти?

Есть ли способ установить тайм-аут для метода Process(), чтобы избежать появления потоков-зомби?

ОБНОВЛЕНИЕ:

Самый простой способ, который я нашел для установки тайм-аута, — добавить миллисекунды к CancellationTokenSource или вызвать метод Wait() для задачи.

Опция 1

Parallel.ForEach(myList, new ParallelOptions { MaxDegreeOfParallelism = 4 }, item =>
{
    var cts = new CancellationTokenSource(2000);
    Task task = Task.Factory.StartNew(() => Process(item), cts.Token);
});

Вариант №2

Parallel.ForEach(myList, new ParallelOptions { MaxDegreeOfParallelism = 4 }, item =>
{
    Task task = Task.Factory.StartNew(() => Process(item));
    task.Wait(2000);
});

Проблема в том, что ни один из этих вариантов не может отменить метод Process(). Нужно ли что-то проверять в методе Process()?


person Santiago Aceñolaza    schedule 25.03.2014    source источник
comment
может ли это когда-нибудь случиться? Абсолютно... Если у вас есть процесс, который никогда не завершается (ожидание ввода-вывода, который никогда не происходит, бесконечный цикл и т. д.), то цикл никогда не завершится.   -  person poy    schedule 26.03.2014
comment
Поэтому мне нужно найти способ избежать бесконечных итераций... может быть, окружив вызов метода Process с помощью Threading.Timer?   -  person Santiago Aceñolaza    schedule 26.03.2014
comment
Вы застряли на .NET 4 или можете перейти на .NET 4.5? Если вы можете, то у вас есть лучшие варианты.   -  person poy    schedule 26.03.2014
comment
Я использую .NET 4.5... любые идеи приветствуются!   -  person Santiago Aceñolaza    schedule 26.03.2014
comment
Просто интересно, почему вы создаете задачу внутри Parallel.ForEach? Я думал, что Parallel.ForEach уже делает это за вас...   -  person usefulBee    schedule 15.10.2014
comment
@usefulBee просто для того, чтобы передать CancellationToken для отмены цикла из внутренней функции   -  person Santiago Aceñolaza    schedule 16.10.2014
comment
Существует Task.WaitAll перегрузок с параметрами тайм-аута и токена отмены: docs.microsoft.com/en-us/dotnet/api/   -  person Trass3r    schedule 12.01.2021


Ответы (3)


Рассмотрите возможность добавления CancellationToken в свой код. Таким образом, в любой момент вы можете корректно отменить все операции.

Затем вы можете использовать метод CancelAfter().

person poy    schedule 25.03.2014
comment
Это немного избыточно: CancellationTokenSource имеет метод CancelAfter, а также принимает тайм-аут в одном из своих переопределений конструктора. - person noseratio; 26.03.2014
comment
Очень верно! Ответ скорректирован. - person poy; 26.03.2014
comment
К сожалению, вы можете сохранить обе версии, извините :) Похоже, что OP использует .NET 4.0, в котором нет ни одной из них. Вы также можете добавить, что ему нужно будет наблюдать за отменой каждой единицы работы внутри параллельного цикла: stackoverflow.com/a /22549046/1768303 - person noseratio; 26.03.2014
comment
Я изменил свои вопросы, потому что понял, что они недостаточно ясны. - person Santiago Aceñolaza; 26.03.2014

В итоге я совместил оба варианта. Это работает, но я не знаю, правильный ли это способ сделать это.

Решение:

        Parallel.ForEach(myList, new ParallelOptions { MaxDegreeOfParallelism = 4 }, item =>
        {
                var tokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30));
                var token = tokenSource.Token;

                Task task = Task.Factory.StartNew(() => Process(item, token), token);
                task.Wait();
        });

и в Process() я несколько раз проверяю отмену:

    private void Process(MyItem item, CancellationToken token)
    {
        try
        {
            if (token.IsCancellationRequested)
                token.ThrowIfCancellationRequested();

            ...sentences

            if (token.IsCancellationRequested)
                token.ThrowIfCancellationRequested();

            ...more sentences

            if (token.IsCancellationRequested)
                token.ThrowIfCancellationRequested();

            ...etc
        }
        catch(Exception ex)
            Console.WriteLine("Operation cancelled");
person Santiago Aceñolaza    schedule 26.03.2014
comment
Есть много вещей, которые мне не нравятся в этой реализации, но принудительное выполнение каждой итерации метода ForEach в отдельной задаче — это то, что я нашел самым уродливым. - person Santiago Aceñolaza; 27.03.2014

Я закончил с немного другой реализацией. В моем случае проверка CancellationToken внутри Process() не сработает из-за потенциально длительных операторов между проверками. Например, если бы мой тайм-аут был 5 секунд, а один оператор занял, скажем, 100 секунд ... я бы не знал, пока этот оператор не завершится, а затем не будет обнаружен if (token.IsCancellationRequested).

Вот что я в итоге сделал

     Parallel.ForEach(myList, (item) =>
     {
         Task task = Task.Factory.StartNew(() =>
         {
             Process(item));
         });

         if(task.Wait(10000)) // Specify overall timeout for Process() here
             Console.WriteLine("Didn't Time out. All's good!"); 
         else
             Console.WriteLine("Timed out. Leave this one and continue with the rest"); 
     });

Затем в методе Process() я добавил дополнительные проверки потенциально длительных операторов, чтобы он мог изящно обрабатывать тайм-ауты (насколько это возможно). Так что только в худшем случае Process() пришлось преждевременно остановить с помощью Task.Wait() выше.

    private void Process(MyItem item)
    {
      ...
        cmd.CommandTimeout = 5; // The total of timeouts within Process() were 
                                // set to be less than the total Task.Wait duration.
                                // Unfortunately some potentially long running methods 
                                // don't have a built in timeout.
      ...
    }
person perNalin    schedule 12.09.2014