Потребитель/Производитель с порядком и ограничением на потребляемые товары

у меня следующий сценарий

  • I am writing a server that process files (jobs)
    • a file has a "prefix" and a time
    • файлы должны обрабатываться по времени (сначала старые файлы), но также учитывать префикс (файлы с одинаковым префиксом не могут обрабатываться одновременно)
  • У меня есть поток (задача с таймером), который наблюдает за каталогом и добавляет файлы в «очередь» (производитель)
  • I have several consumers that take the file from "queue" (consumer) - they should conform to the above rules.
    • the job of each task is kept in some list (this indicates the constraints)
  • Потребителей несколько, количество потребителей определяется при запуске.

Одним из требований является возможность корректно останавливать потребителей (немедленно или позволить текущим процессам завершиться).

Я сделал что-то в этом направлении:

while (processing)
{
    //limits number of concurrent tasks
    _processingSemaphore.Wait(queueCancellationToken);  
    //Take next job when available or wait for cancel signal
    currentwork = workQueue.Take(taskCancellationToken);

    //check that it can actually process this work
    if (CanProcess(currnetWork)
    { 
        var task = CreateTask(currentwork)
        task.ContinueWith((t) => { //release processing slot });
    }
    else
       //release slot, return job? something else?
 }

Источники токенов отмены находятся в коде вызывающей стороны и могут быть отменены. Их два, чтобы можно было остановить очередь, не отменяя запущенные задачи.

Я устал реализовывать «очередь» как BlockingCollection, обертывающую «безопасный» SortedSet. Общая идея работает (упорядочение по времени), за исключением случая, когда мне нужно найти новую работу, соответствующую ограничению. Если я верну задание в очередь и попытаюсь взять еще раз, то получу такое же.

Можно брать задания из очереди, пока я не найду подходящее, а затем вернуть «незаконные» задания обратно, но это может вызвать проблемы с другими потребителями, обрабатывающими неупорядоченные задания.

Другая альтернатива — передать простую коллекцию и способ ее блокировки, а затем просто заблокировать и выполнить простой поиск в соответствии с текущими ограничениями. Опять же, это означает написание кода, который, возможно, не будет потокобезопасным.

Любые другие предложения/указатели/структуры данных, которые могут помочь?


person Tomer Cagan    schedule 11.12.2012    source источник
comment
Самое раннее такое задание может быть обработано, когда первое задание было завершено. Таким образом, рабочий может также обработать следующий. Поэтому поместите в очередь список заданий, а не только задания, где каждое задание в списке имеет один и тот же префикс.   -  person Hans Passant    schedule 11.12.2012
comment
@HansPassant - я думал об этом - иметь некоторую локальную очередь для каждого процесса, но не уверен, что это не на 100% правильно - при такой же потребительской очереди следующее задание с тем же префиксом может нарушить требование обработки по времени - мне нужно подумать это через, хотя и посмотрите, работает ли он со всеми ограничениями. В любом случае - спасибо за участие.   -  person Tomer Cagan    schedule 12.12.2012
comment
@HansPassant - то, что вы указали, действительно правильно. В итоге я реализовал локальную очередь в каждом специализированном потребителе и поставил в очередь дополнительные задания локально в своих очередях. В случае, если потребитель пропустил файл в очереди (например, задание было добавлено в промежутке времени, когда оно было выполнено после того, как потребитель существует в цикле обработки), он возвращается в общую очередь и обрабатывается любым доступным потребителем. Могу ли я пометить комментарий как ответ?   -  person Tomer Cagan    schedule 21.12.2012
comment
@TomerCagan, пожалуйста, посмотрите мой отредактированный ответ. Надеюсь, это поможет упростить ваше решение.   -  person Francois Nel    schedule 08.01.2013


Ответы (2)


Я думаю, Ганс прав: если у вас уже есть потокобезопасный SortedSet (который реализует IProducerConsumerCollection, поэтому его можно использовать в BlockingCollection), то все, что вам нужно, это положить в коллекцию только файлы, которые можно обработать прямо сейчас. Если вы закончите работу над файлом, который сделает другой файл доступным для обработки, добавьте этот файл в коллекцию в этот момент, а не раньше.

person svick    schedule 14.12.2012

Я бы реализовал ваши требования с помощью потока данных TPL. Посмотрите, как можно реализовать с его помощью шаблон "производитель-потребитель". Я считаю, что это удовлетворит все ваши требования (включая отмену для потребителей).

EDIT (для тех, кто не любит читать документацию, но кто любит...)

Ниже приведен пример реализации требований с помощью потока данных TPL. Прелесть этой реализации в том, что потребители не привязаны к одному потоку и используют поток пула только тогда, когда ему нужно обработать данные.

    static void Main(string[] args)
    {
        BufferBlock<string> source = new BufferBlock<string>();
        var cancellation = new CancellationTokenSource();
        LinkConsumer(source, "A", cancellation.Token);
        LinkConsumer(source, "B", cancellation.Token);
        LinkConsumer(source, "C", cancellation.Token);

        // Link an action that will process source values that are not processed by other 
        source.LinkTo(new ActionBlock<string>((s) => Console.WriteLine("Default action")));

        while (cancellation.IsCancellationRequested == false)
        {
            ConsoleKey key = Console.ReadKey(true).Key;
            switch (key)
            {
                case ConsoleKey.Escape:
                    cancellation.Cancel();
                    break;
                default:
                    Console.WriteLine("Posted value {0} on thread {1}.", key, Thread.CurrentThread.ManagedThreadId);
                    source.Post(key.ToString());
                    break;
            }
        }

        source.Complete();
        Console.WriteLine("Done.");
        Console.ReadLine();
    }

    private static void LinkConsumer(ISourceBlock<string> source, string prefix, CancellationToken token)
    {
        // Link a consumer that will buffer and process all input of the specified prefix
        var consumer = new ActionBlock<string>(new Action<string>(Process), new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 1, SingleProducerConstrained = true, CancellationToken = token, TaskScheduler = TaskScheduler.Default });
        var linkDisposable = source.LinkTo(consumer, (p) => p == prefix);

        // Dispose the link (remove the link) when cancellation is requested.
        token.Register(linkDisposable.Dispose);
    }

    private static void Process(string arg)
    {
        Console.WriteLine("Processed value {0} in thread {1}", arg, Thread.CurrentThread.ManagedThreadId);

        // Simulate work
        Thread.Sleep(500);
    }
person Francois Nel    schedule 14.12.2012
comment
Это никак не объясняет, как решить рассматриваемую проблему. - person svick; 15.12.2012
comment
@svick В конце был задан вопрос. Любые другие предложения / указатели / структуры данных, которые могут помочь? и это именно то, что я дал ему. Использование Dataflow соответствует требованиям, но только в том случае, если вы потрудитесь прочитать предоставленные мной ссылки на документацию. На мой взгляд, ответы не должны ни кормить с ложки, ни повторно документировать что-либо, а только давать рекомендации или решать недокументированные проблемы. - person Francois Nel; 15.12.2012
comment
Но TPL Dataflow не содержит ничего, что могло бы напрямую решить рассматриваемую проблему. Это не простой сценарий производитель-потребитель, поэтому одни только ваши ссылки не будут очень полезны. - person svick; 15.12.2012
comment
@svick на самом деле решает проблему (и более эффективно). Извините за поздний ответ, но я не люблю работать в праздники. Пожалуйста, смотрите мое редактирование. - person Francois Nel; 08.01.2013
comment
Но если вы делаете это так и имеете что-то вроде aaaaab, то b не будет обрабатываться, пока обрабатываются первые четыре a, поэтому это может быть очень неэффективно. Вот почему я говорил, что «использовать поток данных TPL» не очень полезно: в нем не сказано, как именно вы должны его использовать. И ответить на него не тривиально. - person svick; 08.01.2013
comment
@svick, что касается вопроса, предполагается, что каждому файлу дается фиксированный префикс, поэтому каждый префикс обрабатывается своим собственным каналом (каждый канал работает одновременно; в своем собственном потоке пула). И если вы на самом деле запустите пример, вы увидите, что если у вас есть случай aaaab, первые 4 as будут обрабатываться одновременно с b (as будут обрабатываться в том же порядке, в котором они обрабатываются в том же канале, точно так же, как и запрошенный вопрос). - person Francois Nel; 09.01.2013
comment
Вы правы, я был неправ, он будет работать правильно. По какой-то причине я предполагал, что ваши потребители установили BoundedCapacity, я не знаю, почему. - person svick; 09.01.2013
comment
Интересный подход и хороший пример. Мысли немного нестандартно (или нестандартно ;-). Кажется, это отвечает требованию (хотя мне нужно больше времени, чтобы переварить). - person Tomer Cagan; 15.01.2013