Атомарная операция в C #

Введение

Атомарная операция названа академической линеаризуемостью, атомарность - это гарантия изоляции от параллельных процессов, она может быть усилена аппаратным уровнем, построенным на протоколе Cache Coherence, или эксклюзивной блокировкой на программном уровне. В этом сообщении в блоге я собираюсь исследовать несколько механизмов для достижения атомарной работы в .Net.

Что такое атомарные операции, а что нет?

В Спецификации C # говорится об атомарной операции:

«Чтение и запись следующих типов данных должны быть атомарными: bool, char, byte, sbyte, short, ushort, uint, int, float и ссылочные типы». Также: «… нет гарантии атомарного чтения-изменения-записи, например, в случае увеличения или уменьшения».

Threading In C # Джозефа Альбахари описал:

«Чтение / запись в 32-битном поле или меньше всегда является атомарным, операции в 64-битном поле гарантированно атомарны только в 64-битной ОС, операторы, которые объединяют более одной операции чтения / записи, никогда не являются атомарными. . »

Например, следующие операции гарантированно являются атомарными:

int i = 3; // Always atomic
long l = Int64.MaxValue; // Atomic in 64-bit enviroment, non-atomic on 32-bit environment

Код, подобный приведенному ниже, никогда не бывает атомарным:

int i = 0;
int j += i;  // Non-atomic, read and write operation
i++;         // Non-atomic, read and write operation

И я пытаюсь задокументировать атомарные операции на более низком уровне в разделе ниже.

Сущность

Учитывая, что два потока (или два процесса) работают одновременно: T1 и T2, есть поле, хранящееся в памяти, T1 считывает его значение и выполняет некоторые вычисления для значения и, наконец, записывает новое значение обратно в память, в течение периода T2. на самом деле выполняет ту же задачу - то есть чтение / вычисление / запись значения обратно, поэтому, возможно, одна операция в этом поле переопределяет другую - другими словами: более поздний выполненный поток (T2) может переопределить ранее выполненный (T1), потому что, когда он читает значение поля другой поток просто манипулировал им, и после того, как T1 закончил записывать новое значение обратно в память, T2 записывает обратно.

Итак, простым примером является операция увеличения / уменьшения, как я показал выше, это НЕ атомарная операция, она требует как чтения, так и записи, если многие потоки одновременно выполняют приращение в одном поле, это может вызвать состояние гонки (тем более потоков, чем больше будет выполняться операция приращения, тем больше вероятность возникновения состояния гонки).

Пример атомарности

Я написал приложение Winform, оно выполняет простую работу:

Создайте 10 потоков во время выполнения и одновременно работайте с частным целым числом, есть изменчивый счетчик с начальным значением 0, каждый поток завершает свою работу:

  1. Распечатать информацию в пользовательском интерфейсе.
  2. Увеличьте счетчик
  3. Проверьте, достигает ли счетчик 10, если да, выведите CalculatingFinished метод, который распечатает окончательный результат.

И я бы хотел, чтобы пользовательский интерфейс не блокировался во время расчета, иначе я могу просто присоединиться к каждому созданному потоку. Мой скелет кода показан ниже:

private const int MaxThraedCount = 10;
 private Thread[] m_Workers = new Thread[MaxThraedCount];
 private volatile int m_Counter = 0;
 private Int32 x = 0;
 
 protected void btn_DoWork_Click(object sender, EventArgs e)
 {
     ShowStatus("Starting...");
 
     for (int i = 0; i < MaxThraedCount; i++)
     {
         m_Workers[i] = new Thread(IncreaseNumber) { Name = "Thread " + (i + 1) };
         m_Workers[i].Start();
     }
 }
 
 void IncreaseNumber()
 {
     try
     {
         for (int i = 0; i < 10000; i++)
         {
             // Different strategy to increment x
         }
 
         ShowStatus(String.Format("{0} finished at {1}", Thread.CurrentThread.Name, m_Watcher.Elapsed.TotalMilliseconds));
         
         // Increases Counter and decides whether or not sets the finish signal
         m_Counter++;
         if (m_Counter == MaxThraedCount)
         {
             // Print finish information on UI
             CalculatingFinished();
             m_Counter = 0;
         }
     }
     catch (Exception ex)
     {
         throw;
     }
 }
 
 public void ShowStatus(string info)
 {
     this.InvokeAction(() => listInfo.Items.Add(info));
 }
 
 private void CalculatingFinished()
 {
     ShowStatus("\r\nAll Done at: " + m_Watcher.Elapsed.TotalMilliseconds);
     ShowStatus("The result: " + x.ToString());
 }

Я выделил «// Другая стратегия увеличения x» и попробую несколько способов достижения атомарности с использованием библиотек FCL.

Давайте сначала посмотрим на неатомарную процедуру - простой do x++ в каждом потоке:

for (int i = 0; i < 10000; i++)
 {
     x++;
 }

Поскольку x++ НЕ является атомарностью, вдобавок я сделал большой цикл - 10 тысяч раз, по моему опыту, я НИКОГДА не получаю правильный результат, скриншот ниже:

Анализ и решения

Чтобы решить эту проблему и обеспечить атомарность, в результате более чем двухнедельного частичного изучения я нашел следующие возможные способы (теоретически, без учета производительности):

  1. Interlocked.Increment
  2. Примените монопольную блокировку (или Moniter.Enter) вне цикла for.
  3. AutoResetEvent, чтобы потоки выполняли задачи один за другим.
  4. Создайте временное целое число в каждом потоке и после завершения добавьте временное значение в x под исключительной блокировкой.
  5. ReaderWriterLockSlim.
  6. Parallel.Для согласования с Interlocked.Increment.

Все вышеперечисленное позволяет выполнить атомарную операцию по увеличению значения x и получить ожидаемый результат:

На самом деле я пробовал другие способы, такие как использование MemoryBarrier, Thread.VolatileRead / VolatileWrite - StackOverFlow question link, но потерпел неудачу, если уважаемые читатели знают, что есть способ использовать их для достижения цели, пожалуйста, помогите мне.

Демонстрационный код

В этом разделе я перечислю ключевой код для реализации 5 решений, указанных выше.

Решение №1: Interlocked.Increment

for (int i = 0; i < 10000; i++)
     Interlocked.Increment(ref x);

Решение №2: примените монопольную блокировку (Monitor) вне цикла for.

private readonly string m_Locker = "THREAD_LOCKER";
 
 Monitor.Enter(m_Locker);
 for (int i = 0; i < 10000; i++)
     x++;
 Monitor.Exit(m_Locker);

Решение №3: AutoResetEvent, чтобы потоки выполняли задачу один за другим.

private static AutoResetEvent m_AutoReset = new AutoResetEvent(false);
 
 protected void btn_DoWork_Click(object sender, EventArgs e)
 {
     ShowStatus("Starting...");
 
     for (int i = 0; i < MaxThraedCount; i++)
     {
         m_Workers[i] = new Thread(IncreaseNumber) { Name = "Thread " + (i + 1) };
         m_Workers[i].Start();
     }
     
     m_AutoReset.Set();
 }
 
 void IncreaseNumber()
 {
     m_AutoReset.WaitOne();
     for (int i = 0; i < 10000; i++)
         x++;
     m_AutoReset.Set();
 }

Один примечательный момент заключается в том, что в этом случае (неблокирование пользовательского интерфейса) непросто использовать пару Monitor.Enter / Monitor.Pulse для замены AutoResetEvent и реализации логики «один за другим», поскольку Monitor.Pulse не сохраняет состояние , ниже приводится описание в MSDN:

Важно

Класс Monitor не поддерживает состояние, указывающее, что метод Pulse был вызван. Таким образом, если вы вызываете Pulse, когда ни один поток не ожидает, следующий поток, который вызывает Wait, блокируется, как если бы Pulse никогда не вызывался. Если два потока используют для взаимодействия Pulse и Wait, это может привести к тупиковой ситуации. Сравните это с поведением класса AutoResetEvent: если вы сигнализируете об AutoResetEvent, вызывая его метод Set, и нет ожидающих потоков, AutoResetEvent остается в сигнальном состоянии до тех пор, пока поток не вызовет WaitOne. , WaitAny или WaitAll. AutoResetEvent освобождает этот поток и возвращается в несигнальное состояние.

В моем приложении Winform, если я вызываю Monitor.Pulse () в событии нажатия кнопки, многие потоки не получат сигнал (тогда как AutoResetEvent останется сигнальным состоянием)! Я написал простую процедуру, чтобы продемонстрировать это:

privatestaticreadonlystring_locker = ”THREAD_LOCKER” ;
publicstaticvoidMain ()
{
for (inti = 0; i ‹5; i ++)
{< br /> Threadt = newThread (DoWork);
t.Name =
”T” + (i + 1);
t.IsBackground = true;
t .Start ();
}

//Thread.Sleep(500);

Monitor.Enter (_locker); < br /> Console.WriteLine (
«Основной поток» );
Monitor.Pulse (_locker);
Monitor.Exit (_locker);
}

privatestaticvoidDoWork ()
{
Monitor.Enter (_locker);
Monitor.Wait (_locker);
Monitor.Pulse (_locker);
Monitor.Exit (_locker);
Console.WriteLine (Thread.CurrentThread.Name +
"завершено и существует" );
}

Удаление «Thread.Sleep(500)» * ОЧЕНЬ ВОЗМОЖНО * приведет к работе менее 5 потоков, потому что создание 5 потоков требует не короткого времени (объект kenel, TEB, стек kenel / пользователя), в течение периода времени один только что созданный поток (T2) может получить сигнал или не получить (гораздо больше), потому что, когда ранее созданный поток (T1), вызывающий 'Monitor.Pulse (_locker)' T2, НЕ был настроен, T2 и поток, созданный позже, не будут иметь шансов получить сигнал! Они будут ждать ... Таким образом, 0,5 секунды используются, чтобы дать время для создания 5 потоков, в противном случае основной поток немедленно завершится и будет собран фоновый поток.

Решение №4: Создайте временное целое число в каждом потоке и после завершения добавьте временное значение к x под исключительной блокировкой.

private readonly string m_Locker = "THREAD_LOCKER";
 
 void IncreaseNumber(object objThreadName)
 {
     int tmp = 0;
     for (int i = 0; i < 10000; i++)
         tmp++;
 
     lock (m_Locker)
         x += tmp;
 }

Решение №5: ReaderWriterLockSlim.

void IncreaseNumber(object objThreadName)
 {
     // Or we can use ReaderWriterLock.AcquireWriterLock(500) but it has more performance overhead and is not recommended
     m_ReaderWriterLocker.EnterWriteLock(); 
     for (int i = 0; i < 10000; i++)
         x++;
     m_ReaderWriterLocker.ExitWriteLock();  // Or ReaderWriterLock.ReleaseWriterLock();
 }

Обратите внимание, что использование класса ReaderWriterLock не рекомендуется, поскольку он выполняется примерно в пять раз дольше, чем вызов метода Enter монитора. см. Блокировки чтения / записи и библиотека ResourceLock Джеффри Рихтера.

Решение № 6: Parallel.Forcoordinate с Interlocked.Increment.

Parallel.For(0, 100000, (i) => Interlocked.Increment(ref x));

Вывод

В этом посте я взял простой и понятный пример: 10 потоков, одновременно работающих в поле, чтобы поэкспериментировать с атомарностью в C # .Net, используя технологию синхронизации, включая эксклюзивную блокировку, сигнализацию, неблокирующую синхронизацию, я думаю, это очень хороший пример для освоения базовых библиотек / концепций потоков FCL, таких как Interlocked, Monitor, MemoryBarrier, volatile, AutoResetEvent, ReaderWriterLockSlim и т. д.

Многопоточное программирование действительно очень сложно, во время моего расследования я случайно увидел, что даже Джон Скит признал, что у него открылись глаза на точное значение volatile, которое не« всегда читается из основной памяти, всегда записывается напрямую в основную память . (Ссылка), так что, как новичку в этой области, я должен приложить больше усилий :)

Дальнейшее чтение

Threading In C # (настоятельно рекомендуется !!!)

Линеаризуемость Википедия

Представляем новый ReaderWriterLockSlim в Orcas

Асинхронные блоки кода

Атомные операции

Повышение производительности с помощью атомарных операций в .NET 4