ожидаемая очередь на основе задач

Мне интересно, существует ли реализация/оболочка для ConcurrentQueue, аналогичная в BlockingCollection, где получение из коллекции не блокируется, а выполняется асинхронно и вызовет асинхронное ожидание, пока элемент не будет помещен в очередь.

Я придумал свою собственную реализацию, но, похоже, она работает не так, как ожидалось. Мне интересно, не изобретаю ли я что-то, что уже существует.

Вот моя реализация:

public class MessageQueue<T>
{
    ConcurrentQueue<T> queue = new ConcurrentQueue<T>();

    ConcurrentQueue<TaskCompletionSource<T>> waitingQueue = 
        new ConcurrentQueue<TaskCompletionSource<T>>();

    object queueSyncLock = new object();

    public void Enqueue(T item)
    {
        queue.Enqueue(item);
        ProcessQueues();
    }

    public async Task<T> Dequeue()
    {
        TaskCompletionSource<T> tcs = new TaskCompletionSource<T>();
        waitingQueue.Enqueue(tcs);
        ProcessQueues();
        return tcs.Task.IsCompleted ? tcs.Task.Result : await tcs.Task;
    }

    private void ProcessQueues()
    {
        TaskCompletionSource<T> tcs=null;
        T firstItem=default(T);
        while (true)
        {
            bool ok;
            lock (queueSyncLock)
            {
                ok = waitingQueue.TryPeek(out tcs) && queue.TryPeek(out firstItem);
                if (ok)
                {
                    waitingQueue.TryDequeue(out tcs);
                    queue.TryDequeue(out firstItem);
                }
            }
            if (!ok) break;
            tcs.SetResult(firstItem);
        }
    }
}

person spender    schedule 23.10.2011    source источник
comment
Согласно нашему руководству по теме, некоторые вопросы по-прежнему не относятся к теме, даже если они соответствуют одному из категории, перечисленные выше:...Вопросы с просьбой порекомендовать или найти книгу, инструмент, библиотеку программного обеспечения, учебник или другой ресурс за пределами сайта не относятся к теме...   -  person Robert Columbia    schedule 29.06.2018
comment
Очередь с возможностью ожидания — это то, о чем я недавно думал (вот мой вопрос: stackoverflow.com/questions/52775484/)! Я считаю, что это решило бы СТОЛЬКО проблем в архитектуре микросервисов! Но в этом случае очередь, вероятно, должна быть постоянной очередью, а не чем-то в памяти.   -  person user2173353    schedule 15.10.2018
comment
Связанный: Есть ли что-нибудь вроде асинхронной BlockingCollection‹T›?   -  person Theodor Zoulias    schedule 26.06.2019


Ответы (10)


Я не знаю решения без блокировки, но вы можете взглянуть на новый библиотека потоков данных, часть асинхронной CTP. Достаточно простого BufferBlock<T>, например:

BufferBlock<int> buffer = new BufferBlock<int>();

Производство и потребление проще всего выполнять с помощью методов расширения типов блоков потока данных.

Производство так же просто, как:

buffer.Post(13);

и потребление готово к асинхронности:

int item = await buffer.ReceiveAsync();

Я рекомендую вам использовать Dataflow, если это возможно; сделать такой буфер одновременно эффективным и правильным сложнее, чем кажется на первый взгляд.

person Stephen Cleary    schedule 23.10.2011
comment
Выглядит очень многообещающе... завтра проверю. Спасибо. Это очень похоже на порт CCR. - person spender; 23.10.2011
comment
Вместо этого заглянул перед сном! Похоже, что Dataflow очень хорошо соответствует моим потребностям. Кажется, это устраняет разрыв между тем, что предлагает TPL, и тем, что предлагается в CCR (которую я использовал с большим успехом). Это оставляет меня уверенным, что отличная работа в CCR не была потрачена впустую. Это правильный ответ (и что-то блестящее и новое, в которое можно вонзить зубы!) Спасибо @StephenCleary. - person spender; 24.10.2011
comment
В собственной библиотеке Nito.AsyncEx Стивена Клири также есть AsyncProducerConsumerQueue. ‹T›, который является альтернативой BufferBlock<T>. - person Fanblade; 14.05.2021
comment
@Fanblade: Верно, но сейчас я указываю людям на System.Threading.Channels. Каналы — очень эффективное и очень современное решение. - person Stephen Cleary; 14.05.2021

Простой подход с C# 8.0 IAsyncEnumerable и библиотекой потоков данных

// Instatiate an async queue
var queue = new AsyncQueue<int>();

// Then, loop through the elements of queue.
// This loop won't stop until it is canceled or broken out of
// (for that, use queue.WithCancellation(..) or break;)
await foreach(int i in queue) {
    // Writes a line as soon as some other Task calls queue.Enqueue(..)
    Console.WriteLine(i);
}

С реализацией AsyncQueue следующим образом:

public class AsyncQueue<T> : IAsyncEnumerable<T>
{
    private readonly SemaphoreSlim _enumerationSemaphore = new SemaphoreSlim(1);
    private readonly BufferBlock<T> _bufferBlock = new BufferBlock<T>();

    public void Enqueue(T item) =>
        _bufferBlock.Post(item);

    public async IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken token = default)
    {
        // We lock this so we only ever enumerate once at a time.
        // That way we ensure all items are returned in a continuous
        // fashion with no 'holes' in the data when two foreach compete.
        await _enumerationSemaphore.WaitAsync();
        try {
            // Return new elements until cancellationToken is triggered.
            while (true) {
                // Make sure to throw on cancellation so the Task will transfer into a canceled state
                token.ThrowIfCancellationRequested();
                yield return await _bufferBlock.ReceiveAsync(token);
            }
        } finally {
            _enumerationSemaphore.Release();
        }

    }
}
person Bruno Zell    schedule 30.04.2019
comment
Мне нравится, когда старый вопрос получает современное обновление. Проголосуйте. Я не проверял IAsyncEnumerable, но очень хорошо знаком с Symbol.asyncIterator javascript, который выглядит более или менее похоже на ту же концепцию. - person spender; 30.04.2019
comment
Спасибо @spender! Я так думаю, это в основном IEnumerable, но вы можете асинхронно ожидать новых элементов, поэтому это неблокирующая операция. - person Bruno Zell; 30.04.2019
comment
Интересно, есть ли какая-то конкретная причина для использования SemaphoreSlim(1) вместо lock? - person valorl; 01.05.2020
comment
@valori внутри lock не может быть await - person Bruno Zell; 02.05.2020
comment
И надстройка только для того, чтобы получить текущий прогресс очереди, добавить свойство в класс AsyncQueue.cs public int Count { get { return _bufferBlock.Count; } } используя этот счетчик, мы можем проверить в цикле foreach, пуста ли очередь или нет: await foreach(int i in queue) { if(queue.Count › 1) { // очередь не пуста } else { // очередь пуста } // Записывает строку, как только другая задача вызывает queue.Enqueue(..) Console.WriteLine(i); } - person Yasir Ali; 22.05.2020
comment
Я использовал это в классе-оболочке для преобразования IObserver в IAsyncEnumerable. Спасибо! - person MplsAmigo; 29.07.2020

Один простой и легкий способ реализовать это с помощью SemaphoreSlim:

public class AwaitableQueue<T>
{
    private SemaphoreSlim semaphore = new SemaphoreSlim(0);
    private readonly object queueLock = new object();
    private Queue<T> queue = new Queue<T>();

    public void Enqueue(T item)
    {
        lock (queueLock)
        {
            queue.Enqueue(item);
            semaphore.Release();
        }
    }

    public T WaitAndDequeue(TimeSpan timeSpan, CancellationToken cancellationToken)
    {
        semaphore.Wait(timeSpan, cancellationToken);
        lock (queueLock)
        {
            return queue.Dequeue();
        }
    }

    public async Task<T> WhenDequeue(TimeSpan timeSpan, CancellationToken cancellationToken)
    {
        await semaphore.WaitAsync(timeSpan, cancellationToken);
        lock (queueLock)
        {
            return queue.Dequeue();
        }
    }
}

Прелесть этого в том, что SemaphoreSlim берет на себя всю сложность реализации функций Wait() и WaitAsync(). Недостатком является то, что длина очереди отслеживается как семафором , так и самой очередью, и они оба волшебным образом остаются синхронизированными.

person Ryan    schedule 09.01.2020
comment
Хорошо, если производительность не имеет первостепенного значения, не ожидаются всплески очереди или удаления из очереди, а время обработки каждого элемента является значительным. Он использует блокировку, что означает, что к коллекции может получить доступ только один поток за раз, а все остальные будут ожидать блокировки при постановке или удалении элемента из очереди. - person Jordi; 04.11.2020
comment
Следует принять во внимание результат semaphore.WaitAsync(), и в случае достижения тайм-аута вернуть null значение по умолчанию или создать исключение. - person Guillermo Prandi; 24.01.2021
comment
@GuillermoPrandi Задача semaphore.WaitAsync не возвращает значение. Если тайм-аут достигнут, он выдаст TaskCanceledException, который будет пузыриться. - person Ryan; 08.02.2021
comment
@Ryan документы. microsoft.com/en-us/dotnet/api/ Задача, которая завершится с результатом true, если текущий поток успешно войдет в SemaphoreSlim, в противном случае с результатом false. - person Guillermo Prandi; 08.02.2021
comment
Как вы используете это, чтобы поставить в очередь список ожидаемых заданий с возвратом? - person johnstaveley; 18.03.2021

Теперь есть официальный способ сделать это: System.Threading.Channels. Он встроен в основную среду выполнения в .NET Core 3.0 и выше (включая .NET 5.0 и 6.0), но также доступен в виде пакета NuGet в .NET Standard 2.0 и 2.1. Вы можете ознакомиться с документацией здесь.

var channel = System.Threading.Channels.Channel.CreateUnbounded<int>();

Чтобы поставить работу в очередь:

// This will succeed and finish synchronously if the channel is unbounded.
channel.Writer.TryWrite(42);

Чтобы завершить канал:

channel.Writer.TryComplete();

Читать с канала:

var i = await channel.Reader.ReadAsync();

Или, если у вас .NET Core 3.0 или выше:

await foreach (int i in channel.Reader.ReadAllAsync())
{
    // whatever processing on i...
}
person kanders84152    schedule 09.04.2021

Моя попытка (событие возникает, когда создается «обещание», и его может использовать внешний производитель, чтобы узнать, когда производить больше предметов):

public class AsyncQueue<T>
{
    private ConcurrentQueue<T> _bufferQueue;
    private ConcurrentQueue<TaskCompletionSource<T>> _promisesQueue;
    private object _syncRoot = new object();

    public AsyncQueue()
    {
        _bufferQueue = new ConcurrentQueue<T>();
        _promisesQueue = new ConcurrentQueue<TaskCompletionSource<T>>();
    }

    /// <summary>
    /// Enqueues the specified item.
    /// </summary>
    /// <param name="item">The item.</param>
    public void Enqueue(T item)
    {
        TaskCompletionSource<T> promise;
        do
        {
            if (_promisesQueue.TryDequeue(out promise) &&
                !promise.Task.IsCanceled &&
                promise.TrySetResult(item))
            {
                return;                                       
            }
        }
        while (promise != null);

        lock (_syncRoot)
        {
            if (_promisesQueue.TryDequeue(out promise) &&
                !promise.Task.IsCanceled &&
                promise.TrySetResult(item))
            {
                return;
            }

            _bufferQueue.Enqueue(item);
        }            
    }

    /// <summary>
    /// Dequeues the asynchronous.
    /// </summary>
    /// <param name="cancellationToken">The cancellation token.</param>
    /// <returns></returns>
    public Task<T> DequeueAsync(CancellationToken cancellationToken)
    {
        T item;

        if (!_bufferQueue.TryDequeue(out item))
        {
            lock (_syncRoot)
            {
                if (!_bufferQueue.TryDequeue(out item))
                {
                    var promise = new TaskCompletionSource<T>();
                    cancellationToken.Register(() => promise.TrySetCanceled());

                    _promisesQueue.Enqueue(promise);
                    this.PromiseAdded.RaiseEvent(this, EventArgs.Empty);

                    return promise.Task;
                }
            }
        }

        return Task.FromResult(item);
    }

    /// <summary>
    /// Gets a value indicating whether this instance has promises.
    /// </summary>
    /// <value>
    /// <c>true</c> if this instance has promises; otherwise, <c>false</c>.
    /// </value>
    public bool HasPromises
    {
        get { return _promisesQueue.Where(p => !p.Task.IsCanceled).Count() > 0; }
    }

    /// <summary>
    /// Occurs when a new promise
    /// is generated by the queue
    /// </summary>
    public event EventHandler PromiseAdded;
}
person André Bires    schedule 08.04.2014
comment
Я думаю, что это лучшее решение. Я реализовал это и тщательно протестировал. Несколько замечаний: вызов !promise.Task.IsCanceled не нужен. Я добавил ManualResetEventSlim, чтобы отслеживать, когда bufferQueue пуст, чтобы вызывающая сторона могла заблокироваться, ожидая, пока очередь опустеет. - person Brian Heilig; 09.03.2016
comment
Вы должны распоряжаться CancellationTokenRegistration, которые вы получили от cancellationToken.Register вызова. - person Paya; 06.03.2017

Это может быть излишним для вашего варианта использования (учитывая кривую обучения), но Реактивные расширения обеспечивает все необходимое для асинхронной композиции.

По сути, вы подписываетесь на изменения, и они передаются вам по мере их появления, и вы можете сделать так, чтобы система отправляла изменения в отдельный поток.

person Morten Mertner    schedule 23.10.2011
comment
Я, по крайней мере, частично разбираюсь в Reactive, но это немного эзотерично для использования в производстве, поскольку другим, возможно, придется поддерживать код. Я действительно ценю простоту, которую async/await привносит в ранее очень сложный серверный продукт, и я пытаюсь сохранить всю асинхронную технологию в рамках одной технологии. - person spender; 23.10.2011

Проверьте https://github.com/somdoron/AsyncCollection, вы можете как асинхронно удалять из очереди, так и использовать C# 8.0 IAsyncEnumerable.

API очень похож на BlockingCollection.

AsyncCollection<int> collection = new AsyncCollection<int>();

var t = Task.Run(async () =>
{
    while (!collection.IsCompleted)
    {
        var item = await collection.TakeAsync();

        // process
    }
});

for (int i = 0; i < 1000; i++)
{
    collection.Add(i);
}

collection.CompleteAdding();

t.Wait();

С IAsyncEnumeable:

AsyncCollection<int> collection = new AsyncCollection<int>();

var t = Task.Run(async () =>
{
    await foreach (var item in collection)
    {
        // process
    }
});

for (int i = 0; i < 1000; i++)
{
    collection.Add(i);
}

collection.CompleteAdding();

t.Wait();
person somdoron    schedule 07.07.2019
comment
Ваш пример var item = await collection.TakeAsync() кажется подходящим только для одного потребителя. С несколькими потребителями вы можете получить InvalidOperationException. Я думаю, вам следует использовать TryTakeAsync вместо TakeAsync, чтобы он работал корректно и с несколькими потребителями. - person Theodor Zoulias; 09.08.2019

Вот реализация, которую я сейчас использую.

public class MessageQueue<T>
{
    ConcurrentQueue<T> queue = new ConcurrentQueue<T>();
    ConcurrentQueue<TaskCompletionSource<T>> waitingQueue = 
        new ConcurrentQueue<TaskCompletionSource<T>>();
    object queueSyncLock = new object();
    public void Enqueue(T item)
    {
        queue.Enqueue(item);
        ProcessQueues();
    }

    public async Task<T> DequeueAsync(CancellationToken ct)
    {
        TaskCompletionSource<T> tcs = new TaskCompletionSource<T>();
        ct.Register(() =>
        {
            lock (queueSyncLock)
            {
                tcs.TrySetCanceled();
            }
        });
        waitingQueue.Enqueue(tcs);
        ProcessQueues();
        return tcs.Task.IsCompleted ? tcs.Task.Result : await tcs.Task;
    }

    private void ProcessQueues()
    {
        TaskCompletionSource<T> tcs = null;
        T firstItem = default(T);
        lock (queueSyncLock)
        {
            while (true)
            {
                if (waitingQueue.TryPeek(out tcs) && queue.TryPeek(out firstItem))
                {
                    waitingQueue.TryDequeue(out tcs);
                    if (tcs.Task.IsCanceled)
                    {
                        continue;
                    }
                    queue.TryDequeue(out firstItem);
                }
                else
                {
                    break;
                }
                tcs.SetResult(firstItem);
            }
        }
    }
}

Он работает достаточно хорошо, но на queueSyncLock довольно много разногласий, так как я довольно часто использую CancellationToken для отмены некоторых ожидающих задач. Конечно, это приводит к значительно меньшей блокировке, которую я бы увидел с BlockingCollection, но...

Мне интересно, есть ли более гладкие средства без блокировки для достижения той же цели

person spender    schedule 23.10.2011

Ну, 8 лет спустя я столкнулся с этим самым вопросом и собирался реализовать класс MS AsyncQueue<T>, найденный в пакете/пространстве имен nuget: Microsoft.VisualStudio.Threading

Спасибо @Theodor Zoulias за упоминание, что этот API может быть устаревшим, и библиотека DataFlow была бы хорошей альтернативой.

Поэтому я отредактировал свою реализацию AsyncQueue‹>, чтобы использовать BufferBlock‹>. Почти то же самое, но работает лучше.

Я использую это в фоновом потоке AspNet Core, и он работает полностью асинхронно.

protected async Task MyRun()
{
    BufferBlock<MyObj> queue = new BufferBlock<MyObj>();
    Task enqueueTask = StartDataIteration(queue);

    while (await queue.OutputAvailableAsync())
    {
        var myObj = queue.Receive();
        // do something with myObj
    }

}

public async Task StartDataIteration(BufferBlock<MyObj> queue)
{
    var cursor = await RunQuery();
    while(await cursor.Next()) { 
        queue.Post(cursor.Current);
    }
    queue.Complete(); // <<< signals the consumer when queue.Count reaches 0
}

Я обнаружил, что использование queue.OutputAvailableAsync() устранило проблему, которая у меня была с AsyncQueue‹> — попытка определить, когда очередь была завершена, и не нужно проверять задачу удаления из очереди.

person bmiller    schedule 10.03.2020
comment
Ожидание queue.DequeueAsync() и queue.Completion с Task.WhenAny — хитрый трюк, но он похож на хак, чтобы преодолеть недостатки плохого дизайна API. Альтернативные классы (Dataflow BufferBlock<T> и Channel<T>) предлагают методы (OutputAvailableAsync и WaitToReadAsync соответственно), которые позволяют ожидать дополнительных элементов без необходимости обработки исключения в качестве механизма обратной связи. Проблема с вашим трюком заключается в том, что вы можете получить ошибочную задачу с ненаблюдаемым исключением, что в этом случае вызовет событие TaskScheduler.UnobservedTaskException. - person Theodor Zoulias; 10.03.2020
comment
В классе есть и другие средства уведомления, но в документации MS не было примера. В моем случае у меня есть несколько задач, поэтому мне все равно пришлось использовать WhenAny. -и если задача бросает вызов, она может быть перехвачена как AggregateException. - person bmiller; 11.03.2020
comment
Я хочу сказать, что класс Microsoft.VisualStudio.Threading.AsyncQueue<T> не следует использовать для новых проектов, потому что сегодня доступны лучшие альтернативы. Особенно класс Channel<T>, который не только предлагает лучший API, но он также имеет отличные характеристики производительности. - person Theodor Zoulias; 11.03.2020
comment
хорошо, вы правы, AsyncQueue основан на библиотеке TPL и, похоже, предназначен для работы в расширениях Visual Studio. Я отредактирую свой ответ с моей реализацией. Спасибо за ваш комментарий, вы, возможно, спасли меня от кучи головной боли. - person bmiller; 11.03.2020
comment
В вашей новой реализации существует потенциальное состояние гонки (основанное на BufferBlock), которое может возникнуть, если у вас будет несколько потребителей. Метод Receive может быть вызван одним потребителем сразу после того, как другой потребитель взял последний элемент из очереди. По этой причине предпочтительнее использовать метод TryReceive в качестве условия либо в блоке if, либо в блоке while, чтобы вам не приходилось просматривать потребляющий код, если вы позже обновите свою архитектуру. Посмотрите здесь пример. - person Theodor Zoulias; 12.03.2020

Вы можете просто использовать BlockingCollection (используя ConcurrentQueue по умолчанию) и обернуть вызов Take в Task, чтобы вы могли await его:

var bc = new BlockingCollection<T>();

T element = await Task.Run( () => bc.Take() );
person Nick Butler    schedule 23.10.2011
comment
Хорошая идея, но я не доволен блокировкой. У меня будет несколько тысяч клиентов, у каждого из которых будет своя очередь сообщений. Любая блокировка потопит корабль, потому что свяжет нити, ничего не делая. Причина, по которой мне нужна ожидаемая неблокирующая задача, заключается в том, что я могу хранить все операции в пуле потоков, не вызывая голодания пула потоков. - person spender; 23.10.2011