TaskScheduler с проблемой очереди и времени ожидания

Что я пытаюсь реализовать:

Планировщик задач, который ставит задачи в очередь и выполняет заданное количество задач параллельно, в то время как другие ждут в очереди, чтобы начать работу. У каждой задачи есть время ожидания, которое начинает отсчитываться при запуске задачи, и если оно превышает это время, задача отменяется и генерирует исключение TimeoutException, которое обрабатывается в ContinueWith (или какой-либо задаче, которая запускается сразу после нее). Задачи должны быть отменены пользователем.

Что я получаю:

Когда первая задача терпит неудачу, все остальные тоже мгновенно терпят неудачу.

Вот полный код моего планировщика заданий (взято из MSDN с некоторыми изменениями):

http://pastebin.com/KSMbDTH5. (рассматриваемая функция находится в строке 161)

Вот пример использования:

var taskTokens = new List<CancellationToken>();
var factory = new TaskScheduleFactory(new ParallelOptions() { MaxDegreeOfParallelism = 1 }); //for the purpose of testing, supposed to work and with higher values
for (var i = 0; i < 10; i++)
{
    //TaskScheduleFactory.cs, line 161
    var taskToken = factory.Add(
        (token) => //Task
        {
            Console.WriteLine("Start");
            Thread.Sleep(5000);
            if (token.IsCancellationRequested) return; //Cancelled by timeout
            Console.WriteLine("This should not print");
        },
        (task) => //ContinueWith
        {
            if (task.IsFaulted)
            {
                Console.WriteLine("Fail");
            }
            else if (!task.IsCompleted)
            {
                Console.WriteLine("Not completed");
            }
            else Console.WriteLine("Done");
        },
        2000 //Timeout
    );
    taskTokens.Add(taskToken);
}

Как это должно работать: (мы инициируем событие тайм-аута через 2 секунды, чтобы ни одна задача не завершилась)

Для MaxDegreeOfParallelism = 1:

Start;
(Wait 2sec)
Fail;
Start;
(Wait 2sec)
Fail;
....

Для MaxDegreeOfParallelism = 2:

Start;
Start;
(Wait 2sec)
Fail;
Fail;
Start;
Start;
(Wait 2sec)
Fail;
Fail;
....

Как это работает:

Start;
(Wait 2sec)
Fail;
Fail;
Fail;
Fail;
...

(для MaxDegreeOfParallelism = 1 остальные тоже бардак)

Примечание: это мои первые шаги в TPL, так что извините за глупость с моей стороны.


person Arvigeus    schedule 02.03.2016    source источник


Ответы (2)


В Add вы звоните .TimeoutAfter(timeoutInMilliseconds, cts). Это означает, что тайм-аут начинается, как только вы создаете задачу. Я думаю, вы намеревались запустить тайм-аут только тогда, когда задача начнет выполняться.

Поскольку первая задача занимает больше времени, чем тайм-аут, все остальные задачи уже истекают, когда они получают свою очередь.

person usr    schedule 02.03.2016
comment
Хорошая точка зрения. Я переместил TimeoutAfter и ContinueWith в тело действия, но все еще пытаюсь заставить его работать правильно. Не могли бы вы подробнее рассказать, как это исправить? - person Arvigeus; 02.03.2016
comment
Вы написали довольно сложный код. Я думал, что вы можете исправить это :) Что-то вроде этого должно работать: var workTask = Instance.StartNew(() => { cts.CancelAfter(timeoutInMilliseconds); action(cts.Token); }, cts.Token); var resultTask = Task.WhenAll(workTask, cts.Token);. Это может завершить resultTask в состоянии, которое вам не нравится (я думаю, оно в конечном итоге будет отменено). Если вам нужен больший контроль, вы можете добиться любого поведения завершения с помощью TaskCompletionSource. - person usr; 02.03.2016
comment
Хе-хе, я упоминал ранее, что скопировал большую часть кода из примера MSDN по этой теме. Единственная часть, которую я написал, это та, которая на самом деле не работает. Спасибо за помощь! Меня не будет несколько дней, поэтому потребуется некоторое время, чтобы проверить его и отметить как ответ (за исключением случаев, когда моя глупость мешает мне правильно его реализовать). Удачи и отличной недели! - person Arvigeus; 02.03.2016
comment
Что-то не в порядке. Во-первых, Task.WhenAll не принимает CancellationTokesn в качестве второго параметра. И я не вижу ContinueWith. Извините, что так нахально... - person Arvigeus; 04.03.2016
comment
Хорошо, я не на 100% знаком с этой областью TPL. Это выглядит хорошо: stackoverflow.com/a/27240225/122718 - person usr; 04.03.2016
comment
Я вижу вашу точку зрения. К сожалению, таким образом задача не удаляется из активных задач, а TaskScheduler блокируется для новых задач. Я думаю, что нашел решение, используя встроенную задачу (не уверен, насколько правильно это использовать): Instance.StartNew(() => { cts.CancelAfter(timeoutInMilliseconds); Task.Factory.StartNew(() => action(cts.Token), cts.Token, TaskCreationOptions.AttachedToParent|TaskCreationOptions.LongRunning, TaskScheduler.Default); }, cts.Token); - person Arvigeus; 04.03.2016
comment
Это должно сработать. Я думаю, что прикрепленные задачи — это довольно неуклюжий шаблон, поэтому я всегда стараюсь явно создавать зависимости задач. Другая идея: я думаю, вам вообще не нужен TaskScheduler. Вы можете заставить задачи вводить SemaphoreSlim. Я считаю, что этот подход к планированию задач часто проще и работает с асинхронными задачами, которые не основаны исключительно на процессоре. - person usr; 04.03.2016
comment
SemaphoreSlim выглядит как хорошая альтернатива, но не подходит для моего случая, потому что не гарантирует порядок выполнения и в конечном итоге может привести к зависанию задачи (в моем случае это весьма правдоподобно). ОГРОМНОЕ СПАСИБО за ваше время и усилия! К сожалению, мне придется ответить на свой вопрос (кроме того, если кто-то еще наткнется позже на эту тему, чтобы увидеть полное решение) - person Arvigeus; 05.03.2016
comment
Хорошо, я посмотрю и, возможно, дам отзыв на этот ответ. - person usr; 05.03.2016

Не идеальное решение, но лучшее, что я мог придумать:

public CancellationTokenSource Add(Action<CancellationToken> action, Action<Task> callback, int timeoutInMilliseconds)
{
    var cts = new CancellationTokenSource();
    Instance.StartNew(() =>
    {
        cts.CancelAfter(timeoutInMilliseconds);
        var task = Task.Factory.StartNew(() => action(cts.Token), cts.Token, TaskCreationOptions.AttachedToParent|TaskCreationOptions.LongRunning, TaskScheduler.Default);
        try
        {
            task.Wait(timeoutInMilliseconds, cts.Token);
        }
        catch (OperationCanceledException) { }
        callback(task);
    }, cts.Token);
    return cts;
}

Вкратце: когда задача запускается из очереди, она создает дочернюю задачу в планировщике задач по умолчанию (потому что другой просто ставит ее в очередь), затем мы ждем определенное время и вызываем функцию обратного вызова. Не знаю, насколько это плохо.

person Arvigeus    schedule 07.03.2016