Выполнение по запросу горячего Observable

Приведем холодный пример:

Observable<Integer> cold = Observable.create(subscriber -> {
  try {
    for (int i = 0; i <= 42; i++) {

      // avoid doing unnecessary work
      if (!subscriber.isUnsubscribed()) {
        break;
      }

      subscriber.onNext(i);
    }
    subscriber.onCompleted();
  } catch (Throwable cause) {
    subscriber.onError(cause);
  }
});

он начинает выполняться с нуля для каждого нового подписчика:

// starts execution
cold.subscribe(...)

и может остановить выполнение, если подписчик откажется от подписки досрочно:

// stops execution
subscription.unsubscribe();

теперь, если вместо примера цикла for у нас есть какая-то фактическая бизнес-логика (которая не должна воспроизводиться для каждого подписчика, а должна выполняться в реальном времени), то мы имеем дело с горячим наблюдаемым ...

PublishSubject<Integer> hot = PublishSubject.create();

Thread thread = new Thread(() -> {
  try {
    for (int i = 0; i < 42; i++) {
      // how to avoid unnecessary work when no one is subscribed?
      hot.onNext(i);
    }
    hot.onCompleted();
  } catch (Throwable cause) {
    hot.onError(cause);
  }
});

когда мы хотим, чтобы это началось, мы могли бы сделать

// stats work (although no one is subscribed) 
thread.start();

отсюда первый вопрос: как начать работу только тогда, когда подписывается первый наблюдатель? (может быть, подключаемый наблюдаемый?)

и важный вопрос: как остановить работу, когда последний подписчик отписался? (Я не могу понять, как получить доступ к текущим подпискам для этого субъекта, и хотел бы найти чистое решение без общего глобального состояния, если такое решение существует )

Одно из решений, которое я могу придумать, - это поднять тему с помощью специального оператора, который будет управлять подписчиками ...


person vach    schedule 25.12.2015    source источник


Ответы (1)


см. оператор refCount - http://reactivex.io/documentation/operators/refcount.html. Этот оператор превращает ваш Observable в ConnectableObservable и подключает его, когда первый подписчик подписывается, и отключается, когда подписок больше нет.

person krp    schedule 25.12.2015