Очевидная гонка/ошибка BufferBlock.Post/Receive/ReceiveAsync

отправлено на http://social.msdn.microsoft.com/Forums/en-US/tpldataflow/thread/89b3f71d-3777-4fad-9c11-50d8dc81a4a9

Я знаю... На самом деле я не использую TplDataflow по максимуму. ATM Я просто использую BufferBlock как безопасную очередь для передачи сообщений, где производитель и потребитель работают с разной скоростью. Я вижу какое-то странное поведение, которое ставит меня в тупик относительно того, как действовать дальше.

private BufferBlock<object> messageQueue = new BufferBlock<object>();

public void Send(object message)
{
    var accepted=messageQueue.Post(message);
    logger.Info("Send message was called qlen = {0} accepted={1}",
    messageQueue.Count,accepted);
}

public async Task<object> GetMessageAsync()
{
    try
    {
        var m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(30));
        //despite messageQueue.Count>0 next line 
        //occasionally does not execute
        logger.Info("message received");
        //.......
    }
    catch(TimeoutException)
    {
        //do something
    }
}

В приведенном выше коде (который является частью распределенного решения на 2000 строк) Send вызывается периодически каждые 100 мс или около того. Это означает, что элемент преобразуется из Post в messageQueue примерно 10 раз в секунду. Это проверено. Однако иногда оказывается, что ReceiveAsync не завершается в течение тайм-аута (т. е. Post не вызывает завершения ReceiveAsync), а TimeoutException поднимается после 30 секунд. На данный момент messageQueue.Count исчисляется сотнями. Это неожиданно. Эта проблема наблюдалась и при более низкой скорости публикации (1 сообщение в секунду) и обычно возникает до того, как 1000 элементов прошли через BufferBlock.

Итак, чтобы обойти эту проблему, я использую следующий код, который работает, но иногда вызывает задержку в 1 секунду при получении (из-за ошибки, описанной выше):

    public async Task<object> GetMessageAsync()
    {
        try
        {
            object m;
            var attempts = 0;
            for (; ; )
            {
                try
                {
                    m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(1));
                }
                catch (TimeoutException)
                {
                    attempts++;
                    if (attempts >= 30) throw;
                    continue;
                }
                break;

            }

            logger.Info("message received");
            //.......
        }
        catch(TimeoutException)
        {
            //do something
        }
   }

Для меня это похоже на состояние гонки в TDF, но я не могу понять, почему этого не происходит в других местах, где я использую BufferBlock аналогичным образом. Экспериментальное изменение с ReceiveAsync на Receive не помогает. Я не проверял, но думаю, что приведенный выше код работает отлично. Этот шаблон я видел в документе «Введение в поток данных TPL» tpldataflow.docx. .

Что я могу сделать, чтобы добраться до сути этого? Существуют ли какие-либо показатели, которые могут помочь сделать вывод о том, что происходит? Если я не могу создать надежный тестовый пример, какую дополнительную информацию я могу предложить?

Помощь!


person spender    schedule 09.04.2012    source источник
comment
Я не вижу ничего плохого в том, что вы делаете, или в том, что вы здесь ожидаете. Я определенно думаю, что вам нужно поддерживать активность на форумах MSDN больше, чем здесь. Вы уже привлекли внимание @StephenToub, и он определенно тот парень, которого вы хотите изучить.   -  person Drew Marsh    schedule 11.04.2012
comment
Неа. Никогда не докапывался до сути. Мне не удалось воспроизвести проблему на небольшом автономном примере. Поскольку я использовал только BufferBlock, вместо этого я реализовал собственную асинхронную очередь. Мне не пришлось менять какой-либо другой код... Я просто повторно реализовал те части интерфейса BufferBlock, которые использовал. Теперь работает хорошо, что заставляет меня думать, что что-то не так, но я не могу этого доказать. Грр.   -  person spender    schedule 18.09.2012
comment
@spendor Очень интересно, как ни странно, я отказался от своей собственной реализации асинхронной параллельной очереди после обнаружения BufferBlock ... теперь мне придется пересмотреть свое решение. Спасибо.   -  person Andrew Hanlon    schedule 20.09.2012
comment
кто-нибудь знает, это все еще проблема?   -  person Eyal Perry    schedule 29.05.2018
comment
@EyalPerry Я использовал (и проповедовал) поток данных для ряда других проектов и с тех пор никогда не сталкивался с этой проблемой. Учитывая зрелость продукта сейчас по сравнению с 6 лет назад, я был бы очень удивлен, если бы это все еще было проблемой.   -  person spender    schedule 29.05.2018
comment
В настоящее время у меня есть эта проблема. Кто-нибудь когда-нибудь понял это?   -  person Matt Ellen    schedule 01.04.2019


Ответы (1)


Стивен, кажется, думает, что следующее решение

var m = ожидание messageQueue.ReceiveAsync();

вместо:

var m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(30));

Вы можете это подтвердить или опровергнуть?

person Peter Ritchie    schedule 16.05.2012
comment
Это не сработало. Неважно, какую перегрузку ReceiveAsync я выбрал, результат был один и тот же. См. мой комментарий выше для моего решения. - person spender; 18.09.2012