Один из распространенных типов проблем, снова и снова возникающих на stackoverflow.com, - это повторное использование одного экземпляра любого из классов Subject, а затем удивление, что он не работает так, как можно было бы ожидать.
Давайте начнем с разговоров о субъектах и их внутреннем состоянии, а также о том, почему так важно знать complete
и error
уведомления. Затем мы перейдем к более интересным примерам с классами ReplaySubject
и BehaviorSubject
.
Повторное использование одного экземпляра Subject
Все субъекты имеют внутреннее состояние, которое отражает самый основной принцип Rx. Каждый Observable излучает ноль или более next
уведомлений и одно complete
или error
уведомление, но никогда то и другое одновременно. На практике это означает, что когда экземпляр Subject
получает complete
, он никогда ничего не должен испускать. Мы можем увидеть это на следующем примере:
// http://jsbin.com/nobuwud/2/edit?js,console const s = new Subject(); s.subscribe(console.log); s.next(1); s.next(2); s.complete(); s.next(3);
Это печатает только числа 1
и 2
. Номер 3
выдается после того, как наш Subject
уже получил complete
уведомление, которое пометило себя как остановленное. Это означает, что этот экземпляр Subject
больше никогда ничего не будет излучать (нет законного способа снова сделать объект "не остановленным").
Очень распространенная проблема при повторном использовании субъектов - непреднамеренная передача complete
уведомления. Например, давайте рассмотрим следующий пример:
// http://jsbin.com/matatit/1/edit?js,console const s = new Subject(); s.subscribe(console.log); Observable.range(1, 5) .subscribe(s); s.next(42);
Это печатает только числа 1
- 5
, но что случилось с 42
?
range(1, 5)
source Observable отправляет помимо next
s также complete
уведомление в конце. Обычно мы подписываемся на обработку только next
элементов, и нас не интересует complete
уведомление, но в данном случае это очень важно. Итак, что произошло, должно быть очевидно. s
Субъект получил complete
уведомление, в котором он пометил себя как остановленный, и больше никогда ничего не будет отправлять.
Так что, если мы хотим получать все next
, но не complete
уведомление (и не error
)? Самый простой способ - вручную вызвать next()
в теме:
// http://jsbin.com/matatit/2/edit?js,console ... Observable.range(1, 5) .subscribe(val => s.next(val)); ...
Теперь, когда мы снова запускаем этот пример, мы также получаем число 42
.
Обратите внимание, что все классы Subject имеют isStopped
публичное логическое свойство, в котором вы можете проверить их состояние. Также имейте в виду, что для error
уведомлений он работает так же, как и для complete
.
Сравнение BehaviorSubject и ReplaySubject
Теперь, когда мы находимся на одной странице, давайте рассмотрим более интересный пример. Мы будем использовать BehaviorSubject
и ReplaySubject
, потому что их можно часто менять местами. Однако их поведение отличается от сигнала complete
.
Мы можем взглянуть на тот же пример, что и выше, но на этот раз мы будем использовать ReplaySubject
и подпишемся после получения complete
уведомления от range
:
// http://jsbin.com/hewomer/2/edit?js,console const s = new ReplaySubject(); Observable.range(1, 5) .subscribe(s); s.subscribe(console.log); // should this print anything?
Это напечатает числа 1
- 5
. Но почему? Разве мы не узнали, что Субъекты ничего не излучают после получения complete
, которое автоматически отправляется range
, как мы видели ранее?
Это все еще правда. Однако существуют различия в реализациях Subject.
Мы можем увидеть разницу на более общем примере. Допустим, мы хотим кэшировать отдельный элемент, а затем воспроизводить его для каждого нового подписчика. И BehaviorSubject
, и ReplaySubject
будут работать в этом варианте использования, даже если их использование отличается (BehaviorSubject
имеет значение по умолчанию). Например, мы могли бы использовать их следующим образом:
// http://jsbin.com/hotidih/5/edit?js,console const behavior = new BehaviorSubject(null); const replay = new ReplaySubject(1); behavior.skip(1).subscribe(v => console.log(‘BehaviorSubject:’, v)); replay.subscribe(v => console.log(‘ReplaySubject:’, v)); behavior.next(1); behavior.next(2); behavior.subscribe(v => console.log(‘Late B subscriber:’, v)); replay.next(1); replay.next(2); behavior.subscribe(v => console.log(‘Late R subscriber:’, v));
В этом примере все числа печатаются дважды. Один раз с префиксом «BehaviorSubject», а затем снова с префиксом «ReplaySubject» (обратите внимание, что нам пришлось использовать оператор skip(1)
, чтобы пропустить значение по умолчанию, исходящее из BehaviorSubject
).
Так почему мы должны выбирать одно вместо другого (в сценариях, где производительность не является проблемой) и как это связано с внутренними состояниями Субъекта?
Все сводится к тому, как каждый из них работает внутри по подписке после получения уведомления complete
:
BehaviorSubject
отмечает себя как остановившийся и никогда ничего не излучает. По сути, он возвращает «пустой»Subscription
объект, не представляющий никакой реальной подписки.ReplaySubject
сначала отправляет весь буфер независимо от его внутреннего состояния новому подписчику, а затем возвращает «пустой»Subscription
.
Для нас это означает, что мы можем «завершить» ReplaySubject
и в любом случае получить его элементы. В отличие от BehaviorSubject
, когда он завершится, он никогда ничего не выдаст.
Мы можем взять тот же пример из приведенного выше и перед подпиской «поздний» подписчик выдает complete
:
// http://jsbin.com/wudiqor/3/edit?js,console behavior.next(1); behavior.next(2); behavior.complete(); behavior.subscribe(v => console.log(‘Late B subscriber:’, v)); replay.next(1); replay.next(2); replay.complete(); replay.subscribe(v => console.log(‘Late R subscriber:’, v));
«Опоздавший» BehaviorSubject
подписчик не получил ни одного элемента, потому что Тема уже заполнена. С другой стороны, ReplaySubject
все равно воспроизведет свой буфер (последний элемент, потому что мы создали его как new ReplaySubject(1)
), поэтому в консоли мы увидим Late R subscriber: 2
.
Если мы снова возьмем пример сверху с range(1, 5)
, теперь становится понятным, почему ReplaySubject
ведет себя иначе, чем Subject
. Они оба отмечают себя как остановленные, но логика их подписки различна.
Но не дайте себя обмануть. ReplaySubject
тоже «остановлен». Он не будет выдавать никаких новых элементов, он просто воспроизводит свой буфер при подписке.
Для чего все это полезно на практике?
Например, в приложениях Angular принято выполнять HTTP-запрос, а затем кэшировать результат в течение всего времени существования приложения. Очень простой подход - сохранить результат в свойстве объекта, а затем просто вернуть его через Observable.of
, что позволяет нам использовать его так же, как если бы это был настоящий HTTP-запрос:
class MyService { private cache: any; getData(): Observable<any> { if (this.cache) { return Observable.of(this.cache); } else { return this.http.get(...).do(r => this.cache = r); } } }
Конечно, это работает, но есть более подходящее решение без использования какой-либо переменной состояния и с использованием только ReplaySubject
:
// http://jsbin.com/sutiwav/9/edit?js,console class MyService { private cache = new ReplaySubject(1); getData(): Observable<any> { const http = this.http.get(...); const cache = this.cache; return Observable.merge(cache, http.do(cache)).take(1); } } const o = new MyService(); o.getData().subscribe(console.log); o.getData().subscribe(console.log); o.getData().subscribe(console.log);
Выглядит довольно странно, не правда ли? Мы могли бы написать это как однострочник, который объединяет наш cache
и фактический HTTP-запрос, а затем всегда завершает take(1)
.
Не стесняйтесь открыть демонстрацию на http://jsbin.com/sutiwav/9/edit?js,console, которая имитирует HTTP-запрос, и вы увидите, что это действительно работает.
Итак, почему это делает всего один запрос, а затем воспроизводит кешированный результат от cache
?
- Первый подписчик получает Observable, объединяющий
cache
иhttp
. Поскольку в этот моментcache
пусто, он ничего не выдаст, аmerge
подпишется наhttp
и будет ждать, пока он не выдаст элемент. Этот элемент используетdo
, вставленный вcache
(то же выражение, что и раньше с.subscribe(s)
). Затемtake(1)
гарантирует, что после получения одного элемента он излучаетcomplete
, и цепочка уничтожается. - Второй (и любой другой) подписчик получает Observable, объединяющий
cache
иhttp
. Когдаmerge
подписывается наcache
, он излучает свой буфер (этоReplaySubject(1)
с одним элементом). Это излучение немедленно распространяется дальше, гдеtake(1)
повторно излучает его и добавляетcomplete
уведомление, которое удаляет цепочку. Другими словами,merge
никогда не подписывается наhttp
, что означает, что мы никогда не сделаем еще один HTTP-запрос.
В этом примере есть одна очень интересная вещь. Наш cache
никогда не получает complete
уведомление, хотя мы используем do
, который также отправляет complete
. В любом случае, это не влияет на функциональность этого кода и связано с синхронной природой внутренних компонентов RxJS.
Вот почему в следующей статье мы поговорим о синхронной и асинхронной эмиссии в RxJS. Мы рассмотрим несколько общих примеров, а затем вернемся к этой демонстрации и посмотрим, что на самом деле произошло внутри.