Шаблон производитель / потребитель с очередью FIFO фиксированного размера

Мне нужно реализовать шаблон производитель / потребитель вокруг очереди FIFO фиксированного размера. Я думаю, что класс-оболочка вокруг ConcurrentQueue может сработать для этого, но я не совсем уверен (и я никогда раньше не работал с ConcurrentQueue). Дело в том, что очередь должна содержать только фиксированное количество элементов (в моем случае строки). В моем приложении будет одна задача / поток производителя и одна задача / поток потребителя. Когда моя задача потребителя запускается, ей необходимо исключить из очереди все элементы, которые существуют в очереди в данный момент времени, и обработать их.

Как бы то ни было, обработка стоящих в очереди элементов моим потребителем - это не что иное, как загрузка их через SOAP в веб-приложение, которое не на 100% надежно. Если соединение не может быть установлено или вызов SOAP завершается неудачно, я должен отбросить эти элементы и вернуться в очередь за дополнительными. Из-за накладных расходов на SOAP я пытался максимизировать количество элементов из очереди, которые я мог бы отправить за один вызов SOAP.

Иногда мой производитель может добавлять элементы быстрее, чем мой потребитель может их удалить и обработать. Если очередь уже заполнена, и моему производителю нужно добавить еще один элемент, мне нужно поставить новый элемент в очередь, но затем удалить самый старый элемент, чтобы размер очереди оставался фиксированным. По сути, мне нужно постоянно хранить в очереди самые последние созданные элементы (даже если это означает, что некоторые элементы не потребляются, потому что мой потребитель в настоящее время обрабатывает предыдущие элементы).

Что касается производителя, сохраняющего количество фиксированных элементов в очереди, я нашел одну потенциальную идею из этого вопроса:

Очередь фиксированного размера, которая автоматически удаляет старые значения из очереди на новые enques

В настоящее время я использую класс-оболочку (на основе этого ответа) вокруг ConcurrentQueue с таким методом Enqueue ():

public class FixedSizeQueue<T>
{
    readonly ConcurrentQueue<T> queue = new ConcurrentQueue<T>();

    public int Size { get; private set; }

    public FixedSizeQueue(int size)
    {
        Size = size;
    }

    public void Enqueue(T obj)
    {
        // add item to the queue
        queue.Enqueue(obj);

        lock (this) // lock queue so that queue.Count is reliable
        {
            while (queue.Count > Size) // if queue count > max queue size, then dequeue an item
            {
                T objOut;
                queue.TryDequeue(out objOut);
            }
        }
    }
}

Я создаю экземпляр этого класса с таким ограничением размера очереди:

FixedSizeQueue<string> incomingMessageQueue = new FixedSizeQueue<string>(10); // 10 item limit

Я запускаю свою задачу производителя, и она начинает заполнять очередь. Код в моем методе Enqueue (), кажется, работает правильно в отношении удаления самого старого элемента из очереди, когда добавление элемента приводит к тому, что количество очереди превышает максимальный размер. Теперь мне нужно, чтобы моя потребительская задача выводила элементы из очереди и обрабатывала их, но здесь мой мозг запутался. Как лучше всего реализовать метод Dequeue для моего потребителя, который будет делать снимок очереди в определенный момент времени и выводить из очереди все элементы для обработки (производитель может все еще добавлять элементы в очередь во время этого процесса)?


person bmt22033    schedule 13.09.2012    source источник
comment
Почему вас беспокоит снимок? В чем проблема с одновременным добавлением элементов? Если вы беспокоитесь, что можете захватить слишком много, остановитесь на «Размер» у потребителя.   -  person paparazzo    schedule 13.09.2012


Ответы (2)


Проще говоря, ConcurrentQueue имеет метод ToArray, который при вводе блокирует коллекцию и создает «моментальный снимок» всех текущих элементов в очереди. Если вы хотите, чтобы вашему потребителю был предоставлен блок вещей для работы, вы можете заблокировать тот же объект, что и метод постановки в очередь, вызовите ToArray (), а затем прокрутите цикл while(!queue.IsEmpty) queue.TryDequeue(out trash) для очистки очереди, прежде чем возвращать массив, который вы извлекли. .

Это будет ваш GetAll() метод:

public T[] GetAll()
{
    lock (syncObj) // so that we don't clear items we didn't get with ToArray()
    {
        var result = queue.ToArray();
        T trash;
        while(!queue.IsEmpty) queue.TryDequeue(out trash);
    }
}

Поскольку вам нужно очистить очередь, вы можете просто объединить две операции; создайте массив надлежащего размера (используя queue.Count), затем, пока очередь не пуста, удалите элемент из очереди и поместите его в массив перед возвратом.

Вот ответ на конкретный вопрос. Теперь я должен с чистой совестью надеть шляпу CodeReview.SE и указать на несколько вещей:

  • НИКОГДА не используйте lock(this). Вы никогда не знаете, какие другие объекты могут использовать ваш объект как блокирующий фокус и, таким образом, будут заблокированы, когда объект блокируется изнутри. Лучше всего заблокировать экземпляр объекта с частной областью видимости, обычно созданный только для блокировки: private readonly object syncObj = new object();

  • Поскольку вы все равно блокируете критические разделы своей оболочки, я бы использовал обычный List<T> вместо параллельной коллекции. Доступ быстрее, его легче очистить, поэтому вы сможете делать то, что делаете, гораздо проще, чем позволяет ConcurrentQueue. Для постановки в очередь заблокируйте объект синхронизации, вставьте () перед нулевым индексом, затем удалите все элементы из размера индекса до текущего счетчика списка с помощью RemoveRange (). Чтобы исключить из очереди, заблокируйте тот же объект синхронизации, вызовите myList.ToArray () (из пространства имен Linq; делает почти то же самое, что и ConcurrentQueue), а затем вызовите myList.Clear () перед возвратом массива. Не может быть проще:

    public class FixedSizeQueue<T>
    {
    private readonly List<T> queue = new List<T>();
    private readonly object syncObj = new object();
    
    public int Size { get; private set; }
    
    public FixedSizeQueue(int size) { Size = size; }
    
    public void Enqueue(T obj)
    {
        lock (syncObj)
        {
            queue.Insert(0,obj)
            if(queue.Count > Size) 
               queue.RemoveRange(Size, Count-Size);
        }
    }
    
    public T[] Dequeue()
    {
        lock (syncObj)
        {
            var result = queue.ToArray();
            queue.Clear();
            return result;
        }
    }
    }
    
  • Кажется, вы понимаете, что с помощью этой модели вы выбрасываете элементы из очереди. Обычно это нехорошо, но я хочу дать вам преимущество в сомнениях. Однако я скажу, что есть способ добиться этого без потерь, используя BlockingCollection. BlockingCollection охватывает любую коллекцию IProducerConsumerCollection, включая большинство классов System.Collections.Concurrent, и позволяет указать максимальную емкость для очереди. Затем коллекция будет блокировать любой поток, пытающийся выйти из очереди из пустой очереди, или любой поток, пытающийся добавить в полную очередь, до тех пор, пока элементы не будут добавлены или удалены, чтобы было что получить или место для вставки. Это лучший способ реализовать очередь производитель-потребитель с максимальным размером или такую, которая в противном случае потребовала бы «опроса», чтобы увидеть, есть ли у потребителя над чем поработать. Если вы пойдете по этому маршруту, выбрасываются только те, которые потребитель должен выбросить; потребитель увидит все строки, добавленные производителем, и примет собственное решение по каждой.

person KeithS    schedule 13.09.2012
comment
Привет, Кит. Большое спасибо за отличный ответ. Я никогда не думал об использовании списка, вероятно, потому, что способ хранения элементов казался мне обратным, но это отличное решение. Я реализовал это сейчас, и он работает очень хорошо. Re: выбрасывая элементы из очереди, я согласен, что это, конечно, не то, что вы ожидали, но это требование, которое мне дали (на самом деле для этого есть причина, но не то, о чем я могу подробно рассказывать). Еще раз спасибо за отличный ответ. - person bmt22033; 14.09.2012

Вы не хотите использовать lock с this. Дополнительные сведения см. В разделе Почему блокировка (это) {…} плохо?.

Этот код

// if queue count > max queue size, then dequeue an item
while (queue.Count > Size) 
{
    T objOut;
    queue.TryDequeue(out objOut);
}

предполагает, что вам нужно как-то подождать или уведомить потребителя о доступности товара. В этом случае рассмотрите возможность использования вместо этого BlockingCollection<T>.

person oleksii    schedule 13.09.2012
comment
Он, как он говорит, блокирует критические секции, потому что выполняет несколько операций одновременно; ConcurrentQueue заблокирует коллекцию, сделает все возможное, чтобы добавить или удалить один элемент, а затем снимет блокировку. Это означает, что если производитель пытается добавить, в то время как потребитель пытается очистить элементы, будут потеряны вещи, которых не должно быть. Все это говорит о том, что я тоже не согласен с его образцом. - person KeithS; 13.09.2012