Мне нужно реализовать шаблон производитель / потребитель вокруг очереди FIFO фиксированного размера. Я думаю, что класс-оболочка вокруг ConcurrentQueue может сработать для этого, но я не совсем уверен (и я никогда раньше не работал с ConcurrentQueue). Дело в том, что очередь должна содержать только фиксированное количество элементов (в моем случае строки). В моем приложении будет одна задача / поток производителя и одна задача / поток потребителя. Когда моя задача потребителя запускается, ей необходимо исключить из очереди все элементы, которые существуют в очереди в данный момент времени, и обработать их.
Как бы то ни было, обработка стоящих в очереди элементов моим потребителем - это не что иное, как загрузка их через SOAP в веб-приложение, которое не на 100% надежно. Если соединение не может быть установлено или вызов SOAP завершается неудачно, я должен отбросить эти элементы и вернуться в очередь за дополнительными. Из-за накладных расходов на SOAP я пытался максимизировать количество элементов из очереди, которые я мог бы отправить за один вызов SOAP.
Иногда мой производитель может добавлять элементы быстрее, чем мой потребитель может их удалить и обработать. Если очередь уже заполнена, и моему производителю нужно добавить еще один элемент, мне нужно поставить новый элемент в очередь, но затем удалить самый старый элемент, чтобы размер очереди оставался фиксированным. По сути, мне нужно постоянно хранить в очереди самые последние созданные элементы (даже если это означает, что некоторые элементы не потребляются, потому что мой потребитель в настоящее время обрабатывает предыдущие элементы).
Что касается производителя, сохраняющего количество фиксированных элементов в очереди, я нашел одну потенциальную идею из этого вопроса:
В настоящее время я использую класс-оболочку (на основе этого ответа) вокруг ConcurrentQueue с таким методом Enqueue ():
public class FixedSizeQueue<T>
{
readonly ConcurrentQueue<T> queue = new ConcurrentQueue<T>();
public int Size { get; private set; }
public FixedSizeQueue(int size)
{
Size = size;
}
public void Enqueue(T obj)
{
// add item to the queue
queue.Enqueue(obj);
lock (this) // lock queue so that queue.Count is reliable
{
while (queue.Count > Size) // if queue count > max queue size, then dequeue an item
{
T objOut;
queue.TryDequeue(out objOut);
}
}
}
}
Я создаю экземпляр этого класса с таким ограничением размера очереди:
FixedSizeQueue<string> incomingMessageQueue = new FixedSizeQueue<string>(10); // 10 item limit
Я запускаю свою задачу производителя, и она начинает заполнять очередь. Код в моем методе Enqueue (), кажется, работает правильно в отношении удаления самого старого элемента из очереди, когда добавление элемента приводит к тому, что количество очереди превышает максимальный размер. Теперь мне нужно, чтобы моя потребительская задача выводила элементы из очереди и обрабатывала их, но здесь мой мозг запутался. Как лучше всего реализовать метод Dequeue для моего потребителя, который будет делать снимок очереди в определенный момент времени и выводить из очереди все элементы для обработки (производитель может все еще добавлять элементы в очередь во время этого процесса)?