отправлено на http://social.msdn.microsoft.com/Forums/en-US/tpldataflow/thread/89b3f71d-3777-4fad-9c11-50d8dc81a4a9
Я знаю... На самом деле я не использую TplDataflow по максимуму. ATM Я просто использую BufferBlock
как безопасную очередь для передачи сообщений, где производитель и потребитель работают с разной скоростью. Я вижу какое-то странное поведение, которое ставит меня в тупик относительно того, как действовать дальше.
private BufferBlock<object> messageQueue = new BufferBlock<object>();
public void Send(object message)
{
var accepted=messageQueue.Post(message);
logger.Info("Send message was called qlen = {0} accepted={1}",
messageQueue.Count,accepted);
}
public async Task<object> GetMessageAsync()
{
try
{
var m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(30));
//despite messageQueue.Count>0 next line
//occasionally does not execute
logger.Info("message received");
//.......
}
catch(TimeoutException)
{
//do something
}
}
В приведенном выше коде (который является частью распределенного решения на 2000 строк) Send
вызывается периодически каждые 100 мс или около того. Это означает, что элемент преобразуется из Post
в messageQueue
примерно 10 раз в секунду. Это проверено. Однако иногда оказывается, что ReceiveAsync
не завершается в течение тайм-аута (т. е. Post
не вызывает завершения ReceiveAsync
), а TimeoutException
поднимается после 30 секунд. На данный момент messageQueue.Count
исчисляется сотнями. Это неожиданно. Эта проблема наблюдалась и при более низкой скорости публикации (1 сообщение в секунду) и обычно возникает до того, как 1000 элементов прошли через BufferBlock
.
Итак, чтобы обойти эту проблему, я использую следующий код, который работает, но иногда вызывает задержку в 1 секунду при получении (из-за ошибки, описанной выше):
public async Task<object> GetMessageAsync()
{
try
{
object m;
var attempts = 0;
for (; ; )
{
try
{
m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(1));
}
catch (TimeoutException)
{
attempts++;
if (attempts >= 30) throw;
continue;
}
break;
}
logger.Info("message received");
//.......
}
catch(TimeoutException)
{
//do something
}
}
Для меня это похоже на состояние гонки в TDF, но я не могу понять, почему этого не происходит в других местах, где я использую BufferBlock
аналогичным образом. Экспериментальное изменение с ReceiveAsync
на Receive
не помогает. Я не проверял, но думаю, что приведенный выше код работает отлично. Этот шаблон я видел в документе «Введение в поток данных TPL» tpldataflow.docx. .
Что я могу сделать, чтобы добраться до сути этого? Существуют ли какие-либо показатели, которые могут помочь сделать вывод о том, что происходит? Если я не могу создать надежный тестовый пример, какую дополнительную информацию я могу предложить?
Помощь!