извлечение zip-архивов, синтаксический анализ файлов и преобразование в CSV

Я пытаюсь добиться максимальной производительности от следующей задачи:

  • Перечислить каталог с zip-файлами
  • Извлечение zip-архивов в памяти в поисках .json файлов (обработка вложенных zip-архивов)
  • Разберите json файлы
  • Записать свойства из json файла в агрегированный .CSV файл

Макет TPL, к которому я стремился, был:

producer -> parser block -> batch block -> csv writer block

Идея состоит в том, что один производитель извлекает zip-архивы и находит файлы json, отправляет текст в блок парсера, который работает параллельно (с несколькими потребителями). Пакетный блок группируется в пакеты по 200, а блок записи сбрасывает 200 строк в файл CSV при каждом вызове.

Вопросов:

  • Чем больше времени занимает jsonParseBlock TransformBlock, тем больше сообщений удаляется. Как я могу предотвратить это?
  • Как мне лучше использовать TPL для увеличения производительности?

    class Item
    {
        public string ID { get; set; }
        public string Name { get; set; }
    }
    
    class Demo
    {
        const string OUT_FILE = @"c:\temp\tplflat.csv";
        const string DATA_DIR = @"c:\temp\tpldata";
        static ExecutionDataflowBlockOptions parseOpts = new ExecutionDataflowBlockOptions() { SingleProducerConstrained=true, MaxDegreeOfParallelism = 8, BoundedCapacity = 100 };
        static ExecutionDataflowBlockOptions writeOpts = new ExecutionDataflowBlockOptions() { BoundedCapacity = 100 };
    
        public static void Run()
        {
            Console.WriteLine($"{Environment.ProcessorCount} processors available");
            _InitTest(); // reset csv file, generate test data if needed
            // start TPL stuff
            var sw = Stopwatch.StartNew();
            // transformer
            var jsonParseBlock = new TransformBlock<string, Item>(rawstr =>
            {
                var item = Newtonsoft.Json.JsonConvert.DeserializeObject<Item>(rawstr);
                System.Threading.Thread.Sleep(15); // the more sleep here, the more messages lost
                return item;
            }, parseOpts);
    
            // batch block
            var jsonBatchBlock = new BatchBlock<Item>(200);
    
            // writer block
            var flatWriterBlock = new ActionBlock<Item[]>(items =>
            {
                //Console.WriteLine($"writing {items.Length} to csv");
                StringBuilder sb = new StringBuilder();
                foreach (var item in items)
                {
                    sb.AppendLine($"{item.ID},{item.Name}");
                }
                File.AppendAllText(OUT_FILE, sb.ToString());
            });
    
            jsonParseBlock.LinkTo(jsonBatchBlock, new DataflowLinkOptions { PropagateCompletion = true });
            jsonBatchBlock.LinkTo(flatWriterBlock, new DataflowLinkOptions { PropagateCompletion = true });
    
            // start doing the work
            var crawlerTask = GetJsons(DATA_DIR, jsonParseBlock);
            crawlerTask.Wait();
            flatWriterBlock.Completion.Wait();
            Console.WriteLine($"ALERT: tplflat.csv row count should match the test data");
            Console.WriteLine($"Completed in {sw.ElapsedMilliseconds / 1000.0} secs");
         }
    
        static async Task GetJsons(string filepath, ITargetBlock<string> queue)
        {
            int count = 1;
            foreach (var zip in Directory.EnumerateFiles(filepath, "*.zip"))
            {
                Console.WriteLine($"working on zip #{count++}");
                var zipStream = new FileStream(zip, FileMode.Open);
                await ExtractJsonsInMemory(zip, zipStream, queue);
            }
            queue.Complete();
        }
    
        static async Task ExtractJsonsInMemory(string filename, Stream stream, ITargetBlock<string> queue)
        {
            ZipArchive archive = new ZipArchive(stream);
            foreach (ZipArchiveEntry entry in archive.Entries)
            {
                if (entry.Name.EndsWith(".json", StringComparison.OrdinalIgnoreCase))
                {
                    using (TextReader reader = new StreamReader(entry.Open(), Encoding.UTF8))
                    {
                        var jsonText = reader.ReadToEnd();
                        await queue.SendAsync(jsonText);
                    }
                }
                else if (entry.Name.EndsWith(".zip", StringComparison.OrdinalIgnoreCase))
                {
                    await ExtractJsonsInMemory(entry.FullName, entry.Open(), queue);
                }
            }
        }
    }
    

Обновление1

Я добавил async, но мне непонятно, как дождаться завершения всех блоков потока данных (впервые в C #, async и tpl). Я в основном хочу сказать: «Продолжайте работать, пока все очереди / блоки не опустеют». Я добавил следующий код ожидания и, похоже, работает.

// wait for crawler to finish
crawlerTask.Wait(); 
// wait for the last block
flatWriterBlock.Completion.Wait(); 

person lifebythedrop    schedule 22.02.2017    source источник
comment
Просто к сведению: я вижу примерно 1,7-кратное улучшение по сравнению с однопоточной производительностью на 4-ядерной машине.   -  person lifebythedrop    schedule 23.02.2017
comment
Завершение выглядит нормально, как есть. Просто знайте, что Wait будет заблокирован, вы можете вместо этого изменить Run на async Task и позволить вызывающему абоненту двигаться дальше.   -  person JSteward    schedule 23.02.2017


Ответы (2)


Короче ваш постинг и игнорирование возвращаемого значения. У вас есть два варианта: добавить несвязанный BufferBlock для хранения всех ваших входящих данных или await на SendAsync, который предотвратит удаление любых сообщений.

static async Task ExtractJsonsInMemory(string filename, Stream stream, ITargetBlock<string> queue)
{
    var archive = new ZipArchive(stream);
    foreach (ZipArchiveEntry entry in archive.Entries)
    {
        if (entry.Name.EndsWith(".json", StringComparison.OrdinalIgnoreCase))
        {
            using (var reader = new StreamReader(entry.Open(), Encoding.UTF8))
            {
                var jsonText = reader.ReadToEnd();
                await queue.SendAsync(jsonText);
            }
        }
        else if (entry.Name.EndsWith(".zip", StringComparison.OrdinalIgnoreCase))
        {
            await ExtractJsonsInMemory(entry.FullName, entry.Open(), queue);
        }
    }
}

Вам нужно будет полностью восстановить асинхронность, но это должно помочь вам начать.

person JSteward    schedule 22.02.2017
comment
Я добавил async апдейтов и вроде все работает - ни одного сообщения не упало. Не уверен на 100%, что я должным образом жду завершения обработки. - person lifebythedrop; 23.02.2017

Из MSDN о методе DataflowBlock.Post<TInput>:

Возвращаемое значение
Тип: System.Boolean
true, если элемент был принят целевым блоком; в противном случае - false.

Итак, проблема здесь в том, что вы отправляете свои сообщения, не проверяя, может ли конвейер принять другое сообщение или нет. Это происходит из-за ваших вариантов блоков:

new ExecutionDataflowBlockOptions() { BoundedCapacity = 100 }

и эта строка:

// this line isn't waiting for long operations and simply drops the message as it can't be accepted by the target block
queue.Post(jsonText);

Здесь вы говорите, что обработка должна быть отложена до тех пор, пока длина входной очереди не станет равной 100. В этом случае либо MSDN, либо @StephenCleary в его Введение в серию Dataflow предлагают простое решение:

Однако можно задушить блок, ограничив его размер буфера; в этом случае вы можете использовать SendAsync, чтобы (асинхронно) дождаться освобождения места, а затем поместить данные во входной буфер блока.

Итак, как уже предлагал @JSteward, вы можете ввести бесконечный буфер между вашими рабочими, чтобы избежать отбрасывания сообщения, и это обычная практика, поскольку проверка результата метода Post может заблокировать поток производителя на долгое время. .

Вторая часть вопроса, касающаяся производительности, заключается в использовании решения, ориентированного на async (которое идеально подходит для _ 9_ метод использования), поскольку вы постоянно используете операции ввода-вывода. Асинхронная операция - это, по сути, способ сказать программе: «Начни делать это и сообщи мне, когда это будет сделано». И, поскольку нет потока для таких операций , вы получите выгоду, освободив пул потоков для других операций в вашем конвейере.

PS: @JSteward предоставил вам хороший пример кода для этого подхода.

person VMAtm    schedule 22.02.2017
comment
Изначально я пробовал использовать неограниченный буфер, но у меня не хватало памяти, так как потребители не могли успевать. Сейчас работаю над асинхронной версией. - person lifebythedrop; 23.02.2017
comment
@lifebythedrop да, неограниченная емкость буфера может привести к проблемам с памятью, в этом случае вы либо создаете блокирующий поток для публикации сообщений, либо выполняете асинхронный режим. Удачи с этим. - person VMAtm; 23.02.2017