Лучший способ сделать мой поток Parallel.ForEach безопасным?

Я хотел бы сделать следующий код потокобезопасным. К сожалению, я безуспешно пытался заблокировать этот код на разных уровнях. Единственный случай, когда я могу добиться потокобезопасности, — это установить блокировку вокруг всего цикла, что фактически делает Parallel.ForEach не быстрее (возможно, даже медленнее), чем просто использование foreach. Код относительно/почти безопасен без блокировки. Кажется, что он показывает лишь небольшие различия в суммировании ключей geneTokens.Value[-1] и ключей gtCandidates.Value[-1] примерно один раз из каждых 20 или около того выполнений.

Я понимаю, что Dictionary не является потокобезопасным. Однако я не могу изменить этот конкретный объект на ConcurrentDictionary без серьезного снижения производительности. Я бы предпочел запустить эту часть кода с помощью обычного foreach, чем изменить этот конкретный объект. Однако я использую ConcurrentDictionary для хранения отдельных объектов Dictionary. Я также пытался внести это изменение, и оно не решает мою проблему с расой.

Вот мои переменные уровня класса:

//Holds all tokens derived from each sequence chunk
public static ConcurrentBag<sequenceItem> tokenBag = 
  new ConcurrentBag<sequenceItem>();
public BlockingCollection<sequenceItem> sequenceTokens = new 
  BlockingCollection<sequenceItem>(tokenBag);
public ConcurrentDictionary<string, int> categories = new 
  ConcurrentDictionary<string, int>();
public ConcurrentDictionary<int, Dictionary<int, int>> gtStartingFrequencies = new 
  ConcurrentDictionary<int, Dictionary<int, int>>();
public ConcurrentDictionary<string, Dictionary<int, int>> gtCandidates = new 
  ConcurrentDictionary<string, Dictionary<int, int>>();
public ConcurrentDictionary<string, Dictionary<int, int>> geneTokens = new 
  ConcurrentDictionary<string, Dictionary<int, int>>();

Вот Parallel.ForEach:

Parallel.ForEach(sequenceTokens.GetConsumingEnumerable(), seqToken =>
{
  lock (locker)
  {
    //Check to see if the Sequence Token is a Gene Token
    Dictionary<int, int> geneTokenFreqs;
    if (geneTokens.TryGetValue(seqToken.text, out geneTokenFreqs))
    { //The Sequence Token is a Gene Token 


      *****************Race Issue Seems To Occur Here**************************** 
      //Increment or create category frequencies for each category provided
      int frequency;
      foreach (int category in seqToken.categories)
      {
        if (geneTokenFreqs.TryGetValue(category, out frequency))
        {   //increment the category frequency, if it already exists
            frequency++;
            geneTokenFreqs[category] = frequency;
        }
        else
        {   //Create the category frequency, if it does not exist
            geneTokenFreqs.Add(category, 1);
        }
      }

      //Update the frequencies total [-1] by the total # of categories incremented.
      geneTokenFreqs[-1] += seqToken.categories.Length;
      ******************************************************************************
    }
    else
    { //The Sequence Token is NOT yet a Gene Token
      //Check to see if the Sequence Token is a Gene Token Candidate yet
      Dictionary<int, int> candidateTokenFreqs;
      if (gtCandidates.TryGetValue(seqToken.text, out candidateTokenFreqs))
      {
        *****************Race Issue Seems To Occur Here****************************
        //Increment or create category frequencies for each category provided
        int frequency;
        foreach (int category in seqToken.categories)
        {
          if (candidateTokenFreqs.TryGetValue(category, out frequency))
          { //increment the category frequency, if it already exists
            frequency++;
            candidateTokenFreqs[category] = frequency;
          }
          else
          { //Create the category frequency, if it does not exist
            candidateTokenFreqs.Add(category, 1);
          }
        }

        //Update the frequencies total [-1] by the total # of categories incremented.
        candidateTokenFreqs[-1] += seqToken.categories.Length;
        *****************************************************************************

        //Only update the candidate sequence count once per sequence
        if (candidateTokenFreqs[-3] != seqToken.sequenceId)
        {
          candidateTokenFreqs[-3] = seqToken.sequenceId;
          candidateTokenFreqs[-2]++;

          //Promote the Token Candidate to a Gene Token, if it has been found >=
          //the user defined candidateThreshold
          if (candidateTokenFreqs[-2] >= candidateThreshold)
          {
            Dictionary<int, int> deletedCandidate;
            gtCandidates.TryRemove(seqToken.text, out deletedCandidate);
            geneTokens.TryAdd(seqToken.text, candidateTokenFreqs);
          }
        }
      }
      else
      {
        //create a new token candidate frequencies dictionary by making 
        //a copy of the default dictionary from
        gtCandidates.TryAdd(seqToken.text, new 
          Dictionary<int, int>(gtStartingFrequencies[seqToken.sequenceId]));
      }
    }
  }
});

person Jake Drew    schedule 07.10.2012    source источник
comment
В этом коде есть и другие странности: как он позволяет увеличивать frequency без предварительной его инициализации?   -  person Tudor    schedule 07.10.2012
comment
Работает просто отлично, так как частота используется как выходная переменная для geneTokenFreqs.TryGetValue(). Единственный раз, когда он увеличивается, - это если переменная существует и возвращается из TryGetValue... Уверяю вас, код выполняется. Всю ночь гоняю :)   -  person Jake Drew    schedule 07.10.2012
comment
Извините, я не видел часть out. Тогда ладно.   -  person Tudor    schedule 07.10.2012


Ответы (2)


Очевидно, что одна гонка данных связана с тем, что некоторые потоки будут добавлять сюда элементы:

geneTokens.TryAdd(seqToken.text, candidateTokenFreqs);

и другие будут читать здесь:

if (geneTokens.TryGetValue(seqToken.text, out geneTokenFreqs))
person Tudor    schedule 07.10.2012
comment
Ты прав. Я удалил их сверху. На самом деле они были там из предыдущего тестового прогона прямо перед тем, как я опубликовал это. Ранее я безуспешно пытался использовать geneTokens.Contains() с блокировкой прямо над geneTokenFreqs = geneTokens[seqToken.text]; Такой подход не решил мою проблему. - person Jake Drew; 07.10.2012
comment
@ Джейк Дрю: я внес изменения в то, в чем, по моему мнению, проблема. Вы можете попытаться заблокировать только эти части кода. - person Tudor; 07.10.2012
comment
Я уверен, как эффективно заблокировать TryAdd, не оборачивая блокировку вокруг всего блока, что возвращает меня к исходному вопросу. Я также безуспешно пробовал следующий шаблон: lock (locker) { isGeneToken = geneTokens.TryGetValue(seqToken.text, out geneTokenFreqs); } если (isGeneToken) - person Jake Drew; 07.10.2012

Как я использовал параллельный словарь в своем проекте:

Я ставлю флаг в словарь и проверяю из другого потока, есть ли флаг или нет. Если флаг присутствует, я выполняю свою задачу соответственно.

Для этого я делаю следующее:

1) Объявление Concurrent Dictionary 2) Добавление флага с помощью метода TryADD 3) Попытка получить квартиру с помощью TryGet Methid.

1) Декларирование

  Dim cd As ConcurrentDictionary(Of Integer, [String]) = New ConcurrentDictionary(Of Integer, String)()

2) Добавление

If cd.TryAdd(1, "uno") Then
        Console.WriteLine("CD.TryAdd() succeeded when it should have failed")
        numFailures += 1
    End If 

3) Получение

 If cd.TryGetValue(1, "uno") Then
        Console.WriteLine("CD.TryAdd() succeeded when it should have failed")
        numFailures += 1
    End If 
person Devendra Patel    schedule 06.01.2014