У меня есть цепочка блоков потока данных TPL, и я хотел бы наблюдать за прогрессом где-то внутри системы.
Я знаю, что могу просто вставить TransformBlock
в меш, где я хочу наблюдать, заставить его опубликовать в каком-нибудь средстве обновления прогресса, а затем вернуть сообщение без изменений в следующий блок. Мне не нравится это решение, так как блок был бы там исключительно из-за его побочного эффекта, и мне также пришлось бы изменить логику связывания блоков везде, где я хочу наблюдать.
Поэтому я задался вопросом, могу ли я использовать ISourceBlock<T>.AsObservable
для наблюдения за прохождением сообщений внутри меша, не изменяя его и не потребляя сообщения. Это кажется более чистым и практичным решением, если оно сработает.
Из моего (ограниченного) понимания Rx это означает, что мне нужно, чтобы наблюдаемые были горячими, а не холодными, чтобы мой progress
updater видел сообщение, но не потреблял его. И .Publish().RefCount()
, кажется, способ сделать наблюдаемое горячим. Однако он просто не работает должным образом — вместо этого либо block2
, либо progress
получает и потребляет каждое сообщение.
// Set up mesh
var block1 = new TransformBlock<int, int>(i => i + 20, new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 });
var block2 = new ActionBlock<int>(i => Debug.Print("block2:" + i.ToString()), new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 });
var obs = block1.AsObservable().Publish().RefCount(); // Declare this here just in case it makes a difference to do it before the LinkTo call.
var l1 = block1.LinkTo(block2, new DataflowLinkOptions() { PropagateCompletion = true});
// Progress
obs.ForEachAsync(i => Debug.Print("progress:" + i.ToString()));
// Start
var vals = Enumerable.Range(1, 5);
foreach (var v in vals)
{
block1.Post(v);
}
block1.Complete();
Результат недетерминирован, но я получаю что-то смешанное:
block2:21
progress:22
progress:24
block2:23
progress:25
Итак, я что-то делаю не так, или это невозможно из-за того, как реализован TPL Dataflow AsObservable
?
Я понимаю, что мог бы также заменить LinkTo
между block1
и block2
парой Observable/Observer, и это могло бы сработать, но LinkTo
с нижестоящим BoundedCapacity = 1
— это единственная причина, по которой я использую поток данных TPL в первую очередь.
изменить: Несколько пояснений:
- Я намеревался установить
BoundedCapacity=1
в блоке 2. Хотя в этом тривиальном примере в этом нет необходимости, я считаю, что поток данных TPL действительно полезен в случае ограниченного нисходящего потока. Чтобы прояснить решение, которое я отверг во втором абзаце, нужно добавить следующий блок, связанный между блоком 1 и блоком 2:
var progressBlock = new TransformBlock<int, int>( i => {SomeUpdateProgressMethod(i); return i;});
Я также хотел бы поддерживать обратное давление, чтобы, если следующий вышестоящий блок распределял работу на
block1
, а также на других эквивалентных рабочих, он не отправлял работу наblock1
, если эта цепочка уже была занята.
.Publish().RefCount()
, так как он может создавать наблюдаемые объекты, которые могут запускаться только один раз. Вам действительно нужно делиться наблюдателями? - person Enigmativity   schedule 16.06.2017.Publish().RefCount()
, я просто подумал, читая, что это может сделать наблюдаемое «горячим», чтобы и средство обновления прогресса, иblock2
видели пропускную способность.block2
необходимо получить данные для вычислений — простойblock2
в этом примере может быть заменой целой цепочки взаимосвязанных блоков потока данных для выполнения вычислений. Тем временем наблюдаемаяblock1
предназначена для обновлений прогресса, т. е. отчетов в конечном итоге для пользовательского интерфейса. - person theStrawMan   schedule 16.06.2017.Publish().RefCount()
позволяет нескольким наблюдателям одного и того же исходного потока и не имеет ничего общего с тем, что представляет собой источник или как он получает свои данные. - person Enigmativity   schedule 16.06.2017block2
иprogress
. Однако я предполагаю, чтоblock2
не рассматривается какObserver
, потому что его ссылка наblock1
выполняется не способом RX, а вместо этого, однако, поток данных TPL реализуетLinkTo
внутри (иAsObservable
). Таким образом, мы не получаем успешныйMultiCast
, потому что его нужно было бы настроить таким образом внутриblock1
. Это звучит правильно? - person theStrawMan   schedule 16.06.2017ForEachAsync
(вместо этого вы должны использовать.Subscribe
). - person Enigmativity   schedule 16.06.2017GroupingDataflowBlockOptions
, то естьJoinBlock
иBatchBlock
. Этот дизайн имеет для меня некоторый смысл, однако, если вы поместите другие типы блоков, например,TransformBlock
, в свою сетку до этого, вы потеряете способность передавать нежадность дальше вверх по течению. Поэтому я используюBoundedCapacity=1
(ноль не допускается). По сути, TPL Dataflow по умолчанию размещает буферы, буферы повсюду, но в случае использования ограниченного противодавления в нисходящем направлении мне нужен буфер только в определенных местах, а не в других. - person theStrawMan   schedule 17.06.2017LinkTo
для публикации каждого сообщения в наблюдаемом, а затем возвращал true. Чтобы скрыть примесь этого, можно было бы создать метод расширенияObservableLinkTo
, который создает наблюдаемое и одновременно вызывает обычныйLinkTo
. - person theStrawMan   schedule 09.03.2018BroadcastBlock
не будет передавать данные целям, которые не могут принять их немедленно. Я пропустил это. Это отличный вопрос, я был там, где вы были, когда вы его задали -AsObservable
кажется идеальным решением для публикации потока обновлений прогресса, но для деталей. Похоже, вы продвинулись дальше, но если я придумаю что-то, что работает и не слишком хакерское, я могу опубликовать ответ на случай, если это поможет другим. - person Todd Menier   schedule 09.08.2018