Как себя ведет Rx, когда поток данных идет быстрее, чем подписчики могут потреблять?

Мне очень нравится использовать Rx в производственном приложении; где я буду слушать обновления входящих уведомлений, поступающих с другого канала.

Я буду писать Rx-запрос поверх этого потока, где я буду регулировать его с помощью оператора .Window (). Подписчик (в моем случае это ActionBlock) будет обрабатывать эти данные в режиме блокировки; (т.е. он не будет порождать Task из ActionBlock). Имея в виду вышеизложенное, если данные поступают с гораздо большей скоростью, чем может потреблять мой подписчик, то что произойдет с входящими данными. Использует ли запрос Rx какой-либо буфер внутри; он будет переполнен?


person user2757350    schedule 18.12.2013    source источник
comment
Вам абсолютно необходимо обрабатывать каждое отдельное событие или можно игнорировать некоторые из них (даже если для этого потребуется какая-то хитрая логика)?   -  person cwharris    schedule 19.12.2013
comment
Поскольку вы уже используете Dataflow в качестве потребителя, почему бы также не использовать его в качестве производителя? Он неплохо поддерживает троттлинг.   -  person svick    schedule 19.12.2013


Ответы (3)


Фактически это связано с реализацией отдельных операторов, но встроенные будут буферизироваться на основе для каждой подписки, поэтому медленный потребитель не будет блокировать других подписчиков, если, конечно, подписчики не делятся потоки.

Кстати, Rx не всегда защищает грамматику Rx; например, вы несете ответственность за то, чтобы не выполнять одновременных вызовов OnNext по теме. Вы можете использовать Observable.Synchronize(), чтобы исправить это.

person James World    schedule 18.12.2013
comment
Создает ли он внутри более одного экземпляра подписчика или всегда один? - person user2757350; 19.12.2013
comment
Вместо «Подписчик» произнесите «Наблюдатель». IObservable один Subscribe метод принимает IObserver. Перегрузки, которые принимают делегатов, на самом деле являются методами расширения, которые создают анонимного наблюдателя за кулисами - всегда есть один наблюдатель на подписку. - person James World; 19.12.2013

Явление, о котором вы говорите, называется обратным давлением, и команда Rx в настоящее время изучает различные способы справиться с этой ситуацией. Одним из решений может быть обратная связь с Observable, чтобы он «замедлился».

Чтобы уменьшить обратное давление, вы можете использовать операторы с потерями, такие как Throttle или Sample.

Ответ Тимоти в основном правильный, но возможно, что обратное давление возникнет в одном потоке. Это может произойти, если вы используете асинхронный код. В этом смысле противодействие связано с синхронизацией и планированием, а не с потоками (напомним, что по умолчанию Rx является однопоточным).

Если вы столкнетесь со сценарием, когда события создаются быстрее, чем они могут быть использованы, и вы не используете оператор с потерями для уменьшения обратного давления, эти элементы обычно планируются / помещаются в очередь / буферизируются, что может привести к много выделения памяти.

Лично для меня это не было проблемой, поскольку обычно события обрабатываются быстрее, чем передаются, или потеря событий просто не возможна, и поэтому дополнительное потребление памяти неизбежно.

person cwharris    schedule 18.12.2013
comment
Спасибо за ответ; эти элементы обычно планируются / помещаются в очередь / буферизируются, что может привести к выделению большого количества памяти - так что мой предыдущий вопрос касался этого; Где эта работа буферизуется (очевидно, внутри памяти); но имеем ли мы, как разработчик, управление этим буфером; и можем ли мы как разработчик диктовать что-либо, связанное с этой буферизацией сообщений. Спасибо - person user2757350; 19.12.2013
comment
Как говорит Крис, вы должны использовать оператор с потерями - ознакомьтесь с этим подходом к снижению обратного давления для получения дополнительной информации: zerobugbuild.com/?p=192 - person James World; 19.12.2013
comment
Точно. У вас есть четыре варианта. 1: замедлить источник, 2: буферизовать события (противодавление), 3: ускорить потребителя, 4: потерять некоторые события. Для встроенных операторов единственный контроль над способом планирования событий - это введение определенного Scheduler. Это обеспечивает достаточный контроль для определенных обстоятельств, но, по моему опыту, элементы планируются для каждой подписки, а не для отдельных элементов. У некоторых операторов также есть перегрузки селектора, которые могут помочь в определенных областях. Если вам нужен еще больший контроль, вам может потребоваться написать собственные операторы. - person cwharris; 20.12.2013
comment
В моем случае я не могу позволить себе потерять и входящие сообщения, потому что это обновления позиций ... - person user2757350; 20.12.2013
comment
Абсолютны ли позиции? в этом случае теоретически вы можете потерять некоторые обновления из-за потери плавности / точности. Мы могли бы помочь больше, если бы точно знали, в чем проблема. - person cwharris; 20.12.2013
comment
Позиции Криса независимы друг от друга; это не тикеры, и я не могу позволить себе использовать операторов с потерями. Кажется, мое единственное решение - использовать буфер, в котором будут храниться все входящие обновления позиции. - person user2757350; 08.01.2014
comment
Вы должны подумать, действительно ли количество входящих уведомлений так велико. Как сказал Крис, эта проблема возникает не так часто. Обычно это происходит из-за медленного потребителя, а не из-за столь быстрого производителя, что ничто не может за ним угнаться. Примерами медленных потребителей являются все, что связано с сетью, базой данных, пользовательским интерфейсом, даже с локальными файлами и т. Д. Если вы делаете что-либо из этого в своем наблюдателе, вам следует подумать о буферизации обновлений и объединении обработки, если вы не можете использовать операторы с потерями. . Также стоит профилирование кода наблюдателя. - person Niall Connaughton; 25.08.2015

Если подписчик обрабатывает в том же потоке, что и излучающий наблюдаемый объект, данные не могут поступать быстрее, чем подписчик может потреблять.

IObservable<int> data = ...;
var subscription = data.Subscribe(n => Console.WriteLine(n));

В этом примере каждый int, исходящий из data, будет записан в консоль до того, как будет отправлен следующий int.

Если подписка пересекает потоки, то вышеперечисленное не выполняется.

person Timothy Shields    schedule 18.12.2013
comment
Так; что вы говорите, поскольку Rx Query и Subscriber (т.е. Observer) всегда выполняются в одном потоке? Если у меня есть IObservable поверх FileStream; как это будет вести себя. Поскольку FileStream не зависит от IObservable и может предоставлять данные независимо от того, выполняется ли Rx Query поверх него или нет ... - person user2757350; 18.12.2013
comment
В этом комментарии вы говорите о разнице между горячими и холодными наблюдаемыми. Взгляните на этот вопрос: stackoverflow.com/questions/2521277/. С горячим наблюдаемым вы можете пропустить данные, если вы не подписаны, но если вы подписаны, вы должны получить все данные, отправленные наблюдаемым, даже если это означает, что OnNexts буферизуются на медленном наблюдателе. - person James World; 18.12.2013