rxjs share with interval вызывает проблему при ожидании следующей итерации интервала

Я новичок в RxJ и нуждаюсь в помощи / понимании по следующим вопросам.

У меня есть страница, на которой отображаются текущие случаи заболевания коронавирусом. У меня есть настройка для опроса каждые 60 секунд. Я пытаюсь понять, что если я подпишусь на эту наблюдаемую через другой новый компонент, мне придется дождаться завершения следующей 60-секундной итерации, чтобы получить данные. Мой вопрос: если я хочу поделиться, есть ли способ принудительно отправить данные и перезапустить таймер?

Мне не нужны 2 разных 60-секундных интервала для вызова API. Мне просто нужен один и интервал для перезапуска, если инициализируется новый подписчик. Надеюсь, это имеет смысл.

this.covidCases$ = timer(1, 60000).pipe(
      switchMap(() =>
        this.covidService.getCovidCases().pipe(

          map(data => {
            return data.cases;
          }),
        ),
      ),
      retry(),
      share(),
    );

person KingKongFrog    schedule 06.06.2020    source источник
comment
Почему нельзя использовать только shareReplay(1)? Это потому, что вы хотите перезапускать таймер при каждой подписке?   -  person martin    schedule 07.06.2020
comment
@martin Вы действительно подняли важный вопрос.   -  person KingKongFrog    schedule 07.06.2020
comment
@KingKongFrog - одно из решений, работающих для вас, или вам нужны дальнейшие обновления?   -  person Jonathan Stellwag    schedule 15.06.2020


Ответы (3)


Я думаю, это должно сработать:

const newSubscription = new Subject();

const covidCases$ = interval(60 * 1000).pipe(
  takeUntil(newSubscription),
  repeat(),
  switchMap(() =>
    this.covidService.getCovidCases().pipe(
      /* ... */
    ),
  ),
  takeUntil(this.stopPolling),
  shareReplay(1),
  src$ => defer(() => (newSubscription.next(), src$))
);

Я заменил timer(1, 60 * 1000) + retry() на interval(60 * 1000).

Я полагал, что для перезапуска таймера (interval()) мы должны повторно подписаться на него. Но перед повторной подпиской мы должны сначала unsubscribed от нее.

Вот что делают эти строки:

interval(60 * 1000).pipe(
  takeUntil(newSubscription),
  repeat(),
  /* ... */
)

У нас есть таймер, пока не сработает newSubscription. Когда это произойдет, takeUntil выдаст complete notification, а затем отменит подписку на свой источник (в данном случае источник, созданный interval).

repeat перехватит это complete notification и повторно подпишется на наблюдаемый источник (source = interval().pipe(takeUntil())), что означает, что таймер перезапустится.

shareReplay(1) гарантирует, что новый подписчик получит последнее переданное значение.

Тогда очень важно разместить src$ => defer(() => (newSubscription.next(), src$)) после shareReplay. Используя defer(), мы можем определить момент прибытия нового подписчика.

Если бы вы поместили src$ => defer(() => (console.log('sub'), src$)) выше shareReplay(1), вы бы увидели, что sub выполнено, зарегистрировано только один раз после создания первого подписчика. Поместив его под shareReplay(1), вы должны видеть это сообщение в журнале каждый раз, когда создается подписчик.

Вернемся к нашему примеру, когда новый подписчик зарегистрирован, newSubscription будет излучать, что означает, что таймер будет перезапущен, но поскольку мы также используем repeat, полное уведомление не будет передано shareReplay, если только не отправит stopPolling.

StackBlitz demo.

person Andrei Gătej    schedule 07.06.2020
comment
отличное решение, однако первоначальное посещение ждет 3 секунды ... как мы можем настроить, чтобы первый вызов выполнялся немедленно. - person KingKongFrog; 07.06.2020
comment
Вы можете просто добавить startWith (0) после repeat () - person Andrei Gătej; 07.06.2020

Этот код создает наблюдаемый объект. Я думаю, что вам следует добавить Replaysubject вместо Observable.

Replaysubjects дает возможность генерировать то же событие при возникновении новой подписки.

timer(1, 60000).pipe(
  switchMap(() =>
    this.covidService.getCovidCases().pipe(
      tap(result => {
        if (!result.page.totalElements) {
          this.stopPolling.next();
        }
      }),

      map(data => {
        return data.cases;
      }),
      tap(results =>
        results.sort(
          (a, b) =>
            new Date(b.covidDateTime).getTime() -
            new Date(a.covidDateTime).getTime(),
        ),
      ),
    ),
  ),
  retry(),
  share(),
  takeUntil(this.stopPolling),
).subscribe((val)=>{this.covidcases.next(val)});

Эта модификация приводит к созданию таймера один раз, поэтому, когда вы подпишетесь на тему, он немедленно выдаст последнее значение.

person Sándor Jankovics    schedule 06.06.2020
comment
Не знаете, какую модификацию вы внесли, чтобы сделать эту тему повторяемой? - person KingKongFrog; 07.06.2020
comment
Модификация заключается в том, что вместо того, чтобы сделать наблюдаемое равным трубе. Изменил его на .subscribe в конце - person Sándor Jankovics; 07.06.2020

Вы можете написать оператор, который подталкивает количество вновь добавленных подписчиков к заданной теме:

const { Subject, timer, Observable } = rxjs;
const { takeUntil, repeat, map, share } = rxjs.operators;

// Operator
function subscriberAdded (subscriberAdded$) {
  let subscriberAddedCounter = 0;
  return function (source$) {
    return new Observable(subscriber => {
      source$.subscribe(subscriber)
      subscriberAddedCounter += 1;
      subscriberAdded$.next(subscriberAddedCounter)
    });
  }
}

// Usage
const subscriberAdded$ = new Subject();

const covidCases$ = timer(1, 4000).pipe(
  takeUntil(subscriberAdded$),
  repeat(),
  map(() => 'testValue'),
  share(),
  subscriberAdded(subscriberAdded$)
)

covidCases$.subscribe(v => console.info('subscribe 1: ', v));
setTimeout(() => covidCases$.subscribe(v => console.info('subscribe 2: ', v)), 5000);
subscriberAdded$.subscribe(v => console.warn('subscriber added: ', v));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.3/rxjs.umd.min.js"></script>

Будущие возможности:

  • Вы можете легко обновить оператора, чтобы уменьшить количество, если вы хотите отреагировать на отписавшихся

! важно

TakeUnit + repeat уже был опубликован @ AndreiGătej. Я только предоставил альтернативный способ получения события при добавлении подписчика.

Запуск stackblitz с машинописным текстом

Если оператору subscriberAdded требуется корректировка, дайте мне знать, и я обновлю

person Jonathan Stellwag    schedule 07.06.2020
comment
Не могли бы вы уточнить «Вам не нужно вручную добавлять каждую подписку к теме»? Какая подписка к какой теме добавляется вручную? - person Andrei Gătej; 07.06.2020
comment
@ AndreiGătej Мне очень жаль. Я что-то перепутал и подумал, что вы добавляете каждую подписку через настраиваемую тему. Ваше решение работает абсолютно нормально и делает то же самое, что и мой пользовательский оператор. Просто он встроен. Единственное, может быть, преимущество (но не желаемое здесь) состоит в том, что вы можете легко добавить логику для отказа от подписки. Я оставлю этот ответ в информационных целях и обновлю свой комментарий. Еще раз извините. - person Jonathan Stellwag; 07.06.2020
comment
Не беспокойтесь, m8! Нельзя ли добавить логику отказа от подписки с finalize()? Вы можете получить уведомление об отказе от подписки, поставив finalize() после shareReplay(). Одно «предостережение» заключается в том, что обратный вызов finalize также будет вызываться, когда источник завершает / завершает работу с ошибкой. - person Andrei Gătej; 07.06.2020
comment
Это зависит от вашего желаемого поведения. Я думал о добавлении уменьшения к оператору subscriberAdded всякий раз, когда абонент остается. Это позволит вам реализовать поведение для каждого теряющего подписчика, а не только для сценария завершения или 0 подписчиков. Я думаю, это просто зависит от вашего варианта использования, но finalize кажется очень умным решением для 0 сабов :) - person Jonathan Stellwag; 07.06.2020