Поток данных TPL, BroadcastBlock в BatchBlocks

У меня проблема с подключением BroadcastBlock(s) к BatchBlocks. Сценарий таков, что источники BroadcastBlocks, а получатели BatchBlocks.

В приведенном ниже упрощенном коде выполняется только один из дополнительных блоков действий. Я даже установил размер партии для каждого BatchBlock равным 1, чтобы проиллюстрировать проблему.

Установка для Greedy значения «true» приведет к выполнению 2 ActionBlocks, но это не то, чего я хочу, поскольку это приведет к продолжению BatchBlock, даже если оно еще не завершено. Есть идеи?

class Program
{
    static void Main(string[] args)
    {
        // My possible sources are BroadcastBlocks. Could be more
        var source1 = new BroadcastBlock<int>(z => z);

        // batch 1
        // can be many potential sources, one for now
        // I want all sources to arrive first before proceeding
        var batch1 = new BatchBlock<int>(1, new GroupingDataflowBlockOptions() { Greedy = false }); 
        var batch1Action = new ActionBlock<int[]>(arr =>
        {
            // this does not run sometimes
            Console.WriteLine("Received from batch 1 block!");
            foreach (var item in arr)
            {
                Console.WriteLine("Received {0}", item);
            }
        });

        batch1.LinkTo(batch1Action, new DataflowLinkOptions() { PropagateCompletion = true });

        // batch 2
        // can be many potential sources, one for now
        // I want all sources to arrive first before proceeding
        var batch2 = new BatchBlock<int>(1, new GroupingDataflowBlockOptions() { Greedy = false  });
        var batch2Action = new ActionBlock<int[]>(arr =>
        {
            // this does not run sometimes
            Console.WriteLine("Received from batch 2 block!");
            foreach (var item in arr)
            {
                Console.WriteLine("Received {0}", item);
            }
        });
        batch2.LinkTo(batch2Action, new DataflowLinkOptions() { PropagateCompletion = true });

        // connect source(s)
        source1.LinkTo(batch1, new DataflowLinkOptions() { PropagateCompletion = true });
        source1.LinkTo(batch2, new DataflowLinkOptions() { PropagateCompletion = true });

        // fire
        source1.SendAsync(3);

        Task.WaitAll(new Task[] { batch1Action.Completion, batch2Action.Completion }); ;

        Console.ReadLine();
    }
}

person alpinescrambler    schedule 25.04.2015    source источник
comment
Я думаю, что установка Greedy на true является правильным решением. Это не приведет к созданию меньших партий, если вы об этом беспокоитесь.   -  person svick    schedule 30.08.2015


Ответы (1)


Похоже, что во внутреннем механизме библиотеки потоков данных TPL, которая поддерживает нежадные функции, есть изъян. Что происходит, так это то, что BatchBlock, настроенный как нежадный, будет Postpone все предлагаемые сообщения из связанных блоков, вместо того, чтобы принимать их. Он поддерживает внутреннюю очередь с отложенными сообщениями, и когда их количество достигает своего BatchSize он пытается использовать отложенные сообщения и в случае успеха распространяет их вниз по течению, как и ожидалось. Проблема в том, что исходные блоки, такие как BroadcastBlock и BufferBlock, перестанут предлагать больше сообщений блоку, который отложил ранее предложенное сообщение, до тех пор, пока он не израсходует это единственное сообщение. Сочетание этих двух вариантов поведения приводит к тупиковой ситуации. Прогресс вперед невозможен, потому что BatchBlock ждет, пока будут предложены дополнительные сообщения, прежде чем использовать отложенные, а BroadcastBlock ждет, пока будут использованы отложенные сообщения, прежде чем предлагать больше сообщений...

Этот сценарий происходит только с BatchSize больше единицы (что является типичной конфигурацией для этого блока).

здесь показана демонстрация этой проблемы. В качестве источника используется более распространенный BufferBlock вместо BroadcastBlock. 10 сообщений отправляются в конвейер из трех блоков, и ожидаемое поведение заключается в том, что сообщения проходят через конвейер к последнему блоку. На самом деле ничего не происходит, и все сообщения остаются в первом блоке.

using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

public static class Program
{
    static void Main(string[] args)
    {
        var bufferBlock = new BufferBlock<int>();

        var batchBlock = new BatchBlock<int>(batchSize: 2,
            new GroupingDataflowBlockOptions() { Greedy = false });

        var actionBlock = new ActionBlock<int[]>(batch =>
            Console.WriteLine($"Received: {String.Join(", ", batch)}"));

        bufferBlock.LinkTo(batchBlock,
            new DataflowLinkOptions() { PropagateCompletion = true });

        batchBlock.LinkTo(actionBlock,
            new DataflowLinkOptions() { PropagateCompletion = true });

        for (int i = 1; i <= 10; i++)
        {
            var accepted = bufferBlock.Post(i);
            Console.WriteLine(
                $"bufferBlock.Post({i}) {(accepted ? "accepted" : "rejected")}");
            Thread.Sleep(100);
        }

        bufferBlock.Complete();
        actionBlock.Completion.Wait(millisecondsTimeout: 1000);
        Console.WriteLine();
        Console.WriteLine($"bufferBlock.Completion: {bufferBlock.Completion.Status}");
        Console.WriteLine($"batchBlock.Completion:  {batchBlock.Completion.Status}");
        Console.WriteLine($"actionBlock.Completion: {actionBlock.Completion.Status}");
        Console.WriteLine($"bufferBlock.Count: {bufferBlock.Count}");
    }
}

Выход:

bufferBlock.Post(1) accepted
bufferBlock.Post(2) accepted
bufferBlock.Post(3) accepted
bufferBlock.Post(4) accepted
bufferBlock.Post(5) accepted
bufferBlock.Post(6) accepted
bufferBlock.Post(7) accepted
bufferBlock.Post(8) accepted
bufferBlock.Post(9) accepted
bufferBlock.Post(10) accepted

bufferBlock.Completion: WaitingForActivation
batchBlock.Completion:  WaitingForActivation
actionBlock.Completion: WaitingForActivation
bufferBlock.Count: 10

Я предполагаю, что внутренний предложение-потреблять-резерв-механизм выпуска настроен для максимальной эффективности при поддержке BoundedCapacity, которая имеет решающее значение для многих приложений, а редко используемая функциональность Greedy = false осталась без тщательного тестирования.

Хорошая новость заключается в том, что в вашем случае вам не нужно устанавливать Greedy в false. BatchBlock в жадном режиме по умолчанию не будет распространять меньше сообщений, чем настроенное BatchSize, если только он не был помечен как завершенный и не распространяет оставшиеся сообщения, или вы вручную не вызываете его TriggerBatch в любой произвольный момент. Нежадная конфигурация предназначена для предотвращения нехватки ресурсов в сложные графические сценарии с множественными зависимостями между блоками.

person Theodor Zoulias    schedule 23.06.2020