Преобразование моего кода на основе C# BlockingCollection в поток данных TPL

У меня есть конкретная проблема, которую, я уверен, можно решить с помощью потока данных TPL. Я просто новичок в этом, поэтому мне нужна ваша помощь, чтобы ускорить мое понимание. Мой код сейчас такой:

Текущий код

где Процесс1, Процесс2, Процесс3 — это Задача. Объекты передаются из одного блока в другой блок с блокирующей коллекцией. Я хотел бы сделать это:

Нужно так

Я читал о TransformBlock, ActionBlock и BatchBlock. Можете ли вы помочь мне, как использовать эти классы для достижения вышеуказанного дизайна.


person Subhash Makkena    schedule 26.12.2016    source источник
comment
Не размещайте код в виде изображения.   -  person VMAtm    schedule 29.12.2016
comment
Связано: поток данных TPL, альтернатива ограничениям JoinBlock?   -  person Theodor Zoulias    schedule 11.06.2020


Ответы (1)


Вы не предоставили никаких подробностей о том, что ваши блоки Process2 и Process3 делают с входными данными, поэтому я сделал предположение, что каждый из них выполняет какое-то уникальное преобразование с исходными объектами, так что выходной список объектов содержит объединенные результаты обоих процессов. .

using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace TPLDataFlowExample1
{
    class Program
    {
        static void Main(string[] args)
        {
            var inputListOfObjects = new int[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };

            // Process1 block
            var process1 = new TransformBlock<int, int>(i => i * 2);
            // Broadcast block which passes objects to Process2 and Process3
            var broadCast = new BroadcastBlock<int>(null);
            // Process2 block
            var process2 = new TransformBlock<int, string>(i => $"Process 2: {i}");
            // Process3 block
            var process3 = new TransformBlock<int, string>(i => $"Process 3: {i}");
            // Just simple action block which will print the result
            var print = new ActionBlock<string>(s => Console.WriteLine(s));

            // Link the output of Process1 block with the input of Broadcast block. Propagate completion to the next block.
            process1.LinkTo(broadCast, new DataflowLinkOptions { PropagateCompletion = true });
            // Link the output of Broadcast block with the input of Process2 block. Propagate completion to the next block.
            broadCast.LinkTo(process2, new DataflowLinkOptions { PropagateCompletion = true });
            // Link the output of Broadcast block with the input of Process3 block. Propagate completion to the next block.
            broadCast.LinkTo(process3, new DataflowLinkOptions { PropagateCompletion = true });
            // Link the output of Process2 block with the input of Print block. 
            process2.LinkTo(print);
            // Link the output of Process2 block with the input of print block. 
            process3.LinkTo(print);

            // We didn't propagate completion to Print block because it must complete when both Process2 and Process3 blocks are in Completed state.
            Task.WhenAll(process2.Completion, process3.Completion).ContinueWith(_ => print.Complete());

            // Post data to the Process1 block
            foreach (var obj in inputListOfObjects)
            {
                process1.Post(obj);
            }

            // Mark the Process1 block as complete
            process1.Complete();
            // Wait for the last block to process all messages
            print.Completion.Wait();
        }
    }
}


// Output:
//
// Process 2: 2
// Process 3: 2
// Process 3: 4
// Process 3: 6
// Process 3: 8
// Process 3: 10
// Process 3: 12
// Process 3: 14
// Process 3: 16
// Process 3: 18
// Process 3: 20
// Process 2: 4
// Process 2: 6
// Process 2: 8
// Process 2: 10
// Process 2: 12
// Process 2: 14
// Process 2: 16
// Process 2: 18
// Process 2: 20
// Press any key to continue . . .
person Alexey Andrushkevich    schedule 28.12.2016