Задача SendAsync потока данных TPL никогда не завершается, когда блок связан

Я надеялся на чистое решение для регулирования определенного типа производителя, пока потребитель занят обработкой, без написания собственного настраиваемого блока. Я надеялся, что приведенный ниже код сделает именно это, но как только SendAsync блокируется после достижения предела емкости, его задача никогда не завершается, намекая, что отложенное сообщение никогда не будет использовано.

_block = new TransformBlock<int, string>(async i =>
{
    // Send the next request to own input queue
    // before processing this request, or block
    // while pipeline is full. 
    // Do not start processing if pipeline is full!
    await _block.SendAsync(i + 1);
    // Process this request and pass it on to the
    // next block in the pipeline.
    return i.ToString();
}, 
// TransformBlock block has input and output buffers. Limit both, 
// otherwise requests that cannot be passed on to the next 
// block in the pipeline will be cached in this block's output 
// buffer, never throttling this block.
new ExecutionDataflowBlockOptions { BoundedCapacity = 5 });

// This block is linked to the output of the 
// transform block. 
var action = new ActionBlock<string>(async i =>
{
    // Do some very long processing on the transformed element.
    await Task.Delay(1000);
}, 
// Limit buffer size, and consequently throttle previous blocks 
// in the pipeline.
new ExecutionDataflowBlockOptions { BoundedCapacity = 5 });
_block.LinkTo(action);

// Start running.
_block.Post(0);

Мне было интересно, есть ли причина, по которой связанный ActionBlock не использует отложенное сообщение.


person S. Alexander    schedule 28.08.2017    source источник
comment
чистое решение для ограничения определенного типа производителя Пожалуйста, подробно опишите конкретную проблему, которую вы пытаетесь решить. Ваш текущий код не может передать i в ActionBlock, если он не может отправить значение самому себе, на самом деле код кажется ненужным. Какую проблему вы на самом деле пытаетесь решить?   -  person JSteward    schedule 29.08.2017
comment
У вас есть бесконечный цикл в коде, как вы думаете, он может завершиться?   -  person VMAtm    schedule 29.08.2017
comment
Извините, похоже, мой вопрос был непонятен. Пример кода - это упрощение реального кода. TransformBlock - мой бесконечный продюсер. Я никогда не жду, чтобы он был полным. Я ожидаю, что он будет продолжать создавать сообщения для отправки самому себе, с одной стороны, и преобразовывать их во что-то еще для своего потребителя (ActionBlock). Я хотел иметь возможность дросселировать TransformBlock на случай, если его потребитель работает слишком медленно, блокируя его, когда он отправляет самому себе, и впоследствии задерживает сообщение для потребителя. Моя проблема с приведенным выше примером кода заключается в том, что SendAsync зависает после N раз.   -  person S. Alexander    schedule 29.08.2017
comment
От SendAsync в MSDN: если цель откладывает предложенный элемент, элемент будет буферизирован до тех пор, пока цель не потребляет или не освобождает его, после чего задача будет завершена, а ее Результат будет указывать, было ли получено сообщение. Если цель никогда не пытается обработать или выпустить сообщение, возвращенная задача никогда не будет завершена.   -  person S. Alexander    schedule 29.08.2017
comment
TransformBlock не должен быть бесконечным производителем. Если вам нужно что-то просто для непрерывной отправки значений в конвейер, вы можете использовать простой цикл с SendAsync в теле. Это позволит BoundedCapacity вашего ActionBlock задушить вашего производителя.   -  person JSteward    schedule 30.08.2017
comment
@JSteward, не должно быть бесконечного продюсера? Почему? Чем он отличается от ActionBlock как бесконечного производителя, как предлагается здесь: stackoverflow.com/questions/13695499/   -  person S. Alexander    schedule 30.08.2017
comment
Этот ActionBlock вызывает действие каждые 10 секунд и не регулируется. Проблема с вашим кодом вызвана попыткой SendAsync перейти к TransformBlock, когда его буфер заполнен. TransformBlock в вашем примере ничего не делает, что указывает на необходимость TransformBlock, достаточно простого цикла, и его будет легко регулировать. Чтобы использовать ответ, на который вы ссылались, вы должны сохранить ActionBlock в качестве производителя и SendAsync оттуда до вашего текущего связанного ActionBlock, а затем отправить значение обратно производителю.   -  person JSteward    schedule 30.08.2017
comment
По сути, ваш потребитель должен быть источником регулирования, поскольку только он может объявить, сколько он может обработать, посредством своего BoundedCapacity. Прямо сейчас вы пытаетесь сделать вывод о мощности потребителя, установив BoundedCapacity для своего производителя, что логически ошибочно.   -  person JSteward    schedule 30.08.2017
comment
@JSteward, спасибо за ваши комментарии - я обновил свой пример кода, чтобы лучше проиллюстрировать то, чего я пытаюсь достичь. TransformBlock необходим в реальном коде, потому что он действительно трансформируется. Настоящий код - это конвейер, состоящий из множества связанных между собой блоков. Я не хотел делать вывод о пределах потребителя для производителя, но, как вы можете видеть из приведенного выше примера кода, у меня нет выбора, поскольку у производителя есть выходной буфер.   -  person S. Alexander    schedule 31.08.2017


Ответы (1)


Я столкнулся с той же проблемой, что и вы. Я не углублялся в реализацию LinkTo, но я думаю, что сообщение распространяется только тогда, когда исходный блок его получил. Я имею в виду, что может быть случай, когда исходный блок имеет несколько сообщений на входе, но он не будет их обрабатывать до следующего полученного сообщения Post / SendAsync. И это ваше дело.

Вот мое решение, и оно работает для меня.

Сначала объявите "двигатель"

/// <summary>
/// Engine-class (like a car engine) that produced a lot count (or infinite) of actions.
/// </summary>
public class Engine
{
    private BufferBlock<int> _bufferBlock;

    /// <summary>
    /// Creates source block that produced stub data.
    /// </summary>
    /// <param name="count">Count of actions. If count = 0 then it's infinite loop.</param>
    /// <param name="boundedCapacity">Bounded capacity (throttling).</param>
    /// <param name="cancellationToken">Cancellation token (used to stop infinite loop).</param>
    /// <returns>Source block that constantly produced 0-value.</returns>
    public ISourceBlock<int> CreateEngine(int count, int boundedCapacity, CancellationToken cancellationToken)
    {
        _bufferBlock = new BufferBlock<int>(new DataflowBlockOptions { BoundedCapacity = boundedCapacity });

        Task.Run(async () =>
        {
            var counter = 0;
            while (count == 0 || counter < count)
            {
                await _bufferBlock.SendAsync(0);
                if (cancellationToken.IsCancellationRequested)
                    return;
                counter++;
            }
        }, cancellationToken).ContinueWith((task) =>
        {
            _bufferBlock.Complete();
        });

        return _bufferBlock;
    }
}

Затем продюсер, использующий движок

/// <summary>
/// Producer that generates random byte blobs with specified size.
/// </summary>
public class Producer
{
    private static Random random = new Random();

    /// <summary>
    /// Returns source block that produced byte arrays. 
    /// </summary>
    /// <param name="blobSize">Size of byte arrays.</param>
    /// <param name="count">Total count of blobs (if 0 then infinite).</param>
    /// <param name="boundedCapacity">Bounded capacity (throttling).</param>
    /// <param name="cancellationToken">Cancellation token (used to stop infinite loop).</param>
    /// <returns>Source block.</returns>
    public static ISourceBlock<byte[]> BlobsSourceBlock(int blobSize, int count, int boundedCapacity, CancellationToken cancellationToken)
    {
        // Creating engine with specified bounded capacity.
        var engine = new Engine().CreateEngine(count, boundedCapacity, cancellationToken);

        // Creating transform block that uses our driver as a source.
        var block = new TransformBlock<int, byte[]>(
            // Useful work.
            i => CreateBlob(blobSize),
            new ExecutionDataflowBlockOptions
            {
                // Here you can specify your own throttling. 
                BoundedCapacity = boundedCapacity,
                MaxDegreeOfParallelism = Environment.ProcessorCount,
            });
        // Linking engine (and engine is already working at that time).
        engine.LinkTo(block, new DataflowLinkOptions { PropagateCompletion = true });
        return block;
    }

    /// <summary>
    /// Simple random byte[] generator.
    /// </summary>
    /// <param name="size">Array size.</param>
    /// <returns>byte[]</returns>
    private static byte[] CreateBlob(int size)
    {
        var buffer = new byte[size];
        random.NextBytes(buffer);
        return buffer;
    }
}

Теперь вы можете использовать производителя с потребителем (например, ActionBlock)

        var blobsProducer = BlobsProducer.CreateAndStartBlobsSourceBlock(0, 1024 * 1024, 10, cancellationTokenSource.Token);

        var md5Hash = MD5.Create();

        var actionBlock = new ActionBlock<byte[]>(b => 
        {
            Console.WriteLine(GetMd5Hash(md5Hash, b));
        },
        new ExecutionDataflowBlockOptions() { BoundedCapacity = 10 });

        blobsProducer.LinkTo(actionBlock);

Надеюсь, это поможет вам!

person xneg    schedule 23.02.2018