Поточно-безопасная очередь без блокировки - нужен совет

Мне нужно разработать потокобезопасный регистратор. Мой регистратор должен иметь метод Log(), который просто ставит в очередь текст для регистрации. Также регистратор должен быть без блокировки, чтобы другие потоки могли регистрировать сообщения, не блокируя регистратор. Мне нужно разработать рабочий поток, который должен ожидать некоторого события синхронизации, а затем регистрировать все сообщения из очереди, используя стандартное ведение журнала .NET (это не потокобезопасно). Итак, что меня интересует, так это синхронизация рабочего потока и функции журнала. Ниже приведен эскиз класса, который я разработал. Я думаю, что я должен использовать здесь Monitor.Wait/Pulse или любые другие средства для приостановки и возобновления рабочего потока. Я не хочу тратить циклы процессора, когда нет работы для регистратора.

Скажем иначе: я хочу разработать регистратор, который не блокирует вызывающие потоки, которые его используют. У меня есть высокопроизводительная система - и это требование.

class MyLogger
{
  // This is a lockfree queue - threads can directly enqueue and dequeue
  private LockFreeQueue<String> _logQueue;
  // worker thread
  Thread _workerThread;
  bool _IsRunning = true;

 // this function is used by other threads to queue log messages
  public void Log(String text)
{
  _logQueue.Enqueue(text);
}

// this is worker thread function
private void ThreadRoutine()
{
 while(IsRunning)
 {
   // do something here
 }
}    
}

person Captain Comic    schedule 12.01.2010    source источник


Ответы (3)


«без блокировки» не означает, что потоки не будут блокировать друг друга. Это означает, что они блокируют друг друга с помощью очень эффективных, но и очень хитрых механизмов. Требуется только для сценариев с очень высокой производительностью, и даже эксперты ошибаются (много).

Лучший совет: забудьте о «свободной от блокировки» и просто используйте «потокобезопасную» очередь.

Я бы рекомендовал «Очередь блокировки» с этой страницы.

И это вопрос выбора, чтобы включить ThreadRoutine (Потребитель) в сам класс.

Что касается второй части вашего вопроса, это зависит от того, что такое «какое-то событие синхронизации». Если вы собираетесь использовать вызов метода, пусть он запускает одноразовый поток. Если вы хотите подождать на семафоре, не используйте Monitor и Pulse. Здесь они не надежны. Используйте событие AutoResetEvent/ManualResetEvent.
Способ отображения зависит от того, как вы хотите его использовать.

Ваши основные ингредиенты должны выглядеть так:

class Logger
{
    private AutoResetEvent _waitEvent = new AutoResetEvent(false);
    private object _locker = new object();
    private bool _isRunning = true;    

    public void Log(string msg)
    {
       lock(_locker) { _queue.Enqueue(msg); }
    }

    public void FlushQueue()
    {
        _waitEvent.Set();
    }

    private void WorkerProc(object state)
    {
        while (_isRunning)
        {
            _waitEvent.WaitOne();
            // process queue, 
            // ***
            while(true)
            {
                string s = null;
                lock(_locker)
                {
                   if (_queue.IsEmpty) 
                      break;
                   s = _queue.Dequeu();
                }
                if (s != null)
                  // process s
            }
        } 
    }
}

Кажется, часть обсуждения касается того, что делать при обработке очереди (отмечено ***). Вы можете заблокировать Очередь и обработать все элементы, во время которых добавление новых записей будет заблокировано (дольше), или заблокировать и извлекать записи одну за другой и блокировать (очень) ненадолго каждый раз. Я добавил этот последний сценарий.

Резюме: вам нужно не решение Lock-Free, а решение Block-Free. Block-Free не существует, вам придется довольствоваться чем-то, что блокирует как можно меньше. Последняя итерация примера mys (неполная) показывает, как блокировать только вызовы Enqueue и Dequeue. Я думаю, что это будет достаточно быстро.

person Henk Holterman    schedule 12.01.2010
comment
Что ж, мое требование здесь состоит в том, чтобы реализовать неблокирующее ведение журнала, поскольку у нас есть какая-то высокопроизводительная служба Windows. - person Captain Comic; 12.01.2010
comment
Но как вы собираетесь реализовать блокировку для потока регистратора? Накладные расходы на блокировку намного больше, чем на использование Monitor. - person wj32; 12.01.2010
comment
Капитан, я думаю, мы расходимся только в названиях вещей. - person Henk Holterman; 12.01.2010
comment
Довольно озадачивает. Вы хотите, чтобы ваш рабочий поток блокировался, но вам нужна неблокирующая очередь. И @Henk Holterman: без блокировки действительно подразумевается общий прогресс. Поскольку вы не можете зависнуть в середине инструкции (в то время как вы можете, когда мьютекс получен), код без блокировок никогда не блокируется. - person wj32; 12.01.2010
comment
Эээ ... Я не очень хорошо понимаю многопоточность - не могли бы вы немного объяснить - может быть в отдельном ответе - использование блока в какой-то общедоступной (используемой другими потоками) функции - не означает ли это, что один поток может заблокировать выполнение другого потока? Если да, я подумал, что если я использую функцию lockfreequeue (на основе CompareExchange) (которая, как я полагаю, не блокирует, потому что CompareExchange — это какая-то хитрая функция Windows), мне просто нужно отправить какой-то сигнал рабочему потоку, чтобы он может начать вывод из очереди? Извините еще раз за мое небольшое знание вопроса. - person Captain Comic; 12.01.2010
comment
Очень редко можно иметь структуру данных, которую можно безопасно обновлять из нескольких потоков без синхронизации или блокирующих конструкций. Отсутствие блокировки просто означает, что вы не используете объекты блокировки ядра ОС, вместо этого используете такие вещи, как спин-ожидание, чтобы дождаться, когда что-то станет доступным, обычно в основном используемое в коде ядра, где такие блокировки вообще недоступны. Я бы начал с чего-то простого, вроде обычной очереди блокировки, и посмотрел, как она масштабируется. - person Lasse V. Karlsen; 12.01.2010
comment
Блокировка не плохая или что-то в этом роде. Это просто означает, что ваш поток не будет выполняться, пока вы явно не разбудите его. В этом случае вы хотите уведомлять поток регистратора каждый раз, когда что-то регистрируется. Зачем использовать незаблокированную очередь? Вам нужно синхронизировать больше вещей, чем просто постановка/удаление из очереди. Использование незаблокированной очереди вообще не даст заметного улучшения производительности. - person wj32; 12.01.2010
comment
@Lasse: спин-блокировки - это блокировки. Они не без замков. И кто сказал, что вы не можете использовать блокировки в режиме ядра? Я думаю, вы имели в виду функцию сравнения и замены (CAS), также известную как CompareExchange. - person wj32; 12.01.2010
comment
Я часто использую дизайн в этом ответе, и он достаточно хорошо работает для встроенных систем. Используйте очередь, которую вы блокируете при постановке/удалении элементов из очереди. Блокировка очень короткая, и конкуренция минимальна по той же причине. Рабочий поток может заблокировать один раз и локально удалить несколько элементов из очереди для обработки, чтобы еще меньше блокировать. - person Ioan; 12.01.2010

Ваш профилировщик показал вам, что вы испытываете большие накладные расходы, используя простой оператор lock? Программирование без блокировок очень сложно сделать правильно, и если вам это действительно нужно, я бы посоветовал взять что-то существующее из надежного источника.

person erikkallen    schedule 12.01.2010
comment
Хотя алгоритмы без блокировок могут иногда обеспечивать более высокую производительность, чем алгоритмы, использующие блокировки, большее преимущество заключается в том, что алгоритмы без блокировок устойчивы к некоторым механизмам сбоев, которые могут мешать алгоритмам, использующим блокировки. Например, при использовании кода без блокировок застрявший поток просто перестанет конкурировать с другими потоками за оспариваемые ресурсы. Напротив, при использовании блокировок поток, который получает блокировку, а затем задерживается, может привести к тому, что другим потокам будет на неопределенный срок отказано в использовании ресурса. - person supercat; 27.12.2011

Нетрудно сделать это без блокировки, если у вас есть атомарные операции. Возьмите односвязный список; вам просто нужен указатель head.

Функция журнала:
1. Локально подготовить элемент журнала (узел со строкой журнала).
2. Установить указатель next локального узла на head.
3. ATOMIC : Сравните head со следующим локальным узлом, если они равны, замените head адресом локального узла.
4. Если операция не удалась, повторите с шага 2, иначе элемент находится в «очереди».

Worker:
1. Скопируйте head локально.
2. ATOMIC: Сравните head с локальным, если они совпадают, замените < strong>head со значением NULL.
3. Если операция не удалась, повторите ее с шага 1.
4. Если она прошла успешно, обработайте элементы; которые теперь являются локальными и вне "очереди".

person Ioan    schedule 12.01.2010