Давайте напишем нашу собственную Observable реализацию интерфейса, чтобы понять, что происходит под капотом, когда мы работаем с RxJS.

Наблюдаемый объект - это просто функция. Эта функция принимает наблюдателя в качестве аргумента и возвращает объект подписки.

  • Наблюдатель - это просто объект с тремя методами: next принимает значение, error принимает сообщение об ошибке и complete не имеет аргументов. Вот как выглядит стандартный (регистрирующий) обозреватель:
{
  next(value) {
    console.log(value);
  },
  error(err) {
    console.error(err);
  },
  complete() {
    console.info('done');
  }
}
  • Объект subscription представляет одноразовый ресурс, например выполнение Observable. В этой подписке есть несколько методов, таких как add и remove, но наиболее важным из них является unsubscribe, который не принимает аргументов и просто удаляет ресурс, удерживаемый подпиской. Подробнее об этом позже, когда мы перейдем к асинхронным примерам.

Когда Observable производит значение, он сообщает наблюдателю, вызывая next для созданного значения или error при возникновении проблемы.

Это общение между наблюдаемым и наблюдателем может завершиться двумя разными способами:

  • наблюдатель (потребитель значений) решает, что он больше не заинтересован в получении дополнительных значений, и поэтому отписывается от наблюдаемого, вызывая функцию unsubscribe, возвращаемую после подписки.
  • наблюдаемый объект (производитель значений) больше не имеет значений для отправки и информирует наблюдателя, вызывая complete на нем.

Пример синхронного Observable: Rx.Observable.from

Давайте попробуем воссоздать следующее поведение, изначально предусмотренное RxJS. Мы создадим наблюдаемую, которая возвращает синхронно и немедленно пять значений с течением времени, а затем завершается.

const numbers$ = Rx.Observable.from([0, 1, 2, 3, 4]);
numbers$.subscribe(
  (value) => console.log(value),
  (err) => console.error(err),
  () => console.info('done')
);

Основываясь на нашем предыдущем определении наблюдаемого, мы можем переписать наш numbers$ поток с нуля, используя функцию, которая принимает объект наблюдателя в качестве аргумента, например:

function Observable(subscribe) {
  this.subscribe = subscribe;
}
Observable.from = (values) => {
  return new Observable((observer) => {
    values.forEach((value) => observer.next(value));
    observer.complete();
    return ({
      unsubscribe() {
        console.log('unsubscribbed');
      }
    });
  });
}
const observer = {
  next(value) {
    console.log(value);
  },
  error(err) {
   console.error(err);
  },
  complete() {
    console.info('done');
  }
};
const numbers$ = Observable.from([0, 1, 2, 3, 4]);
const subscription = numbers$.subscribe(observer);
setTimeout(subscription.unsubscribe, 500);

Заявление forEach отвечает за синхронную передачу наших ценностей. Сразу после этого мы вызываем метод complete, чтобы уведомить наблюдателя о том, что мы закончили создание значений.

По большому счету, то, что здесь происходит, - это то, что наблюдаемое уведомляет наблюдателя обо всех испущенных значениях и сразу после того, как последнее значение было продвинуто, он позволяет наблюдателю теперь, когда он испускает.

Дополнительное примечание: здесь нет смысла отказываться от подписки, поскольку значения доставляются синхронно. Просто хотел проиллюстрировать, как возвращается объект подписки и как мы можем отказаться от подписки в какой-то момент позже.

Недостаток в нашей реализации

Несмотря на то, что наша реализация работает, есть крайний случай, который мы не учли: мы можем продолжать генерировать значения даже после вызова complete. Попробуй это:

Observable.from = (values) => {
  return new Observable((observer) => {
    values.forEach((value) => observer.next(value));
    observer.complete();
    observer.next('still emitting');
      
    return ({
        unsubscribe() {
            console.log('unsubscribed');
        }
    });
  });
}

Наблюдатель получит уведомление об этом последнем still emitting значении, даже если наблюдаемый уже сообщил наблюдателю, что он испускал значения.

Давайте настроим нашу реализацию так, чтобы она перестала генерировать сообщения после отмены подписки. Для начала мы создадим своего рода оболочку наблюдателя с некоторой базовой логикой проверки, которая также отслеживает, не отписался ли уже от наблюдаемого объекта (это будет состояние наблюдателя).

class Observer {
  constructor(handlers) {
    this.handlers = handlers; // next, error and complete logic
    this.isUnsubscribed = false;
  }
  
  next(value) {
    if (this.handlers.next && !this.isUnsubscribed) {
      this.handlers.next(value);
    }
  }
  
  error(error) {
    if (!this.isUnsubscribed) {
      if (this.handlers.error) {
        this.handlers.error(error);
      }
        
      this.unsubscribe();
    }
  }
  
  complete() {
    if (!this.isUnsubscribed) {
      if (this.handlers.complete) {
        this.handlers.complete();
      }
        
      this.unsubscribe();
    }
  }
  
  unsubscribe() {
    this.isUnsubscribed = true;
    
    if (this._unsubscribe) {
      this._unsubscribe();
    }
  }
}

Я хотел бы здесь выделить одну вещь: как мы отказываемся от подписки после звонка complete или error. Это означает, что мы не только устранили ошибку в реализации, но и отменяем подписку при возникновении ошибки. Подробнее об этом здесь.

Нам также необходимо настроить нашу Observable функцию, чтобы она работала с этой новой реализацией Observer.

class Observable {
  constructor(subscribe) {
    this._subscribe = subscribe;
  }
  
  subscribe(obs) {
    const observer = new Observer(obs);
    
    observer._unsubscribe = this._subscribe(observer);
    
    return ({
      unsubscribe() {
        observer.unsubscribe();
      }
    });
  }
}

Дополнительное примечание: посмотрите, как мы возвращаем объект подписки после подписки? По этой причине мы не можем продолжать связывать операторы после вызова subscribe в потоке.

Итак, теперь, когда у нас есть вспомогательные классы Observable и Observer, давайте перепишем наш пример, чтобы использовать их:

Observable.from = (values) => {
  return new Observable((observer) => {
    values.forEach((value) => observer.next(value));
    
    observer.complete();
      
    return () => {
      console.log('Observable.from: unsubscribed');
    };
  });
}
const numbers$ = Observable.from([0, 1, 2, 3, 4]);
const subscription = numbers$.subscribe({
  next(value) { console.log(value); },
  error(err) { console.error(err); },
  complete() { console.info('done'); }
});
setTimeout(subscription.unsubscribe, 500);

Пример асинхронного Observable: Rx.Observable.interval

Мы рассказали, как Rx.Observable.from работает. Давайте теперь поработаем с асинхронным примером, воссоздающим внутреннюю работу Rx.Observable.interval:

const interval$ = Rx.Observable.interval(1000);
interval$.subscribe({
  next(value) { console.log(value); },
  error(error) { console.error(error); },
  complete() { console.info('done'); }
});

Перво-наперво: мы хотим иметь возможность отменить интервал в какой-то момент. Вот почему нам нужно вернуть какую-то логику отмены (логику удаления в терминах RxJS). Для этого мы будем использовать clearInterval.

Observable.interval = (interval) => {
  return new Observable((observer) => {
    let i = 0;
    const id = setInterval(() => {
      observer.next(i++);
    }, interval);
  
    return () => {
      clearInterval(id);
      console.log('Observable.interval: unsubscribbed');
    };
  });
}
const observer = {
  next(value) { console.log(value); },
  error(err) { console.error(err); },
  complete() { console.info('done'); }
};
const interval$ = Observable.interval(100);
const subscription = interval$.subscribe(observer);
setTimeout(subscription.unsubscribe, 1000);

Отмена подписки происходит через секунду после начала передачи наблюдаемого, то есть мы будем получать 10 значений каждые 100 миллисекунд.

Кроме того, complete никогда не вызывается, потому что это бесконечный поток - значения доступны бесконечно, поскольку они поступают из setInterval.

Наблюдение за (асинхронными) событиями DOM: Rx.Observable.fromEvent

Теперь посмотрим на события DOM. Вот как это сделать в RxJS:

const button = document.getElementById('btn');
const clicks$ = Rx.Observable.fromEvent(button, 'click');
clicks$.subscribe({
  next(value) { console.log('clicked'); },
  error(error) { console.error(error); },
  complete() { console.info('done'); }
});

Давайте добавим эту функциональность в наш Observable класс:

Observable.fromEvent = (element, eventName) => {
  return new Observable((observer) => {
    const eventHandler = (event) => observer.next(event);
    
    element.addEventListener(eventName, eventHandler, false);
    
    return () => {
      element.removeEventListener(eventName, eventHandler, false);
      console.log('Observable.fromEvent: unsubscribbed');
    };
  });
};

Как и раньше, мы подключаем прослушиватель событий при подписке через addEventListener и отключаем его, когда отказываемся от подписки, используя removeEventListener.

В этом примере мы прослушиваем только события кликов в течение первых 1,5 секунд, а потом отказываемся от подписки.

const clicks$ = Observable.fromEvent(button, 'click');
const subscription = clicks$.subscribe({
  next(value) { console.log('clicked'); },
  error(err) { console.error(err); },
  complete() { console.info('done'); }
});
setTimeout(subscription.unsubscribe, 1500);

Операторы

До сих пор мы говорили о создании наблюдаемых из разных источников, но мы не коснулись операторов, которые являются основой функционального реактивного программирования.

Давайте воссоздадим функциональность, предоставляемую map. Что делает map, так это применяет функцию преобразования к каждому значению, переданному в поток.

Примечание для сайта: операторы всегда возвращают новые потоки, что позволяет связывать их с другими операторами.

Observable.prototype.map = function (transformation) {
  const stream = this;
  
  return new Observable((observer) => {
    const subscription = stream.subscribe({
      next: (value) => observer.next(transformation(value)),
      error: (err) => observer.error(err),
      complete: () => observer.complete()
    });
    
    return subscription.unsubscribe;
  });
};

Обратите внимание, что this - это наблюдаемая, которую мы вызвали map (у нас есть доступ к ней, потому что map - это метод прототипа).

Кредиты

Я написал этот пост после просмотра этого урока Бена Леша.

Кроме того, вот jsbin, который я собрал, чтобы вы могли немного поиграть с ним.