Подождите, пока предыдущие блоки закончат обработку, прежде чем продолжить

У меня есть процесс, который выглядит так.

  1. Получить набор файлов CSV из папки
  2. Прочитайте файлы CSV и сохраните содержимое в базе данных.
  3. Прочитайте данные из базы данных и выполните дополнительную обработку.

Причина разделения шагов 2 и 3 состоит в том, чтобы отделить проблемы, связанные с чтением файлов, от проблем, связанных с обработкой файлов.

Я могу смоделировать это с помощью трех блоков потока данных. У меня проблема в том, что я не хочу запускать блок 3, пока все файлы не будут сохранены в базе данных. Мне нужен какой-то способ определить, что все файлы, полученные в блоке 1, были обработаны блоком 2. В блоке 2 MaxDegreeOfParallelism будет установлено на Unbounded — я хочу, чтобы они обрабатывались параллельно.

Я рассматривал возможность использования Encapsulate в первых двух блоках, но не думаю, что это сработает. Возможно, мне нужно какое-то Batchblock, но не все партии будут одинакового размера.

Как я могу это сделать? Нужно ли мне создавать свой собственный тип блока?


person bornfromanegg    schedule 29.12.2014    source источник
comment
Вы должны читать все данные из базы данных одновременно, или вы можете читать их файл за файлом или что-то в этом роде?   -  person svick    schedule 31.12.2014


Ответы (1)


Это не подходит для одного потока TDF, поскольку шаг № 2 не передает элементы шагу № 3, который начинается после того, как предыдущие уже завершены.

У вас должно быть 2 отдельных потока. Первый читает из папки и сохраняет в базе данных, а второй читает из базы данных и начинает обработку. Вы можете дождаться завершения первого потока, ожидая свойства Completion:

var reader = // Create #1 block
var dbFiller = // Create #2 block

reader.LinkTo(dbFiller, new DataflowLinkOptions { PropagateCompletion = true }); // Link both blocks with Completion Propagation

reader.Post( // Queue up work for reader

await reader.Completion; // Asynchronously wait for previous steps to complete

var processor = // Create #3 block

processor.Post( // Queue up work for processor
person i3arnon    schedule 29.12.2014
comment
Однако, чтобы сделать это таким образом, мне нужно было бы выполнить считыватель Complete(), не так ли? Что отсутствует в приведенном выше коде. Я не хочу этого делать, так как это повторяемый процесс, и я хочу повторно использовать конвейер. - person bornfromanegg; 29.12.2014
comment
@ user1158174, но вы говорите, что хотите, чтобы обработка начиналась только после завершения предыдущей. Что значит завершено, если оно не завершено... - person i3arnon; 29.12.2014
comment
Да. Я думаю, это действительно вопрос, не так ли? Шаг 1 будет выполняться один раз, скажем, каждые десять минут. При выполнении он может найти десять файлов. Я хочу, чтобы все они обрабатывались на шаге 2 до начала шага 3. Поэтому, когда я говорю «закончено», я имею в виду завершение текущего пакета файлов. Здесь я застрял, потому что Dataflow не предлагает этого из коробки. BatchBlock ближе всего, но ему нужны партии определенного размера, чего у меня нет. - person bornfromanegg; 29.12.2014
comment
@ user1158174 TDF вообще этого не предлагает. Я рекомендую вам использовать мой ответ ... но вызывать его каждые 10 минут и ждать завершения всего цикла. Затем подождите 10 минут и сделайте это снова.. и т.д. - person i3arnon; 29.12.2014
comment
Хорошо, спасибо. Думаю, я надеялся найти способ инкапсулировать это поведение в пользовательский блок или что-то в этом роде. - person bornfromanegg; 29.12.2014
comment
@ user1158174 Не подходит. блок TDF завершается только один раз. То, что вы ищете, это фазы... - person i3arnon; 29.12.2014
comment
@user1158174 user1158174 Вы можете создать блок, который получает пинг, создает блоки для шагов №1 и №2, а когда №2 завершается, отправляет пинг следующему блоку. Но я не уверен, что это было бы действительно полезно. - person svick; 31.12.2014