Я столкнулся с той же проблемой, что и вы. Я не углублялся в реализацию LinkTo, но я думаю, что сообщение распространяется только тогда, когда исходный блок его получил. Я имею в виду, что может быть случай, когда исходный блок имеет несколько сообщений на входе, но он не будет их обрабатывать до следующего полученного сообщения Post / SendAsync. И это ваше дело.
Вот мое решение, и оно работает для меня.
Сначала объявите "двигатель"
/// <summary>
/// Engine-class (like a car engine) that produced a lot count (or infinite) of actions.
/// </summary>
public class Engine
{
private BufferBlock<int> _bufferBlock;
/// <summary>
/// Creates source block that produced stub data.
/// </summary>
/// <param name="count">Count of actions. If count = 0 then it's infinite loop.</param>
/// <param name="boundedCapacity">Bounded capacity (throttling).</param>
/// <param name="cancellationToken">Cancellation token (used to stop infinite loop).</param>
/// <returns>Source block that constantly produced 0-value.</returns>
public ISourceBlock<int> CreateEngine(int count, int boundedCapacity, CancellationToken cancellationToken)
{
_bufferBlock = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = boundedCapacity });
Task.Run(async () =>
{
var counter = 0;
while (count == 0 || counter < count)
{
await _bufferBlock.SendAsync(0);
if (cancellationToken.IsCancellationRequested)
return;
counter++;
}
}, cancellationToken).ContinueWith((task) =>
{
_bufferBlock.Complete();
});
return _bufferBlock;
}
}
Затем продюсер, использующий движок
/// <summary>
/// Producer that generates random byte blobs with specified size.
/// </summary>
public class Producer
{
private static Random random = new Random();
/// <summary>
/// Returns source block that produced byte arrays.
/// </summary>
/// <param name="blobSize">Size of byte arrays.</param>
/// <param name="count">Total count of blobs (if 0 then infinite).</param>
/// <param name="boundedCapacity">Bounded capacity (throttling).</param>
/// <param name="cancellationToken">Cancellation token (used to stop infinite loop).</param>
/// <returns>Source block.</returns>
public static ISourceBlock<byte[]> BlobsSourceBlock(int blobSize, int count, int boundedCapacity, CancellationToken cancellationToken)
{
// Creating engine with specified bounded capacity.
var engine = new Engine().CreateEngine(count, boundedCapacity, cancellationToken);
// Creating transform block that uses our driver as a source.
var block = new TransformBlock<int, byte[]>(
// Useful work.
i => CreateBlob(blobSize),
new ExecutionDataflowBlockOptions
{
// Here you can specify your own throttling.
BoundedCapacity = boundedCapacity,
MaxDegreeOfParallelism = Environment.ProcessorCount,
});
// Linking engine (and engine is already working at that time).
engine.LinkTo(block, new DataflowLinkOptions { PropagateCompletion = true });
return block;
}
/// <summary>
/// Simple random byte[] generator.
/// </summary>
/// <param name="size">Array size.</param>
/// <returns>byte[]</returns>
private static byte[] CreateBlob(int size)
{
var buffer = new byte[size];
random.NextBytes(buffer);
return buffer;
}
}
Теперь вы можете использовать производителя с потребителем (например, ActionBlock)
var blobsProducer = BlobsProducer.CreateAndStartBlobsSourceBlock(0, 1024 * 1024, 10, cancellationTokenSource.Token);
var md5Hash = MD5.Create();
var actionBlock = new ActionBlock<byte[]>(b =>
{
Console.WriteLine(GetMd5Hash(md5Hash, b));
},
new ExecutionDataflowBlockOptions() { BoundedCapacity = 10 });
blobsProducer.LinkTo(actionBlock);
Надеюсь, это поможет вам!
person
xneg
schedule
23.02.2018
i
вActionBlock
, если он не может отправить значение самому себе, на самом деле код кажется ненужным. Какую проблему вы на самом деле пытаетесь решить? - person JSteward   schedule 29.08.2017TransformBlock
- мой бесконечный продюсер. Я никогда не жду, чтобы он был полным. Я ожидаю, что он будет продолжать создавать сообщения для отправки самому себе, с одной стороны, и преобразовывать их во что-то еще для своего потребителя (ActionBlock
). Я хотел иметь возможность дросселироватьTransformBlock
на случай, если его потребитель работает слишком медленно, блокируя его, когда он отправляет самому себе, и впоследствии задерживает сообщение для потребителя. Моя проблема с приведенным выше примером кода заключается в том, чтоSendAsync
зависает после N раз. - person S. Alexander   schedule 29.08.2017SendAsync
в MSDN: если цель откладывает предложенный элемент, элемент будет буферизирован до тех пор, пока цель не потребляет или не освобождает его, после чего задача будет завершена, а ее Результат будет указывать, было ли получено сообщение. Если цель никогда не пытается обработать или выпустить сообщение, возвращенная задача никогда не будет завершена. - person S. Alexander   schedule 29.08.2017TransformBlock
не должен быть бесконечным производителем. Если вам нужно что-то просто для непрерывной отправки значений в конвейер, вы можете использовать простой цикл сSendAsync
в теле. Это позволитBoundedCapacity
вашегоActionBlock
задушить вашего производителя. - person JSteward   schedule 30.08.2017ActionBlock
как бесконечного производителя, как предлагается здесь: stackoverflow.com/questions/13695499/ - person S. Alexander   schedule 30.08.2017ActionBlock
вызывает действие каждые 10 секунд и не регулируется. Проблема с вашим кодом вызвана попыткойSendAsync
перейти кTransformBlock
, когда его буфер заполнен.TransformBlock
в вашем примере ничего не делает, что указывает на необходимостьTransformBlock
, достаточно простого цикла, и его будет легко регулировать. Чтобы использовать ответ, на который вы ссылались, вы должны сохранитьActionBlock
в качестве производителя иSendAsync
оттуда до вашего текущего связанногоActionBlock
, а затем отправить значение обратно производителю. - person JSteward   schedule 30.08.2017BoundedCapacity
. Прямо сейчас вы пытаетесь сделать вывод о мощности потребителя, установивBoundedCapacity
для своего производителя, что логически ошибочно. - person JSteward   schedule 30.08.2017TransformBlock
необходим в реальном коде, потому что он действительно трансформируется. Настоящий код - это конвейер, состоящий из множества связанных между собой блоков. Я не хотел делать вывод о пределах потребителя для производителя, но, как вы можете видеть из приведенного выше примера кода, у меня нет выбора, поскольку у производителя есть выходной буфер. - person S. Alexander   schedule 31.08.2017