Я пытаюсь добиться максимальной производительности от следующей задачи:
- Перечислить каталог с zip-файлами
- Извлечение zip-архивов в памяти в поисках
.json
файлов (обработка вложенных zip-архивов) - Разберите
json
файлы - Записать свойства из
json
файла в агрегированный.CSV
файл
Макет TPL, к которому я стремился, был:
producer -> parser block -> batch block -> csv writer block
Идея состоит в том, что один производитель извлекает zip-архивы и находит файлы json, отправляет текст в блок парсера, который работает параллельно (с несколькими потребителями). Пакетный блок группируется в пакеты по 200, а блок записи сбрасывает 200 строк в файл CSV при каждом вызове.
Вопросов:
- Чем больше времени занимает jsonParseBlock
TransformBlock
, тем больше сообщений удаляется. Как я могу предотвратить это? Как мне лучше использовать
TPL
для увеличения производительности?class Item { public string ID { get; set; } public string Name { get; set; } } class Demo { const string OUT_FILE = @"c:\temp\tplflat.csv"; const string DATA_DIR = @"c:\temp\tpldata"; static ExecutionDataflowBlockOptions parseOpts = new ExecutionDataflowBlockOptions() { SingleProducerConstrained=true, MaxDegreeOfParallelism = 8, BoundedCapacity = 100 }; static ExecutionDataflowBlockOptions writeOpts = new ExecutionDataflowBlockOptions() { BoundedCapacity = 100 }; public static void Run() { Console.WriteLine($"{Environment.ProcessorCount} processors available"); _InitTest(); // reset csv file, generate test data if needed // start TPL stuff var sw = Stopwatch.StartNew(); // transformer var jsonParseBlock = new TransformBlock<string, Item>(rawstr => { var item = Newtonsoft.Json.JsonConvert.DeserializeObject<Item>(rawstr); System.Threading.Thread.Sleep(15); // the more sleep here, the more messages lost return item; }, parseOpts); // batch block var jsonBatchBlock = new BatchBlock<Item>(200); // writer block var flatWriterBlock = new ActionBlock<Item[]>(items => { //Console.WriteLine($"writing {items.Length} to csv"); StringBuilder sb = new StringBuilder(); foreach (var item in items) { sb.AppendLine($"{item.ID},{item.Name}"); } File.AppendAllText(OUT_FILE, sb.ToString()); }); jsonParseBlock.LinkTo(jsonBatchBlock, new DataflowLinkOptions { PropagateCompletion = true }); jsonBatchBlock.LinkTo(flatWriterBlock, new DataflowLinkOptions { PropagateCompletion = true }); // start doing the work var crawlerTask = GetJsons(DATA_DIR, jsonParseBlock); crawlerTask.Wait(); flatWriterBlock.Completion.Wait(); Console.WriteLine($"ALERT: tplflat.csv row count should match the test data"); Console.WriteLine($"Completed in {sw.ElapsedMilliseconds / 1000.0} secs"); } static async Task GetJsons(string filepath, ITargetBlock<string> queue) { int count = 1; foreach (var zip in Directory.EnumerateFiles(filepath, "*.zip")) { Console.WriteLine($"working on zip #{count++}"); var zipStream = new FileStream(zip, FileMode.Open); await ExtractJsonsInMemory(zip, zipStream, queue); } queue.Complete(); } static async Task ExtractJsonsInMemory(string filename, Stream stream, ITargetBlock<string> queue) { ZipArchive archive = new ZipArchive(stream); foreach (ZipArchiveEntry entry in archive.Entries) { if (entry.Name.EndsWith(".json", StringComparison.OrdinalIgnoreCase)) { using (TextReader reader = new StreamReader(entry.Open(), Encoding.UTF8)) { var jsonText = reader.ReadToEnd(); await queue.SendAsync(jsonText); } } else if (entry.Name.EndsWith(".zip", StringComparison.OrdinalIgnoreCase)) { await ExtractJsonsInMemory(entry.FullName, entry.Open(), queue); } } } }
Обновление1
Я добавил async
, но мне непонятно, как дождаться завершения всех блоков потока данных (впервые в C #, async и tpl). Я в основном хочу сказать: «Продолжайте работать, пока все очереди / блоки не опустеют». Я добавил следующий код ожидания и, похоже, работает.
// wait for crawler to finish
crawlerTask.Wait();
// wait for the last block
flatWriterBlock.Completion.Wait();
Wait
будет заблокирован, вы можете вместо этого изменитьRun
наasync Task
и позволить вызывающему абоненту двигаться дальше. - person JSteward   schedule 23.02.2017