Блок потока данных TPL потребляет всю доступную память

У меня есть TransformManyBlock со следующим дизайном:

  • Ввод: путь к файлу
  • Вывод: IEnumerable содержимого файла, по одной строке за раз

Я запускаю этот блок в огромном файле (61 ГБ), который слишком велик, чтобы поместиться в ОЗУ. Чтобы избежать неограниченного роста памяти, я установил BoundedCapacity на очень низкое значение (например, 1) для этого блока и всех последующих блоков. Тем не менее, блок явно жадно повторяет IEnumerable, который потребляет всю доступную память на компьютере, останавливая каждый процесс. OutputCount блока продолжает расти без ограничений, пока я не убью процесс.

Что я могу сделать, чтобы блок не потреблял IEnumerable таким образом?

РЕДАКТИРОВАТЬ: Вот пример программы, иллюстрирующей проблему:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

class Program
{
    static IEnumerable<string> GetSequence(char c)
    {
        for (var i = 0; i < 1024 * 1024; ++i)
            yield return new string(c, 1024 * 1024);
    }

    static void Main(string[] args)
    {
        var options = new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 };
        var firstBlock = new TransformManyBlock<char, string>(c => GetSequence(c), options);
        var secondBlock = new ActionBlock<string>(str =>
            {
                Console.WriteLine(str.Substring(0, 10));
                Thread.Sleep(1000);
            }, options);

        firstBlock.LinkTo(secondBlock);
        firstBlock.Completion.ContinueWith(task =>
            {
                if (task.IsFaulted) ((IDataflowBlock) secondBlock).Fault(task.Exception);
                else secondBlock.Complete();
            });

        firstBlock.Post('A');
        firstBlock.Complete();
        for (; ; )
        {
            Console.WriteLine("OutputCount: {0}", firstBlock.OutputCount);
            Thread.Sleep(3000);
        }
    }
}

Если вы используете 64-разрядную систему, обязательно снимите флажок «Предпочитать 32-разрядную версию» в Visual Studio. У меня на компе 16гб ОЗУ, и эта программа сразу потребляет каждый доступный байт.


person brianberns    schedule 23.06.2015    source источник
comment
ну ТБХ: мне некогда тут с тобой спорить - удачи   -  person Random Dev    schedule 23.06.2015
comment
если вы внимательно прочитаете остальную часть раздела, вы увидите, что он не работает так, как вы думаете - ваш firstBlock всегда предлагает все, что может произвести - если вы привяжете второй, он просто отклонит второй ввод и получит его позже   -  person Random Dev    schedule 23.06.2015


Ответы (3)


Вы, кажется, неправильно понимаете, как работает поток данных TPL.

BoundedCapacity ограничивает количество элементов, которые вы можете разместить в блоке. В вашем случае это означает один char в TransformManyBlock и один string в ActionBlock.

Таким образом, вы отправляете один элемент в TransformManyBlock, который затем возвращает 1024*1024 строк и пытается передать их ActionBlock, который принимает только одну за раз. Остальные строки просто останутся в очереди вывода TransformManyBlock.

Что вы, вероятно, захотите сделать, так это создать один блок и отправлять в него элементы в потоковом режиме, ожидая (синхронно или иным образом), когда его емкость будет достигнута:

private static void Main()
{
    MainAsync().Wait();
}

private static async Task MainAsync()
{
    var block = new ActionBlock<string>(async item =>
    {
        Console.WriteLine(item.Substring(0, 10));
        await Task.Delay(1000);
    }, new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });

    foreach (var item in GetSequence('A'))
    {
        await block.SendAsync(item);
    }

    block.Complete();
    await block.Completion;
}
person i3arnon    schedule 23.06.2015
comment
Спасибо. В итоге я создал новый блок, который инкапсулирует исходный ActionBlock и целевой BufferBlock. Блок действий использует SendAsync, как вы предлагаете для заполнения буфера. Для внешнего мира он ведет себя как TransformManyBlock с нужным мне поведением. - person brianberns; 24.06.2015
comment
@brianberns: Извините, если это глупый вопрос, но в чем разница между await block.SendAsync(item) и block.Post(item)? - person Bugmaster; 15.07.2015
comment
@Bugmaster Это вовсе не глупый вопрос: stackoverflow.com/a/13605979/885318 - person i3arnon; 15.07.2015
comment
@i3arnon: Спасибо, я не знал, что Post() вернется сразу же, несмотря ни на что, я думал, что он заблокируется, пока сообщение не будет использовано. Ой! - person Bugmaster; 15.07.2015

Кажется, что для создания ограниченного вывода TransformManyBlock необходимы три внутренних блока:

  1. TransformBlock, который получает входные данные и производит IEnumerable, потенциально работающие параллельно.
  2. Непараллельный ActionBlock, который перечисляет созданные IEnumerable и распространяет окончательные результаты.
  3. BufferBlock, где хранятся окончательные результаты с соблюдением желаемого BoundedCapacity.

Немного сложная часть заключается в том, как распространять завершение второго блока, потому что он не связан напрямую с третьим блоком. В приведенной ниже реализации метод PropagateCompletion написан в соответствии с исходный код библиотеки.

public static IPropagatorBlock<TInput, TOutput>
    CreateOutputBoundedTransformManyBlock<TInput, TOutput>(
    Func<TInput, Task<IEnumerable<TOutput>>> transform,
    ExecutionDataflowBlockOptions dataflowBlockOptions)
{
    if (transform == null) throw new ArgumentNullException(nameof(transform));
    if (dataflowBlockOptions == null)
        throw new ArgumentNullException(nameof(dataflowBlockOptions));

    var input = new TransformBlock<TInput, IEnumerable<TOutput>>(transform,
        dataflowBlockOptions);
    var output = new BufferBlock<TOutput>(dataflowBlockOptions);
    var middle = new ActionBlock<IEnumerable<TOutput>>(async results =>
    {
        if (results == null) return;
        foreach (var result in results)
        {
            var accepted = await output.SendAsync(result).ConfigureAwait(false);
            if (!accepted) break; // If one is rejected, the rest will be rejected too
        }
    }, new ExecutionDataflowBlockOptions()
    {
        MaxDegreeOfParallelism = 1,
        BoundedCapacity = dataflowBlockOptions.MaxDegreeOfParallelism,
        CancellationToken = dataflowBlockOptions.CancellationToken,
        SingleProducerConstrained = true,
    });

    input.LinkTo(middle, new DataflowLinkOptions() { PropagateCompletion = true });
    PropagateCompletion(middle, output);

    return DataflowBlock.Encapsulate(input, output);

    async void PropagateCompletion(IDataflowBlock source, IDataflowBlock target)
    {
        try
        {
            await source.Completion.ConfigureAwait(false);
        }
        catch { }

        var exception = source.Completion.IsFaulted ? source.Completion.Exception : null;
        if (exception != null) target.Fault(exception); else target.Complete();
    }
}

// Overload with synchronous delegate
public static IPropagatorBlock<TInput, TOutput>
    CreateOutputBoundedTransformManyBlock<TInput, TOutput>(
    Func<TInput, IEnumerable<TOutput>> transform,
    ExecutionDataflowBlockOptions dataflowBlockOptions)
{
    return CreateOutputBoundedTransformManyBlock<TInput, TOutput>(
        item => Task.FromResult(transform(item)), dataflowBlockOptions);
}

Пример использования:

var firstBlock = CreateOutputBoundedTransformManyBlock<char, string>(
    c => GetSequence(c), options);
person Theodor Zoulias    schedule 15.06.2020

Если коэффициент вывода конвейера ниже коэффициента отправки, сообщения будут накапливаться в конвейере до тех пор, пока не закончится память или не будет достигнут некоторый предел очереди. Если сообщения имеют значительный размер, процесс скоро будет нуждаться в памяти.

Установка BoundedCapacity в 1 приведет к отклонению сообщений очередью, если в очереди уже есть одно сообщение. Это нежелательное поведение, например, в таких случаях, как пакетная обработка. Прочтите этот сообщение, чтобы узнать больше.

Этот рабочий тест иллюстрирует мою точку зрения:

//Change BoundedCapacity to +1 to see it fail
[TestMethod]
public void stackOverflow()
{      
    var total = 1000;
    var processed = 0;
    var block = new ActionBlock<int>(
       (messageUnit) =>
       {
           Thread.Sleep(10);
           Trace.WriteLine($"{messageUnit}");
           processed++;
       },
        new ExecutionDataflowBlockOptions() { BoundedCapacity = -1 } 
   );

    for (int i = 0; i < total; i++)
    {
        var result = block.SendAsync(i);
        Assert.IsTrue(result.IsCompleted, $"failed for {i}");
    }

    block.Complete();
    block.Completion.Wait();

    Assert.AreEqual(total, processed);
}

Итак, мой подход заключается в том, чтобы ограничить публикацию, чтобы конвейер не накапливал много сообщений в очередях.

Ниже простой способ сделать это. Таким образом, поток данных продолжает обрабатывать сообщения на полной скорости, но сообщения не накапливаются, что позволяет избежать чрезмерного потребления памяти.

//Should be adjusted for specific use.
public void postAssync(Message message)
{

    while (totalPending = block1.InputCount + ... + blockn.InputCount> 100)
    {
        Thread.Sleep(200);
        //Note: if allocating huge quantities for of memory for each message the Garbage collector may keep up with the pace. 
        //This is the perfect place to force garbage collector to release memory.

    }
    block1.SendAssync(message)
}
person MiguelSlv    schedule 21.03.2019