Как маршрутизировать, группировать или иным образом разделять сообщения на согласованные наборы с помощью потока данных TPL

Я новичок в потоке данных TPL и ищу конструкцию, которая позволит разделить список исходных сообщений для равномерно распределенной параллельной обработки при сохранении порядка сообщения сообщений через отдельные конвейеры. Есть ли в API DataFlow какой-то конкретный блок или концепция, которые можно использовать для достижения этой цели, или это больше вопрос предоставления связующего кода или пользовательских блоков между существующими блоками?

Для тех, кто знаком с Akka.NET: я ищу функциональность, аналогичную маршрутизатор ConsistentHashing, которые позволяют отправлять сообщения на один маршрутизатор, который затем перенаправляет эти сообщения на отдельные маршруты для обработки.

Синхронный пример:

var count = 100000;
var processingGroups = 5;
var source = Enumerable.Range(1, count);

// Distribute source elements consistently and evenly into a specified set of groups (ex. 5) so that.
var distributed = source.GroupBy(s => s % processingGroups);

// Within each of the 5 processing groups go through each item and add 1 to it
var transformed = distributed.Select(d => d.Select(i => i + 3).ToArray());

List<int[]> result = transformed.ToList();
Check.That(result.Count).IsEqualTo(processingGroups);
for (int i = 0; i < result.Count; i++)
{
    var outputGroup = result[i];

    var expectedRange = Enumerable.Range(i + 1, count/processingGroups).Select((e, index) => e + (index * (processingGroups - 1)) + 3);
    Check.That(outputGroup).ContainsExactly(expectedRange);
}

person jpierson    schedule 02.12.2016    source источник
comment
Есть ли причина иметь несколько отдельных конвейеров вместо одного конвейера, настроенного с помощью MaxDegreeOfParallelism?   -  person Theodor Zoulias    schedule 25.06.2020
comment
К сожалению, я уже не помню, в каком контексте был задан вопрос.   -  person jpierson    schedule 26.06.2020
comment
Без проблем, мой друг. В эти дни я читаю старые вопросы в теге tpl-dataflow и прошу людей разъяснить их многолетние проблемы. :-)   -  person Theodor Zoulias    schedule 26.06.2020


Ответы (2)


В общем, я не думаю, что то, что вы ищете, предварительно создано в Dataflow, как это может быть с маршрутизатором ConsistentHashing. Однако, добавляя идентификатор к частям данных, которые вы хотите передать, вы можете обрабатывать их в любом порядке, параллельно и переупорядочивать их после завершения обработки.

public class Message {
        public int MessageId { get; set; }
        public int GroupId { get; set; }        
        public int Value { get; set; }
    }

    public class MessageProcessing
    {
        public void abc() {
            var count = 10000;
            var groups = 5;
            var source = Enumerable.Range(0, count);

            //buffer all input
            var buffer = new BufferBlock<IEnumerable<int>>();

            //split each input enumerable into processing groups
            var messsageProducer = new TransformManyBlock<IEnumerable<int>, Message>(ints => 
            ints.Select((i, index) => new Message() { MessageId = index, GroupId = index % groups, Value = i }).ToList());

            //process each message, one action block may process any group id in any order
            var processMessage = new TransformBlock<Message, Message>(msg => 
            {
                msg.Value++;
                return msg;
            }, new ExecutionDataflowBlockOptions() {
                MaxDegreeOfParallelism = groups
            });

            //output of processed message values
            int[] output = new int[count];

            //insert messages into array in the order the started in
            var regroup = new ActionBlock<Message>(msg => output[msg.MessageId] = msg.Value, 
                new ExecutionDataflowBlockOptions() {
                    MaxDegreeOfParallelism = 1
                });
        }        

    }

В примере GroupId сообщения не используется, но его можно использовать в более полном примере для координации групп сообщений. Кроме того, обработка последующих сообщений в буферном блоке может быть выполнена путем изменения выходного массива на список и настройки соответствующего элемента списка каждый раз, когда перечисляемое число целых чисел отправляется в буферный блок. В зависимости от вашего конкретного использования вам может потребоваться поддержка нескольких пользователей вывода, и это можно свернуть обратно в поток.

person JSteward    schedule 25.12.2016

Вы можете динамически создать конвейер с связыванием блоков между собой на основе по предикату:

var count = 100;
var processingGroups = 5;
var source = Enumerable.Range(1, count);

var buffer = new BufferBlock<int>();
var consumer1 = new ActionBlock<int>(i => {  });
var consumer2 = new ActionBlock<int>(i => {  });
var consumer3 = new ActionBlock<int>(i => {  });
var consumer4 = new ActionBlock<int>(i => { Console.WriteLine(i); });
var consumer5 = new ActionBlock<int>(i => {  });

buffer.LinkTo(consumer1, i => i % 5 == 1);
buffer.LinkTo(consumer2, i => i % 5 == 2);
buffer.LinkTo(consumer3, i => i % 5 == 3);
buffer.LinkTo(consumer4, i => i % 5 == 4);
buffer.LinkTo(consumer5);

foreach (var i in source)
{
    buffer.Post(i);
    // consider async option if you able to do it
    // await buffer.SendAsync(i);
}
buffer.Complete();
Console.ReadLine();

Приведенный выше код будет записывать только числа из 4-й группы, обрабатывая другие группы молча, но я надеюсь, что вы поняли идею. Существует общая практика связывать блок хотя бы с одним потребителем без фильтрации сообщений, которые не удаляются, если они не приняты ни одним потребителем, и вы можете сделать это, если у вас нет обработчика по умолчанию (NullTarget<int> просто игнорирует все полученные сообщения):

buffer.LinkTo(DataflowBlock.NullTarget<int>());

Недостатком этого является продолжение его преимуществ: вы должны предоставить предикаты, так как для этого нет встроенных структур. Однако это все же можно было сделать.

person VMAtm    schedule 26.01.2017