Прежде чем погрузиться в операторы совместного использования, нам нужно определить, какие наблюдаемые существуют в RxJ. Обычно существует два типа наблюдаемых: горячие и холодные. Есть отличная статья Hot vs Cold Observables, но в целом главное отличие в том, что

Производитель уведомлений в холодных наблюдаемых объектах создается самим наблюдаемым объектом и только тогда, когда наблюдатель подписывается на него.

Например, interval() создает холодное наблюдаемое. Данные создаются в наблюдаемом, и для каждой новой подписки будет создан новый интервал.

Производитель уведомлений в горячих наблюдаемых объектах создается вне наблюдаемого и независимо от того, есть ли подписчики или нет.

Например, fromEvent() создает горячее наблюдаемое, поскольку производитель уведомлений находится в DOM и существует независимо от количества подписчиков.

Иногда нам нужно заставить холодный наблюдаемый вести себя так же, как горячий, например, с помощью HTTP-запросов. Рассмотрим следующий пример HTTP-запроса в Angular

ngOnInit() {
   this.user$ = this.http.get(`api/user/1`)
   this.name$ = this.user$.pipe(
      map(user => user.name)
   );
   this.age$ = this.user$.pipe(
      map(user => user.age)
   );
}

и мы отображаем имя и возраст пользователя в шаблоне, используя асинхронный канал (предположим, в разных местах, поэтому обернуть его в один асинхронный канал невозможно)

<div>{{name$ | async}}</div>
<div>{{age$ | async}}</div>

Во вкладке браузера «Сеть» мы увидим два запроса. Причина в том, что Angular Http создает холодное наблюдаемое, поэтому каждый новый подписчик равен новому запросу. Мы определенно не хотим, чтобы несколько запросов. Сначала решим эту проблему, а потом посмотрим, как это работает.

На самом деле решить эту проблему очень просто. Все, что нам нужно сделать, это добавить share() или publish(), refCount() вот так

this.user$ = this.http.get(`api/user/1`).pipe(
  share()
);
//or 
this.user$ = this.http.get(`api/user/1`).pipe(
 publish(),
 refCount()
);

И теперь во вкладке сети у нас есть один запрос, потому что данные были разделены между всеми подписчиками. Итак, как share() или publish() волшебным образом решают эту проблему и в чем разница между ними, если они оба делают одно и то же?

TL; DR share (), publish () и другие операторы многоадресной рассылки заставляют холодные наблюдаемые вести себя как горячие

Чтобы понять, как работают операторы совместного доступа, нам нужно понять, что такое многоадресная передача.

Горячие наблюдаемые являются многоадресными, поскольку все подписчики получают данные от одного и того же производителя. Холодные наблюдаемые являются одноадресными, поскольку каждый подписчик получает данные от разных производителей.

многоадресная передача ()

RxJs имеет оператор multicast(), который берет предметную фабрику или предметную фабрику и возвращает ConnectableObservable. Субъект, переданный в качестве аргумента, действует как посредник в наблюдаемой многоадресной рассылке. Он просто передает данные из наблюдаемого источника всем подписчикам. ConnectableObservable является обычным наблюдаемым объектом, но он не будет подписываться на источник до тех пор, пока не будет вызван connect() метод. Давайте изменим приведенный выше пример на multicast(), чтобы понять, как это работает.

this.user$ = this.http.get(`api/user/1`).pipe(
  multicast(new Subject())
);

Само по себе это не сработает, потому что нам нужно вручную вызвать connect()

this.name$ = this.user$.pipe(
      map(user => user.name)
   );
this.age$ = this.user$.pipe(
      map(user => user.age)
   );
this.user$.connect();

После этого мы увидим такое же поведение, будет только один HTTP-вызов вместо двух. Подключение и отключение вручную может быть затруднено, поэтому существует refCount() оператор, который автоматически connect() при первой подписке будет вести подсчет подписок и поддерживать связь Subject с Source как пока есть хотя бы один подписчик. Когда количество S ubscriptions упадет до нуля, Subject будет отключен от Source.

Источник наблюдаемый в нашем примере - наблюдаемый, возвращаемый функцией this.http.get ()
Тема - внутренняя тема, переданная в качестве аргумента для multicast ()
Подписки или наблюдатели, - это this.name $ и this.age $ и другие наблюдатели, подписанные на Subject.

Проще говоря, все наши подписчики будут подписываться на subject X (переданный в многоадресную рассылку), а сам subject X будет подписываться на наш http-вызов. Когда observable, возвращаемый http-вызовом, испускает, наш subject X примет это значение и поделится между всеми подписчиками.

Общая идея многоадресной рассылки состоит в том, что Subject подписывается на Source, а несколько Observers подписываются на Subject.

Теперь давайте изменим наш код так, чтобы он использовал refCount(), чтобы нам не приходилось подключаться вручную.

this.user$ = this.http.get(`api/user/1`).pipe(
  multicast(new Subject()),
  refCount()
);

Теперь нам не нужно вручную вызывать connect () и беспокоиться о разъединении. refCount() подключит Subject к источнику при первой подписке на него и отключится, когда больше нет наблюдателей.
Фактически, раньше мы использовали комбинацию publish(), refCount(), и это полностью совпадает с multicast(new Subject()), refCount()

публиковать()

RxJs имеет оператор publish(), и если мы посмотрим на исходный код, мы увидим, что он использует многоадресную рассылку с Subject ()

publish () === многоадресная рассылка (новая тема ())

publish() опционально принимает функцию селектора и фактически меняет поведение оператора и заслуживает отдельной статьи. Мы пропустим эту часть и рассмотрим возможность использования publish() без функции выбора.

Поэтому, когда мы используем publish(), мы фактически используем старый multicast() с Subject(), и поэтому нам нужно либо вручную выполнять подключение и отключение, либо использовать refCount() для автоматизации этого процесса.

Поскольку мы в основном используем publish() с refCount(), существует очень похожий оператор, который использует refCount() внутри и ведет себя аналогичным образом. Это share()

Поделиться()

share () === multicast (() = ›new Subject ()). refCount ()

В нашем первом примере мы увидели, что share() делает то же самое, что и publish(), refCount(), и в большинстве случаев они одинаковы. share() - это оператор, который использует refCount() для внутренних целей, поэтому нам не нужно его вызывать. share() точно так же, как publish() использует multicast(), но разница в аргументе, переданном в multicast().

publish() использует экземпляр Subject - multicast(new Subject())

share() использует фабричную функцию, которая возвращает экземпляр Subject - multicast(() => new Subject()).refCount()

Это единственная причина, по которой мы не можем сказать, что share() это то же самое, что publish() + refCount(). Это различие вызывает различное поведение поздних подписчиков после завершения работы Source.

Различия между share () и publish () + refCount ()

Они оба используют refCount() для управления подписками, однако

publish() + refCount() - пока есть хотя бы один подписчик на Subject, он будет выдавать значения. Когда не будет подписчиков, Тема будет отключена от Источника. Для всех новых подписчиков, если Source завершился, они получат «завершенные» эмиссии, но если Source не завершил, Subject повторно подпишется на Source

share() - пока есть хотя бы один подписчик на Subject, он будет выдавать значения. Когда не будет подписчиков, Тема будет отключена от Источника. Для любого нового подписчика, независимо от того, был ли источник завершен или нет, он снова подпишется на источник, используя новую тему

Разница небольшая, но очень важная. Давайте изменим наш код, чтобы в нем появилась кнопка, которая будет обновлять данные пользователя. При нажатии на нее будут повторно получены данные с сервера.
Сначала воспользуемся share()

ngOnInit() {
   this.user$ = this.http.get(`api/user/1`).pipe(
      share()
   )
this.name$ = this.user$.pipe(
      map(user => user.name)
   );
this.age$ = this.user$.pipe(
      map(user => user.age)
   );
}
update() {
  this.name$ = this.user$.pipe(
    map(user => user.name + 'update')
  );
  this.age$ = this.user$.pipe(
    map(user => user.age + 'updated')
  );
}

Когда мы изначально загружаем данные, refCount() будет подсчитывать все ссылки. Итак, у нас будет две ссылки на Subject. Как только мы получим данные с сервера, Subject получит эти данные из источника и заполнит их. Оба наших подписчика получат данные от Subject и заполнят их, что означает, что номер ссылки в refCount() будет 0. В этом случае Subject будет отключен от Source.
Когда мы выполняем метод update(), будет создан новый экземпляр Subject (), который будет подписан на Source. Таким образом, каждое выполнение update () фактически отправляет запрос на сервер.

Теперь рассмотрим тот же пример с publish(), refCount().

this.user$ = this.http.get(`api/user/1`).pipe(
 publish(),
 refCount()
);

Снова у нас будет счетчик refCount(), установленный на 2, и как только Source излучает и завершает, счетчик будет равен 0. Но когда мы выполним метод update(), ничего не произойдет, никаких запросов к серверу не будет. Как написано выше, новые подписчики будут получать «полные» уведомления только в том случае, если Источник заполнен.

Причина, по которой они так себя ведут, находится в multicast(). Поскольку publish() использует экземпляр Subject, когда Source завершается, Subject также завершается, поэтому любой новый подписчик на этот Subject получит только «завершенное» уведомление.
share() использует фабричную функцию, которая возвращает экземпляр Subject. Когда Source завершается, Subject также будет завершен, но для новых подписчиков будет создан новый экземпляр Subject, который будет подписан на Source.

multicast () с другим типом темы

До сих пор мы обсуждали многоадресную рассылку с использованием Subject. Есть несколько других типов Subject - ReplaySubject, BehaviorSubject и AsyncSubject. Передача разных тем для многоадресной рассылки вернет ConnectableObservable, но их поведение будет отличаться.

Сначала давайте посмотрим на ReplaySubject(n), он принимает число в качестве аргумента, которое представляет собой количество излучений, которые он будет хранить в буфере. Для любого нового подписчика он будет воспроизводить n-эмитентов.

Если мы передадим ReplaySubject (n) в multicast (), все новые подписчики получат n воспроизведенных значений.

publishReplay ()

publishReplay () === многоадресная передача (новый ReplaySubject ())

publishReplay() возвращает ConnectableObservable, поэтому нам нужно либо использовать connect(), либо refCount() для управления подключениями. Давайте изменим наш пример так, чтобы каждый новый подписчик получал только буферизованное значение. Поэтому, когда мы нажимаем update(), мы не получаем новые данные, а получаем кешированное значение.

this.user$ = this.http.get(`api/user/1`).pipe(
 publishReplay(1),
 refCount()
);

Все подписчики на ReplaySubject до завершения Source получат испускаемые значения (в нашем случае только 1 значение, так как Http выдает только один раз). Для всех новых подписчиков ReplaySubject будет воспроизводить N буферизованных значений.

Поскольку мы в основном используем publishReplay() с refCount(), есть очень похожий оператор, который использует внутренний механизм подсчета ссылок и ведет себя аналогичным образом. Это shareReplay()

shareReplay ()

shareReplay() очень интересный оператор. Он может вести себя так же, как publishReplay() + refCount(), но это зависит от того, как мы используем этот оператор.

До RxJs версии 6.4.0 механизм подсчета ссылок в shareReplay() работал иначе. Начиная с 6.4.0, мы можем явно передавать аргумент to shareReplay(), чтобы использовать «нормальный» механизм подсчета ссылок. Посмотрим подробнее

shareReplay ({refCount: true}) (RXJS 6.4.0 или новее)

refCount: true указывает shareReplay() использовать механизм подсчета ссылок, аналогичный refCount(). В этом случае shareReplay({refCount: true}) почти то же самое, что publishReplay() + refCount(). Давайте изменим наш пример, чтобы использовать shareReplay.

this.user$ = this.http.get(`api/user/1`).pipe(
 shareReplay({refCount: true, bufferSize: 1})
);

Как видите, мы больше не используем refCount(), потому что shareReplay({refCount: true}) использует собственный механизм подсчета ссылок.
Результат будет таким же. Все подписчики на ReplaySubject будут получать значения, пока он их передает. Все новые подписчики получат N буферизованных значений.
Прежде чем говорить о других способах использования shareReplay, давайте рассмотрим их различия.

Различия между shareReplay ({refCount: true}) и publishReplay () + refCount ()

Оба они используют ReplaySubject(), но shareReplay() не реализован с multicast().

publishReplay(n) + refCount() - до тех пор, пока есть хотя бы один подписчик на источник, ReplaySubject будет выдавать значения, при отсутствии подписчиков ReplaySubject будет отключен от источника. Любой новый подписчик получит последние N значений от ReplaySubject и повторно подпишется на источник, используя тот же ReplaySubject, если Источник еще не завершен.

shareReplay({refCount: true, bufferSize: n}) - пока есть хотя бы один подписчик, ReplaySubject будет выдавать значения, при отсутствии подписчиков ReplaySubject будет отключен от источника. Для новых подписчиков, если Source завершился, он будет выдавать последние N значений из ReplaySubject, но если Source не завершен или в случае ошибки, он будет снова подписываться на источник только с помощью нового ReplaySubject

Чтобы увидеть разницу, используйте interval(), чтобы для новых подписчиков Источник не был завершен.

this.source = interval(1000).pipe(
  publishReplay(1),
  refCount()
);
const sub1 = this.source.subscribe(x => console.log('sub 1', x));
const sub2 = this.source.subscribe(x => console.log('sub 2', x));

setTimeout(() => {
  sub1.unsubscribe();
  sub2.unsubscribe();
}, 2000);

У нас есть 2 подписки на ReplaySubject, sub1 и sub2. Через 2 секунды они оба отпишутся от темы. Поскольку мы используем refCount(), когда больше не будет подписчиков (например, количество ссылок упадет до нуля), он отключит ReplaySubject от источника. Пока в консоли посмотрим.

 sub 1– 0
 sub 2– 0
 sub 1– 1
 sub 2– 1

Теперь предположим, что мы создадим нового подписчика на ReplaySubject одним нажатием кнопки. (после того, как refCount упадет до нуля)

newSub() {
 const sub3 = this.source.subscribe(x => console.log(‘sub 3’, x));
}

Когда newSub() будет запущен, sub3 получит последнее буферизованное значение от ReplaySubject (которое будет 1) и проверит, завершился ли Source. Если завершено, sub3 получит уведомление «завершено» и также завершится. Однако, поскольку мы используем interval(), Source не будет завершен, и внутренний ReplaySubject снова повторно подпишется на Source. Результат будет

 sub 1– 0
 sub 2– 0
 sub 1– 1
 sub 2– 1
/**** execution of newSub() ****/
 sub 3– 1 <- replayed value
 sub 3– 0 <- new subscription
 sub 3– 1
 sub 3– 2
...

Внутренний ReplaySubject повторно отображает буферизованные значения для новых наблюдателей и либо завершает, либо повторно подписывается на Source в зависимости от статуса завершения Source.

Теперь тот же пример interval() с использованием shareReplay({refCount:true })

this.source = interval(1000).pipe(
  shareReplay({refCount: true, bufferSize: 1})
);
const sub1 = this.source.subscribe(x => console.log('sub 1', x));
const sub2 = this.source.subscribe(x => console.log('sub 2', x));

setTimeout(() => {
  sub1.unsubscribe();
  sub2.unsubscribe();
}, 2000);
//execute newSub() after sub1 and sub2 unsubscribe
newSub() {
 const sub3 = this.source.subscribe(x => console.log(‘sub 3’, x));
}

shareReplay() не реализован с multicast, но он использует внутреннюю фабричную функцию, и если мы используем ее с refCount: true и счетчик ссылок упадет до нуля, для любого нового подписчика, если Source завершил, он воспроизведет буферизованные значения и отправит уведомление «завершено». Если Source не завершен для любого нового подписчика, будет создан новый ReplaySubject и подписан на Source.
В результате после выполнения кода выше и выполнения newSub() мы увидим.

 sub 1– 0
 sub 2– 0
 sub 1– 1
 sub 2– 1
/**** execution of newSub() ****/
 sub 3– 0 <- new subscription
 sub 3– 1
 sub 3– 2
...

Как видите, значение sub3 не воспроизводилось. Причина в том, что когда sub1 и sub2 отписаны, счетчик ссылок будет равен нулю, и если Source завершил, все новые подписчики, такие как sub3, получат все буферизованные значения и уведомление о завершении, но поскольку мы используем interval(), а Source не будет завершен, ReplaySubject будет уничтожен, и любой новый подписчик, такой как sub3, создаст новый экземпляр ReplaySubject и снова подпишется на Source.

shareReplay () без refCount

До сих пор мы использовали shareReplay с refCount: true. Мы можем использовать shareReplay с refCount, установленным в false или не установленным вообще, и указать только размер буфера - например, shareReplay({refCount: false, bufferSize: 3}) и shareReplay(3) одинаковы. Это означает, что ReplaySubject выдаст последние 3 значения, а refCount будет ложным. Это не означает, что не будет механизма подсчета ссылок, просто он будет вести себя иначе.

По умолчанию для refCount установлено значение false в shareReplay()

refCount: false означает, что при первой подписке на ReplaySubject он подпишется на Source. Но он не отключит ReplaySubject от Source, где больше нет подписчиков на ReplaySubject. Давайте еще раз изменим наш пример, используя refCount false.

this.source = interval(1000).pipe(
  shareReplay({refCount: false, bufferSize: 2})
  //or just shareReplay(2)
);
const sub1 = this.source.subscribe(x => console.log('sub 1', x));
const sub2 = this.source.subscribe(x => console.log('sub 2', x));

setTimeout(() => {
  sub1.unsubscribe();
  sub2.unsubscribe();
}, 2000);
setTimeout(() => {
 const sub3 = this.source.subscribe(x => console.log(‘sub 3’, x));
}, 4000);

sub1 и sub2 подписываются на ReplaySubject, а ReplaySubject подписывается на интервал источника. Через 2 секунды sub1 и sub2 откажутся от подписки, но в этом случае ReplaySubject НЕ откажется от подписки на Source. Источник будет продолжать передавать значения, даже если нет подписчиков, которые могли бы перехватить эти значения.
Через 4 секунды новый подписчик подпишется на ReplaySubject и получит 2 последних буферизованных значения и продолжит получать значения из источника. Результат будет

 sub 1– 0
 sub 2– 0
 sub 1– 1
 sub 2– 1
/**** after 4 seconds  ****/
 sub 3– 2 <- replayed values
 sub 3– 3 <-
 sub 3– 4 <- continues receiving values
 sub 3– 5
...

sub1 и sub2 подписались, напечатали значения и через две секунды отписались, но поскольку Source еще не завершен, ReplaySubject получит данные от Source, и поэтому, когда через 4 секунды sub3 подписывается на ReplaySubject, он не получит 0 и 1 как буферизованные значения, но 2 и 3, потому что за это время ReplaySubject удалось получить новые значения из Source и обновить его буфер. Единственный случай, когда ReplaySubject откажется от подписки на Source, - это когда Source завершается или возникает ошибка. Любой новый подписчик в этом случае получит воспроизведенные значения и завершится.

shareReplay(n) независимо от того, есть ли активные подписчики или нет, ReplaySubject будет продолжать передавать значения и поддерживать соединение с источником до тех пор, пока источник не завершится или не возникнет ошибка. Любой новый подписчик получит последние N значений (если не ошибка). Если Source еще не завершен, новые подписчики будут продолжать получать значения из Source

publishBehavior ()

publishBehavior() использует multicast с другой темой. BehaviorSubject

publishBehavior () === многоадресная рассылка (новый BehaviorSubject ())

publishBehavior() возвращает ConnectableObservable, поэтому нам нужно использовать refCount() или подключиться вручную.

publishBehavior() принимает значение по умолчанию в качестве параметра и отправит это значение всем подписчикам, если Источник не отправил. Рассмотрим этот пример

this.source = interval(1000).pipe(
 publishBehavior(47),
  refCount()
);
const sub1 = this.source.subscribe(x => console.log('sub 1', x));
const sub2 = this.source.subscribe(x => console.log('sub 2', x));

setTimeout(() => {
  sub1.unsubscribe();
  sub2.unsubscribe();
}, 2000);
setTimeout(() => {
  const sub3 = this.source.subscribe(x => console.log('sub 3', x));
}, 4000);

Результат будет

 sub 1– 47 <- default values
 sub 2– 47 <-
 sub 1– 0
 sub 2– 0
 sub 1– 1
 sub 2– 1
/**** after 4 seconds ****/
 sub 3– 1 <- last buffered value
 sub 3– 0
 sub 3– 1
...

Поскольку interval является асинхронным, когда sub1 и sub2 подписываются на BehaviorSubject, в это время Source еще не сгенерировал, поэтому sub1 и sub2 получат значения по умолчанию от BehaviorSubject. Через две секунды sub1 и sub2 откажутся от подписки на BehaviorSubject, а сам BehaviorSubject откажется от подписки на Source. Через 4 секунды sub3 подпишется на BehaviorSubject, а поскольку Source еще не завершен, sub3 получит последнее переданное значение и повторно подпишется на Source, используя тот же BehaviorSubject.

publishBehavior(default_value) когда подписка на BehaviorSubject осуществляется до того, как Источник отправил значение, BehaviorSubject передаст этому подписчику значение по умолчанию. Пока есть хотя бы один подписчик на исходный объект BehaviorSubject, будет выдавать значения. Когда нет подписчиков, BehaviorSubject будет отключен от источника. Если источник еще не завершен, новые подписчики получат последнее значение от BehaviorSubject и повторно подпишутся на источник, используя тот же BehaviorSubject. Если Source завершил работу, все новые подписчики получат только уведомление о завершении.

publishLast ()

publishLast() использует многоадресную рассылку с AsyncSubject

publishLast () === многоадресная рассылка (новый AsyncSubject ())

Как и все операторы многоадресной рассылки, publishLast() лучше всего использовать с refCount().
AsyncSubject, используемый в этом операторе, очень интересен. Он не будет выдавать значения, если подписан, до тех пор, пока не завершится, после чего выдает последнее значение.

this.source = interval(1000).pipe(
 take(2),
 publishLast(),
 refCount()
);
const sub1 = this.source.subscribe(x => console.log('sub 1', x));
const sub2 = this.source.subscribe(x => console.log('sub 2', x));

setTimeout(() => {
  const sub3 = this.source.subscribe(x => console.log('sub 3', x));
}, 7000);

Поскольку интервал является бесконечным наблюдаемым, мы использовали take(2), поэтому он выдаст 2 значения и завершится. Вот результат

 sub 1– 1 //completed
 sub 2– 1 //completed
/**** after 7 seconds ****/
 sub 3– 1 //completed

когда sub1 и sub2 подписываются на AsyncSubject, они не получат никаких значений до завершения Source. Когда Source завершит работу, AsyncSubject передаст последнее значение всем наблюдателям и тоже завершится. Через 7 секунд sub3 подписывается на AsyncSubject, и, поскольку он завершен, он также передаст последнее значение и уведомление о «завершении» в sub3.

publishLast() — Независимо от того, сколько подписчиков подключено к
AsyncSubject, он не будет выдавать никакого значения, пока Source не завершит работу, однако любой побочный эффект будет выполнен . Когда Source завершает работу, AsyncSubject также завершается и отправляет последнее значение и уведомление о завершении всем подписчикам, текущим и новым.

В Rxjs много чего происходит, и многоадресная рассылка - одна из самых важных вещей в библиотеке. Надеюсь, эта статья помогла вам понять, как работают операторы совместного доступа и в чем их отличие. Спасибо за прочтение.