Неблокирующая параллельная коллекция?

В System.Collections.Concurrent есть несколько новых коллекций, которые очень хорошо работают в многопоточных средах. Однако они немного ограничены. Либо они блокируются, пока элемент не станет доступным, либо возвращают default(T) (методы TryXXX).

Мне нужна коллекция, которая является потокобезопасной, но вместо блокировки вызывающего потока она использует обратный вызов, чтобы сообщить мне, что доступен хотя бы один элемент.

Мое текущее решение — использовать BlockingCollection, но использовать APM с делегатом для получения следующего элемента. Другими словами, я создаю делегат для метода, который Takes из коллекции, и выполняю этот делегат, используя BeginInvoke.

К сожалению, для этого мне нужно сохранить много состояний в моем классе. Хуже того, класс не является потокобезопасным; он может использоваться только одним потоком. Я обхожу грань ремонтопригодности, чего я бы предпочел не делать.

Я знаю, что есть некоторые библиотеки, которые делают то, что я здесь делаю, довольно простым (я считаю, что Reactive Framework является одной из них), но я хотел бы достичь своих целей, не добавляя никаких ссылок за пределами версии 4 фреймворка. .

Есть ли какие-нибудь лучшие шаблоны, которые я могу использовать, которые не требуют внешних ссылок для достижения моей цели?


tl;dr:

Существуют ли шаблоны, удовлетворяющие требованию:

"Мне нужно сообщить коллекции, что я готов к следующему элементу, и заставить коллекцию выполнить обратный вызов, когда этот следующий элемент прибудет, без блокировки каких-либо потоков."


person Community    schedule 19.07.2010    source источник
comment
Будет ли это потокобезопасным? Что мешает доступному элементу стать недоступным до вызова делегата? И какова ваша общая цель (например, система очередей)?   -  person Adam Houldsworth    schedule 19.07.2010
comment
@Adam Хороший вопрос о потреблении предмета. Делегат берет элемент, удаленный из коллекции. Таким образом, выполнение делегата блокируется до тех пор, пока элемент не будет Take-en из коллекции, и этот элемент не будет object передан EndInvoke. Общая цель немного запутана; по сути, я должен простаивать рабочий процесс, пока элемент не станет доступным. Вы не можете заблокировать выполнение рабочего процесса, поэтому просто Take-элемент не будет работать, поскольку вызов блокирует. Мне нужно создать закладку, а затем передать ее расширению. Расширение вызывает делегат, возобновляя закладку в обратном вызове.   -  person    schedule 19.07.2010
comment
к сожалению, у меня мало опыта работы с рабочими процессами - попробуйте добавить эту деталь к своему вопросу, и это может кого-то заинтересовать :-)   -  person Adam Houldsworth    schedule 19.07.2010
comment
@Adam Адам, часть рабочего процесса не имеет особого значения; это причина моего требования. Я не включил его, потому что решил, что либо запутаюсь, либо получу ответы/комментарии, либо они попытаются ответить о рабочем процессе, а не об асинхронности...   -  person    schedule 19.07.2010


Ответы (2)


Я думаю, что у меня есть два возможных решения. Я не очень доволен ни тем, ни другим, но они, по крайней мере, представляют собой разумную альтернативу подходу APM.

Первый не соответствует вашему требованию отсутствия блокирующего потока, но я думаю, что он довольно элегантен, потому что вы можете регистрировать обратные вызовы, и они будут вызываться в циклическом режиме, но у вас все еще есть возможность вызывать Take или TryTake, как обычно. для BlockingCollection. Этот код принудительно регистрирует обратные вызовы каждый раз, когда запрашивается элемент. Это сигнальный механизм для коллекции. Преимущество этого подхода в том, что вызовы Take не прерываются, как в моем втором решении.

public class NotifyingBlockingCollection<T> : BlockingCollection<T>
{
    private Thread m_Notifier;
    private BlockingCollection<Action<T>> m_Callbacks = new BlockingCollection<Action<T>>();

    public NotifyingBlockingCollection()
    {
        m_Notifier = new Thread(Notify);
        m_Notifier.IsBackground = true;
        m_Notifier.Start();
    }

    private void Notify()
    {
        while (true)
        {
            Action<T> callback = m_Callbacks.Take();
            T item = Take();
            callback.BeginInvoke(item, null, null); // Transfer to the thread pool.
        }
    }

    public void RegisterForTake(Action<T> callback)
    {
        m_Callbacks.Add(callback);
    }
}

Второй соответствует вашему требованию отсутствия блокирующего потока. Обратите внимание, как он передает вызов обратного вызова в пул потоков. Я сделал это, потому что думаю, что если бы он выполнялся синхронно, то блокировки удерживались бы дольше, что приводило к узким местам Add и RegisterForTake. Я внимательно изучил его и не думаю, что он может быть заблокирован в реальном времени (доступны и элемент, и обратный вызов, но обратный вызов никогда не выполняется), но вы можете просмотреть его самостоятельно, чтобы убедиться. Единственная проблема здесь заключается в том, что вызов Take будет зависать, поскольку обратные вызовы всегда имеют приоритет.

public class NotifyingBlockingCollection<T>
{
    private BlockingCollection<T> m_Items = new BlockingCollection<T>();
    private Queue<Action<T>> m_Callbacks = new Queue<Action<T>>();

    public NotifyingBlockingCollection()
    {
    }

    public void Add(T item)
    {
        lock (m_Callbacks)
        {
            if (m_Callbacks.Count > 0)
            {
                Action<T> callback = m_Callbacks.Dequeue();
                callback.BeginInvoke(item, null, null); // Transfer to the thread pool.
            }
            else
            {
                m_Items.Add(item);
            }
        }
    }

    public T Take()
    {
        return m_Items.Take();
    }

    public void RegisterForTake(Action<T> callback)
    {
        lock (m_Callbacks)
        {
            T item;
            if (m_Items.TryTake(out item))
            {
                callback.BeginInvoke(item, null, null); // Transfer to the thread pool.
            }
            else
            {
                m_Callbacks.Enqueue(callback);
            }
        }
    }
}
person Brian Gideon    schedule 19.07.2010
comment
Спасибо за ответ, но это немного не то, что я ищу. Это то, что я сейчас делаю, но с APM, помещенным в коллекцию (код, который вы предоставили). Я предполагаю, что суть моей проблемы в том, что APM не соответствует моим требованиям, это просто реализация, которую я использовал. Мои требования требуют шаблона, который обеспечивает решение вопроса: как я могу сообщить коллекции, что я готов к следующему элементу, и чтобы коллекция выполнила обратный вызов, когда этот следующий элемент прибыл, без блокировки каких-либо потоков? - person ; 20.07.2010
comment
Я как бы подумал, что это не то, что вам нужно. Хотя это интересная проблема. Жаль, что Add не virtual, иначе вы могли бы каким-то образом внедрить туда уведомление. Возможно, вы могли бы использовать одну из реализаций очереди блокировки в качестве отправной точки. Проблема в том, что вы должны быть осторожны при доставке этого уведомления, иначе другой потребитель схватит товар первым. Я мог бы поиграть с этим сегодня, если у меня будет время. Напишите ответ сами, если разберетесь. Я не знаю ... вам может быть проще просто сослаться на другую библиотеку. - person Brian Gideon; 20.07.2010
comment
Уведомление должно содержать следующий элемент и должно контролироваться уведомителем. Возможно, представление о том, что это коллекция, ошибочно; только с помощью этого механизма может быть предоставлен следующий элемент, что позволяет избежать проблемы, когда два наблюдателя соревнуются за один элемент. Другими словами, один наблюдатель не может использовать механизм A для получения следующего элемента (т. е. T Pop()), пока другой зарегистрирован для обратного вызова. - person ; 20.07.2010
comment
@Will: Посмотри сейчас. У меня есть две разные идеи. Меня мало беспокоит второй пример относительно возможной ситуации с живой блокировкой, но, насколько я могу судить, он безопасен для нескольких производителей и нескольких потребителей. - person Brian Gideon; 20.07.2010
comment
благодаря. Я, вероятно, закончу тем, что адаптирую что-то из этого. Я надеялся, что что-то похожее на то, что мне нужно, было в фреймворке, но так оно и есть. - person ; 20.07.2010

Как насчет чего-то подобного? (Название, вероятно, требует некоторой работы. И обратите внимание, что это не проверено.)

public class CallbackCollection<T>
{
    // Sychronization object to prevent race conditions.
    private object _SyncObject = new object();

    // A queue for callbacks that are waiting for items.
    private ConcurrentQueue<Action<T>> _Callbacks = new ConcurrentQueue<Action<T>>();

    // A queue for items that are waiting for callbacks.
    private ConcurrentQueue<T> _Items = new ConcurrentQueue<T>();

    public void Add(T item)
    {
        Action<T> callback;
        lock (_SyncObject)
        {
            // Try to get a callback. If no callback is available,
            // then enqueue the item to wait for the next callback
            // and return.
            if (!_Callbacks.TryDequeue(out callback))
            {
                _Items.Enqueue(item);
                return;
            }
        }

        ExecuteCallback(callback, item);
    }

    public void TakeAndCallback(Action<T> callback)
    {
        T item;
        lock(_SyncObject)
        {
            // Try to get an item. If no item is available, then
            // enqueue the callback to wait for the next item
            // and return.
            if (!_Items.TryDequeue(out item))
            {
                _Callbacks.Enqueue(callback);
                return;
            }
        }
        ExecuteCallback(callback, item);
    }

    private void ExecuteCallback(Action<T> callback, T item)
    {
        // Use a new Task to execute the callback so that we don't
        // execute it on the current thread.
        Task.Factory.StartNew(() => callback.Invoke(item));
    }
}
person Jack Leitch    schedule 20.07.2010
comment
Только что обновился и увидел коллекцию NotificationBlockingCollection @Brian. Похоже, мы с ним пришли примерно к одному и тому же решению одновременно. - person Jack Leitch; 20.07.2010
comment
Да, здесь мы определенно думали в одном направлении, особенно в части получения вызова обратного вызова из текущего потока. - person Brian Gideon; 20.07.2010