Использование AsObservable для наблюдения за блоками потока данных TPL без использования сообщений

У меня есть цепочка блоков потока данных TPL, и я хотел бы наблюдать за прогрессом где-то внутри системы.

Я знаю, что могу просто вставить TransformBlock в меш, где я хочу наблюдать, заставить его опубликовать в каком-нибудь средстве обновления прогресса, а затем вернуть сообщение без изменений в следующий блок. Мне не нравится это решение, так как блок был бы там исключительно из-за его побочного эффекта, и мне также пришлось бы изменить логику связывания блоков везде, где я хочу наблюдать.

Поэтому я задался вопросом, могу ли я использовать ISourceBlock<T>.AsObservable для наблюдения за прохождением сообщений внутри меша, не изменяя его и не потребляя сообщения. Это кажется более чистым и практичным решением, если оно сработает.

Из моего (ограниченного) понимания Rx это означает, что мне нужно, чтобы наблюдаемые были горячими, а не холодными, чтобы мой progress updater видел сообщение, но не потреблял его. И .Publish().RefCount(), кажется, способ сделать наблюдаемое горячим. Однако он просто не работает должным образом — вместо этого либо block2, либо progress получает и потребляет каждое сообщение.

// Set up mesh
var block1 = new TransformBlock<int, int>(i => i + 20, new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 });
var block2 = new ActionBlock<int>(i => Debug.Print("block2:" + i.ToString()), new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 }); 
var obs = block1.AsObservable().Publish().RefCount(); // Declare this here just in case it makes a difference to do it before the LinkTo call.
var l1 = block1.LinkTo(block2, new DataflowLinkOptions() { PropagateCompletion = true});

// Progress
obs.ForEachAsync(i => Debug.Print("progress:" + i.ToString()));

// Start
var vals = Enumerable.Range(1, 5);
foreach (var v in vals)
{
    block1.Post(v);
}
block1.Complete();

Результат недетерминирован, но я получаю что-то смешанное:

block2:21
progress:22
progress:24
block2:23
progress:25

Итак, я что-то делаю не так, или это невозможно из-за того, как реализован TPL Dataflow AsObservable?

Я понимаю, что мог бы также заменить LinkTo между block1 и block2 парой Observable/Observer, и это могло бы сработать, но LinkTo с нижестоящим BoundedCapacity = 1 — это единственная причина, по которой я использую поток данных TPL в первую очередь.

изменить: Несколько пояснений:

  • Я намеревался установить BoundedCapacity=1 в блоке 2. Хотя в этом тривиальном примере в этом нет необходимости, я считаю, что поток данных TPL действительно полезен в случае ограниченного нисходящего потока.
  • Чтобы прояснить решение, которое я отверг во втором абзаце, нужно добавить следующий блок, связанный между блоком 1 и блоком 2:

    var progressBlock = new TransformBlock<int, int>( i => {SomeUpdateProgressMethod(i); return i;});

  • Я также хотел бы поддерживать обратное давление, чтобы, если следующий вышестоящий блок распределял работу на block1, а также на других эквивалентных рабочих, он не отправлял работу на block1, если эта цепочка уже была занята.


person theStrawMan    schedule 16.06.2017    source источник
comment
Будьте осторожны при использовании .Publish().RefCount(), так как он может создавать наблюдаемые объекты, которые могут запускаться только один раз. Вам действительно нужно делиться наблюдателями?   -  person Enigmativity    schedule 16.06.2017
comment
@Enigmativity - я не очень хорошо понимаю .Publish().RefCount(), я просто подумал, читая, что это может сделать наблюдаемое «горячим», чтобы и средство обновления прогресса, и block2 видели пропускную способность. block2 необходимо получить данные для вычислений — простой block2 в этом примере может быть заменой целой цепочки взаимосвязанных блоков потока данных для выполнения вычислений. Тем временем наблюдаемая block1 предназначена для обновлений прогресса, т. е. отчетов в конечном итоге для пользовательского интерфейса.   -  person theStrawMan    schedule 16.06.2017
comment
Это не так просто, как горячее и холодное. Просто имейте в виду, что .Publish().RefCount() позволяет нескольким наблюдателям одного и того же исходного потока и не имеет ничего общего с тем, что представляет собой источник или как он получает свои данные.   -  person Enigmativity    schedule 16.06.2017
comment
Я не совсем понимаю, однако мне действительно нужны несколько наблюдателей, то есть block2 и progress. Однако я предполагаю, что block2 не рассматривается как Observer, потому что его ссылка на block1 выполняется не способом RX, а вместо этого, однако, поток данных TPL реализует LinkTo внутри (и AsObservable). Таким образом, мы не получаем успешный MultiCast, потому что его нужно было бы настроить таким образом внутри block1. Это звучит правильно?   -  person theStrawMan    schedule 16.06.2017
comment
Да, так лучше звучит. Проблема в том, что у наблюдаемого есть только один наблюдатель в вашем коде. Блоки потока данных не являются наблюдателями вашего наблюдаемого. Единственный наблюдатель, который у вас есть, создается при вызове метода ForEachAsync (вместо этого вы должны использовать .Subscribe).   -  person Enigmativity    schedule 16.06.2017
comment
@theStrawMan, вы пытались сделать блоки нежадными вместо того, чтобы ограничивать их емкость? Это может быть то, что вам нужно, поскольку блок не будет запрашивать дополнительные сообщения, если он занят.   -  person VMAtm    schedule 16.06.2017
comment
Требуется нежадное поведение, однако оно доступно только для GroupingDataflowBlockOptions, то есть JoinBlock и BatchBlock. Этот дизайн имеет для меня некоторый смысл, однако, если вы поместите другие типы блоков, например, TransformBlock, в свою сетку до этого, вы потеряете способность передавать нежадность дальше вверх по течению. Поэтому я использую BoundedCapacity=1 (ноль не допускается). По сути, TPL Dataflow по умолчанию размещает буферы, буферы повсюду, но в случае использования ограниченного противодавления в нисходящем направлении мне нужен буфер только в определенных местах, а не в других.   -  person theStrawMan    schedule 17.06.2017
comment
@theStrawMan ты решил это в конце концов?   -  person user4388177    schedule 08.03.2018
comment
@ user4388177 - Это было недавно, но я почти уверен, что только что использовал хакерский подход, когда я получил предикат LinkTo для публикации каждого сообщения в наблюдаемом, а затем возвращал true. Чтобы скрыть примесь этого, можно было бы создать метод расширения ObservableLinkTo, который создает наблюдаемое и одновременно вызывает обычный LinkTo.   -  person theStrawMan    schedule 09.03.2018
comment
@theStrawMan спасибо. Я поговорил с другом, и он внедряет несколько новых блоков, которые позволят это сделать, я скажу ему, чтобы он разместил их здесь.   -  person user4388177    schedule 09.03.2018
comment
@theStrawMan По какой-то причине вы никогда не принимали ответ ниже? Кажется, он идеально отвечает на ваш вопрос.   -  person Todd Menier    schedule 07.08.2018
comment
@ToddMenier - я этого не сделал, потому что предлагаемое решение либо отбрасывает сообщения, либо теряет обратное давление, что было для меня неприемлемо. Я закончил тем, что написал что-то нестандартное, что сделало это. PS: если бы я начинал это сегодня, я бы посмотрел на реализацию Reactive Streams, например Akka.NET Streams. вместо этого - выглядит большим улучшением по сравнению с потоком данных TPL.   -  person theStrawMan    schedule 08.08.2018
comment
@theStrawMan Вы правы - BroadcastBlock не будет передавать данные целям, которые не могут принять их немедленно. Я пропустил это. Это отличный вопрос, я был там, где вы были, когда вы его задали - AsObservable кажется идеальным решением для публикации потока обновлений прогресса, но для деталей. Похоже, вы продвинулись дальше, но если я придумаю что-то, что работает и не слишком хакерское, я могу опубликовать ответ на случай, если это поможет другим.   -  person Todd Menier    schedule 09.08.2018
comment
@ToddMenier - да, это кажется разумным подходом к отслеживанию движения через блоки, и постоянный интерес к этому вопросу означает, что мне, вероятно, следует опубликовать упрощенную версию того, что я сделал. Я сделаю это.   -  person theStrawMan    schedule 09.08.2018


Ответы (4)


Проблема с вашим кодом заключается в том, что вы подключаете двух потребителей block1. Затем поток данных просто дает значение тому, какой потребитель появляется первым.

Поэтому вам нужно транслировать значения из block1 в два других блока, чтобы затем иметь возможность потреблять их независимо.

Просто примечание: не делайте .Publish().RefCount(), так как это не делает то, что вы думаете. Это эффективно сделает один запуск доступным только для наблюдения, что во время этого одного запуска позволит нескольким наблюдателям подключаться и видеть одни и те же значения. Это не имеет никакого отношения ни к источнику данных, ни к тому, как взаимодействуют блоки потока данных.

Попробуйте этот код:

// Set up mesh
var block1 = new TransformBlock<int, int>(i => i + 20);
var block_boadcast = new BroadcastBlock<int>(i => i, new DataflowBlockOptions());
var block_buffer = new System.Threading.Tasks.Dataflow.BufferBlock<int>();
var block2 = new ActionBlock<int>(i => Debug.Print("block2:" + i.ToString()));
var obs = block_buffer.AsObservable();
var l1 = block1.LinkTo(block_boadcast);
var l2 = block_boadcast.LinkTo(block2);
var l3 = block_boadcast.LinkTo(block_buffer);

// Progress
obs.Subscribe(i => Debug.Print("progress:" + i.ToString()));

// Start
var vals = Enumerable.Range(1, 5);
foreach (var v in vals)
{
    block1.Post(v);
}
block1.Complete();

Это дает мне:

block2:21
block2:22
block2:23
block2:24
block2:25
progress:21
progress:22
progress:23
progress:24
progress:25

Что, я думаю, ты и хотел.

Кроме того, использование Rx для этого может быть лучшим вариантом во всех отношениях. Это намного мощнее и декларативнее, чем любой вариант TPL или Dataflow.

Ваш код сводится к следующему:

Observable
    .Range(1, 5)
    .Select(i => i + 20)
    .Do(i => Debug.Print("progress:" + i.ToString()));
    .Subscribe(i => Debug.Print("block2:" + i.ToString()));

Это в значительной степени дает вам тот же результат.

person Enigmativity    schedule 16.06.2017
comment
Я думаю, что это разумный подход, за исключением того, что, вводя буфер после BroadcastBlock, мы теряем все обратное давление в системе. Я не указывал это как ограничение на вопрос явно, однако цель BoundedCapacity=1 в block2 состоит в том, чтобы показать, что он ограничен. Я мог бы захотеть поддерживать обратное давление, если, например. это всего лишь один из нескольких путей обработки, и в восходящем направлении фактически есть возможность отправить на другие пути, если он заполнен. Введя здесь BufferBlock, этот путь теперь по существу жадный, и работа не может быть успешно распределена. - person theStrawMan; 16.06.2017
comment
@theStrawMan - блок буфера предназначен только для наблюдаемого. Это не должно влиять на остальную часть вашего потока данных. - person Enigmativity; 16.06.2017
comment
Ах, извините, я неправильно это прочитал. Я должен проверить ваше решение. Признаюсь, я на самом деле не знаю, существует ли риск того, что broadcast_block потеряет сообщение, если block2 отклонит его - может ли текущее сообщение broadcast_block быть смещено при этом? Я бы надеялся, что нет, но я не уверен. Если нет, то это действительно разумное решение. - person theStrawMan; 16.06.2017
comment
(с добавлением BoundedCapacity=1 к broadcast_block) - person theStrawMan; 16.06.2017
comment
Я только что проверил это, и, к сожалению, сообщения отбрасывались. Я думал, что BroadcastBlock может быть реализован таким образом, что в ситуации обратного давления он будет передавать обратно своим вышестоящим блокам «Я полон», и поэтому они еще не будут отправлять ему другое сообщение, но нет, это не так. - person theStrawMan; 16.06.2017
comment
@theStrawMan - Попробуйте использовать Rx. Он создает обратное давление, но не теряет ценности. - person Enigmativity; 16.06.2017
comment
Ой? Насколько я понимаю, Rx.NET не имеет обратного давления, при по крайней мере, не до такой степени, когда производитель узнает о блокировке нижестоящего потока и может скорректировать поведение. Хотя это немного отходит от моего первоначального вопроса здесь. - person theStrawMan; 16.06.2017

Есть два варианта, которые следует учитывать при создании наблюдаемого блока потока данных. Вы также можете:

  1. выдавать уведомление каждый раз, когда сообщение обрабатывается, или
  2. выдавать уведомление, когда ранее обработанное сообщение, хранящееся в выходном буфере блока, принимается связанным блоком.

Оба варианта имеют плюсы и минусы. Первый вариант обеспечивает своевременные, но неупорядоченные уведомления. Второй вариант обеспечивает упорядоченные, но отложенные уведомления, а также должен иметь дело с одноразовой связью между блоками. Что должно произойти с наблюдаемым, когда связь между двумя блоками удаляется вручную до того, как блоки будут завершены?

Ниже приведена реализация первого варианта, который создает TransformBlock вместе с непотребляющим IObservable этого блока. Также существует реализация для ActionBlock эквивалента, основанная на первой реализации (хотя ее можно было бы реализовать и самостоятельно путем копирования-вставки и адаптации реализации TransformBlock, так как кода не так много).

public static TransformBlock<TInput, TOutput>
    CreateObservableTransformBlock<TInput, TOutput>(
    Func<TInput, Task<TOutput>> transform,
    out IObservable<(TInput Input, TOutput Output,
        int StartedIndex, int CompletedIndex)> observable,
    ExecutionDataflowBlockOptions dataflowBlockOptions = null)
{
    if (transform == null) throw new ArgumentNullException(nameof(transform));
    dataflowBlockOptions = dataflowBlockOptions ?? new ExecutionDataflowBlockOptions();

    var semaphore = new SemaphoreSlim(1);
    int startedIndexSeed = 0;
    int completedIndexSeed = 0;

    var notificationsBlock = new BufferBlock<(TInput, TOutput, int, int)>(
        new DataflowBlockOptions() { BoundedCapacity = 100 });

    var transformBlock = new TransformBlock<TInput, TOutput>(async item =>
    {
        var startedIndex = Interlocked.Increment(ref startedIndexSeed);
        var result = await transform(item).ConfigureAwait(false);
        await semaphore.WaitAsync().ConfigureAwait(false);
        try
        {
            // Send the notifications in synchronized fashion
            var completedIndex = Interlocked.Increment(ref completedIndexSeed);
            await notificationsBlock.SendAsync(
                (item, result, startedIndex, completedIndex)).ConfigureAwait(false);
        }
        finally
        {
            semaphore.Release();
        }
        return result;
    }, dataflowBlockOptions);

    _ = transformBlock.Completion.ContinueWith(t =>
    {
        if (t.IsFaulted) ((IDataflowBlock)notificationsBlock).Fault(t.Exception);
        else notificationsBlock.Complete();
    }, TaskScheduler.Default);

    observable = notificationsBlock.AsObservable();
    // A dummy subscription to prevent buffering in case of no external subscription.
    observable.Subscribe(
        DataflowBlock.NullTarget<(TInput, TOutput, int, int)>().AsObserver());
    return transformBlock;
}

// Overload with synchronous lambda
public static TransformBlock<TInput, TOutput>
    CreateObservableTransformBlock<TInput, TOutput>(
    Func<TInput, TOutput> transform,
    out IObservable<(TInput Input, TOutput Output,
        int StartedIndex, int CompletedIndex)> observable,
    ExecutionDataflowBlockOptions dataflowBlockOptions = null)
{
    return CreateObservableTransformBlock(item => Task.FromResult(transform(item)),
        out observable, dataflowBlockOptions);
}

// ActionBlock equivalent (requires the System.Reactive package)
public static ITargetBlock<TInput>
    CreateObservableActionBlock<TInput>(
    Func<TInput, Task> action,
    out IObservable<(TInput Input, int StartedIndex, int CompletedIndex)> observable,
    ExecutionDataflowBlockOptions dataflowBlockOptions = null)
{
    if (action == null) throw new ArgumentNullException(nameof(action));
    var block = CreateObservableTransformBlock<TInput, object>(
        async item => { await action(item).ConfigureAwait(false); return null; },
        out var sourceObservable, dataflowBlockOptions);
    block.LinkTo(DataflowBlock.NullTarget<object>());
    observable = sourceObservable
        .Select(entry => (entry.Input, entry.StartedIndex, entry.CompletedIndex));
    return block;
}

// ActionBlock equivalent with synchronous lambda
public static ITargetBlock<TInput>
    CreateObservableActionBlock<TInput>(
    Action<TInput> action,
    out IObservable<(TInput Input, int StartedIndex, int CompletedIndex)> observable,
    ExecutionDataflowBlockOptions dataflowBlockOptions = null)
{
    return CreateObservableActionBlock(
        item => { action(item); return Task.CompletedTask; },
        out observable, dataflowBlockOptions);
}

Пример использования в Windows Forms:

private async void Button1_Click(object sender, EventArgs e)
{
    var block = CreateObservableTransformBlock((int i) => i + 20,
        out var observable,
        new ExecutionDataflowBlockOptions() { BoundedCapacity = 1 });

    var vals = Enumerable.Range(1, 20).ToList();
    TextBox1.Clear();
    ProgressBar1.Value = 0;

    observable.ObserveOn(SynchronizationContext.Current).Subscribe(onNext: x =>
    {
        TextBox1.AppendText($"Value {x.Input} transformed to {x.Output}\r\n");
        ProgressBar1.Value = (x.CompletedIndex * 100) / vals.Count;
    }, onError: ex =>
    {
        TextBox1.AppendText($"An exception occured: {ex.Message}\r\n");
    },
    onCompleted: () =>
    {
        TextBox1.AppendText("The job completed successfully\r\n");
    });

    block.LinkTo(DataflowBlock.NullTarget<int>());

    foreach (var i in vals) await block.SendAsync(i);
    block.Complete();
}

В приведенном выше примере тип переменной observable:

IObservable<(int Input, int Output, int StartedIndex, int CompletedIndex)>

Два индекса отсчитываются от 1.

person Theodor Zoulias    schedule 21.06.2020

Попробуйте заменить:

obs.ForEachAsync(i => Debug.Print("progressBlock:" + i.ToString()));

с участием:

obs.Subscribe(i => Debug.Print("progressBlock:" + i.ToString()));

Я полагаю, что метод ForEachAsync не подключается должным образом/он срабатывает, но что-то странное происходит с асинхронной частью.

person Clint    schedule 16.06.2017
comment
Спасибо за предложение, однако я получаю точно такое же поведение. - person theStrawMan; 16.06.2017
comment
Дратс, видел ответ @VMAtm, возможно, вы могли бы разделить два блока, использовать наблюдаемое в качестве единственного потребителя, а затем передать наблюдаемое значение во второй блок? Таким образом, вы можете использовать наблюдаемое значение столько, сколько захотите. - person Clint; 16.06.2017

Указав BoundedCapacity для блока внутри цепочки, вы создаете ситуацию, когда некоторые из ваших сообщений отклоняются целевыми блоками, поскольку буфер для ActionBlock заполнен, и сообщение отклоняется.

Создавая наблюдаемое из вашего буферного блока, вы создаете условие гонки: два потребителя ваших данных получают сообщения одновременно. Блоки в TPL Dataflow передают данные первому доступному потребителю, что приводит к недетерминированному состоянию приложения.

Теперь вернемся к вашей проблеме. Вы можете ввести BroadcastBlock, так как он предоставляет копию данных всем потребителям, а не только первому, но в этом случае вам нужно снять ограничение на размер буфера, блок вещания похож на телевизионный канал. , вы не можете получить предыдущее шоу, у вас есть только текущее.

Дополнительные примечания: вы не проверяете возвращаемое значение метода Post, вы можете рассмотреть использование await SendAsync, и для лучшего эффекта регулирования установите BoundedCapacity для блока начальной точки, а не для среднего.

person VMAtm    schedule 16.06.2017
comment
Спасибо, тот факт, что моя система потока данных ограничена нисходящим потоком, был преднамеренным - imo, это вариант использования, в котором сияет библиотека потока данных. Я добавлю примечание по этому поводу. По этой причине я бы не удалял параметр BoundedCapacity=1, а BroadcastBlock не подходил бы. - person theStrawMan; 16.06.2017
comment
Так и не понял, зачем вам такое ограничение, можно поподробнее? Поток данных добавляет небольшие накладные расходы, если буфер ограничен. - person VMAtm; 16.06.2017