Один из распространенных типов проблем, снова и снова возникающих на stackoverflow.com, - это повторное использование одного экземпляра любого из классов Subject, а затем удивление, что он не работает так, как можно было бы ожидать.

Давайте начнем с разговоров о субъектах и ​​их внутреннем состоянии, а также о том, почему так важно знать complete и error уведомления. Затем мы перейдем к более интересным примерам с классами ReplaySubject и BehaviorSubject.

Повторное использование одного экземпляра Subject

Все субъекты имеют внутреннее состояние, которое отражает самый основной принцип Rx. Каждый Observable излучает ноль или более next уведомлений и одно complete или error уведомление, но никогда то и другое одновременно. На практике это означает, что когда экземпляр Subject получает complete, он никогда ничего не должен испускать. Мы можем увидеть это на следующем примере:

// http://jsbin.com/nobuwud/2/edit?js,console
const s = new Subject();
s.subscribe(console.log);
s.next(1);
s.next(2);
s.complete();
s.next(3);

Это печатает только числа 1 и 2. Номер 3 выдается после того, как наш Subject уже получил complete уведомление, которое пометило себя как остановленное. Это означает, что этот экземпляр Subject больше никогда ничего не будет излучать (нет законного способа снова сделать объект "не остановленным").

Очень распространенная проблема при повторном использовании субъектов - непреднамеренная передача complete уведомления. Например, давайте рассмотрим следующий пример:

// http://jsbin.com/matatit/1/edit?js,console
const s = new Subject();
s.subscribe(console.log);
Observable.range(1, 5)
  .subscribe(s);
s.next(42);

Это печатает только числа 1 - 5, но что случилось с 42?

range(1, 5) source Observable отправляет помимо nexts также complete уведомление в конце. Обычно мы подписываемся на обработку только next элементов, и нас не интересует complete уведомление, но в данном случае это очень важно. Итак, что произошло, должно быть очевидно. s Субъект получил complete уведомление, в котором он пометил себя как остановленный, и больше никогда ничего не будет отправлять.

Так что, если мы хотим получать все next, но не complete уведомление (и не error)? Самый простой способ - вручную вызвать next() в теме:

// http://jsbin.com/matatit/2/edit?js,console
...
Observable.range(1, 5)
  .subscribe(val => s.next(val));
...

Теперь, когда мы снова запускаем этот пример, мы также получаем число 42.

Обратите внимание, что все классы Subject имеют isStopped публичное логическое свойство, в котором вы можете проверить их состояние. Также имейте в виду, что для error уведомлений он работает так же, как и для complete.

Сравнение BehaviorSubject и ReplaySubject

Теперь, когда мы находимся на одной странице, давайте рассмотрим более интересный пример. Мы будем использовать BehaviorSubject и ReplaySubject, потому что их можно часто менять местами. Однако их поведение отличается от сигнала complete.

Мы можем взглянуть на тот же пример, что и выше, но на этот раз мы будем использовать ReplaySubject и подпишемся после получения complete уведомления от range:

// http://jsbin.com/hewomer/2/edit?js,console
const s = new ReplaySubject();
Observable.range(1, 5)
 .subscribe(s);
s.subscribe(console.log); // should this print anything?

Это напечатает числа 1 - 5. Но почему? Разве мы не узнали, что Субъекты ничего не излучают после получения complete, которое автоматически отправляется range, как мы видели ранее?

Это все еще правда. Однако существуют различия в реализациях Subject.

Мы можем увидеть разницу на более общем примере. Допустим, мы хотим кэшировать отдельный элемент, а затем воспроизводить его для каждого нового подписчика. И BehaviorSubject, и ReplaySubject будут работать в этом варианте использования, даже если их использование отличается (BehaviorSubject имеет значение по умолчанию). Например, мы могли бы использовать их следующим образом:

// http://jsbin.com/hotidih/5/edit?js,console
const behavior = new BehaviorSubject(null);
const replay = new ReplaySubject(1);
behavior.skip(1).subscribe(v => console.log(‘BehaviorSubject:’, v));
replay.subscribe(v => console.log(‘ReplaySubject:’, v));
behavior.next(1);
behavior.next(2);
behavior.subscribe(v => console.log(‘Late B subscriber:’, v));
replay.next(1);
replay.next(2);
behavior.subscribe(v => console.log(‘Late R subscriber:’, v));

В этом примере все числа печатаются дважды. Один раз с префиксом «BehaviorSubject», а затем снова с префиксом «ReplaySubject» (обратите внимание, что нам пришлось использовать оператор skip(1), чтобы пропустить значение по умолчанию, исходящее из BehaviorSubject).

Так почему мы должны выбирать одно вместо другого (в сценариях, где производительность не является проблемой) и как это связано с внутренними состояниями Субъекта?

Все сводится к тому, как каждый из них работает внутри по подписке после получения уведомления complete:

  • BehaviorSubject отмечает себя как остановившийся и никогда ничего не излучает. По сути, он возвращает «пустой» Subscription объект, не представляющий никакой реальной подписки.
  • ReplaySubject сначала отправляет весь буфер независимо от его внутреннего состояния новому подписчику, а затем возвращает «пустой» Subscription.

Для нас это означает, что мы можем «завершить» ReplaySubject и в любом случае получить его элементы. В отличие от BehaviorSubject, когда он завершится, он никогда ничего не выдаст.

Мы можем взять тот же пример из приведенного выше и перед подпиской «поздний» подписчик выдает complete:

// http://jsbin.com/wudiqor/3/edit?js,console
behavior.next(1);
behavior.next(2);
behavior.complete();
behavior.subscribe(v => console.log(‘Late B subscriber:’, v));
replay.next(1);
replay.next(2);
replay.complete();
replay.subscribe(v => console.log(‘Late R subscriber:’, v));

«Опоздавший» BehaviorSubject подписчик не получил ни одного элемента, потому что Тема уже заполнена. С другой стороны, ReplaySubject все равно воспроизведет свой буфер (последний элемент, потому что мы создали его как new ReplaySubject(1)), поэтому в консоли мы увидим Late R subscriber: 2.

Если мы снова возьмем пример сверху с range(1, 5), теперь становится понятным, почему ReplaySubject ведет себя иначе, чем Subject. Они оба отмечают себя как остановленные, но логика их подписки различна.

Но не дайте себя обмануть. ReplaySubject тоже «остановлен». Он не будет выдавать никаких новых элементов, он просто воспроизводит свой буфер при подписке.

Для чего все это полезно на практике?

Например, в приложениях Angular принято выполнять HTTP-запрос, а затем кэшировать результат в течение всего времени существования приложения. Очень простой подход - сохранить результат в свойстве объекта, а затем просто вернуть его через Observable.of, что позволяет нам использовать его так же, как если бы это был настоящий HTTP-запрос:

class MyService {
  private cache: any;
  getData(): Observable<any> {
    if (this.cache) {
      return Observable.of(this.cache);
    } else {
      return this.http.get(...).do(r => this.cache = r);
    }
  }
}

Конечно, это работает, но есть более подходящее решение без использования какой-либо переменной состояния и с использованием только ReplaySubject:

// http://jsbin.com/sutiwav/9/edit?js,console
class MyService {
  private cache = new ReplaySubject(1);
  getData(): Observable<any> {
    const http = this.http.get(...);
    const cache = this.cache;
    return Observable.merge(cache, http.do(cache)).take(1);
  }
}
const o = new MyService();
o.getData().subscribe(console.log);
o.getData().subscribe(console.log);
o.getData().subscribe(console.log);

Выглядит довольно странно, не правда ли? Мы могли бы написать это как однострочник, который объединяет наш cache и фактический HTTP-запрос, а затем всегда завершает take(1).

Не стесняйтесь открыть демонстрацию на http://jsbin.com/sutiwav/9/edit?js,console, которая имитирует HTTP-запрос, и вы увидите, что это действительно работает.

Итак, почему это делает всего один запрос, а затем воспроизводит кешированный результат от cache?

  1. Первый подписчик получает Observable, объединяющий cache и http. Поскольку в этот момент cache пусто, он ничего не выдаст, а merge подпишется на http и будет ждать, пока он не выдаст элемент. Этот элемент использует do, вставленный в cache (то же выражение, что и раньше с .subscribe(s)). Затем take(1) гарантирует, что после получения одного элемента он излучает complete, и цепочка уничтожается.
  2. Второй (и любой другой) подписчик получает Observable, объединяющий cache и http. Когда merge подписывается на cache, он излучает свой буфер (это ReplaySubject(1) с одним элементом). Это излучение немедленно распространяется дальше, где take(1) повторно излучает его и добавляет complete уведомление, которое удаляет цепочку. Другими словами, merge никогда не подписывается на http, что означает, что мы никогда не сделаем еще один HTTP-запрос.

В этом примере есть одна очень интересная вещь. Наш cache никогда не получает complete уведомление, хотя мы используем do, который также отправляет complete. В любом случае, это не влияет на функциональность этого кода и связано с синхронной природой внутренних компонентов RxJS.

Вот почему в следующей статье мы поговорим о синхронной и асинхронной эмиссии в RxJS. Мы рассмотрим несколько общих примеров, а затем вернемся к этой демонстрации и посмотрим, что на самом деле произошло внутри.