Последние несколько месяцев я изучал и использовал Реактивное программирование на работе, в частности, путем реализации промежуточного программного обеспечения, наблюдаемого за сокращением в создаваемом нами расширении Chrome. Если честно, попытка осмыслить эту новую парадигму программирования была трудным путешествием, и у меня определенно еще есть куда пойти. Однако по мере того, как я все больше и больше знакомился с реактивным программированием и RxJS (реализация javascript) и его основной концепцией / типом - Observable - я решил, что это отличная возможность для меня написать новый пост в блоге.

Учитывая, насколько обширна тема реактивного программирования, я считаю важным сначала определить цели этого поста. Во-первых, я не буду пытаться объяснять, что такое реактивное программирование и почему оно полезно. Если вам это интересно, я бы порекомендовал проверить ReactiveX.io, этот замечательный пост в блоге или старую добрую википедию, если вы чувствуете себя особенно академично. Я попытаюсь объяснить некоторые критические концепции и типы данных реактивного программирования, в частности Observable, Observer, Subscription, Operator и Subject.

Одна из главных мотиваций для написания этого блога - это то, что я почувствовал и что упоминалось в других ресурсах по реактивному программированию: просто не так много хороших материалов для его изучения. Официальная документация часто слишком сложная, техническая или насыщенная, чтобы служить хорошим введением, и многие учебные пособия поверхностны или не содержат достаточного объема информации, чтобы вы могли чувствовать себя комфортно в происходящем. Это не значит, что нет ничего, на что стоит обратить внимание. Фактически, я нашел - и буду опираться на эту статью - довольно много фантастических ресурсов, таких как:

Итак, с учетом сказанного я надеюсь, что этот блог послужит хорошим введением для начинающих в основные типы данных и концепции реактивного программирования. Если вы уйдете от этого, имея достаточно знаний, чтобы начать понимать некоторую официальную документацию, или с комфортом следовать вместе с найденным вами учебным курсом, я сделаю свою работу. Если нет, дайте мне знать, как я могу это улучшить, в комментариях!

Наблюдаемый

Как упоминалось выше, Observable - это основной тип данных реактивного программирования. По моему опыту, как только вы поймете, что такое Observable (а что нет) и что он делает, все остальное начинает становиться на свои места. Однако, прежде чем мы погрузимся в Observable, было бы полезно согласовать определение реактивного программирования, которое должно заложить основу того, как Observable вписывается в него. Если вы уже начали изучать реактивное программирование, вы, вероятно, встречали много определений того, что это такое, каждое, вероятно, немного более запутанное, чем предыдущее. Наиболее подходящее определение, которое я нашел до сих пор, принадлежит Андре Стальцу, и оно выглядит следующим образом:

Реактивное программирование - это программирование с использованием асинхронных потоков данных.

Как отмечает Андре, многие из нас уже знакомы с асинхронными потоками данных: подумайте о событиях щелчка / прослушивателях событий. Реактивное программирование берет эту концепцию и применяет ее ко всему. И делает это в первую очередь через Observable.

Итак, что такое наблюдаемое? Учитывая приведенное выше определение реактивного программирования, соблазнительно думать о Observable как о потоке. Некоторые ресурсы, которые вы найдете, могут даже сравнить их с обещаниями. Хотя они могут быть полезны и, безусловно, понятны, учитывая некоторые концептуальные сходства, я думаю, что они в конечном итоге приносят больше вреда, чем пользы, поскольку чрезмерно усложняют то, что такое наблюдаемое. Я пристрастился к определению наблюдаемого, которое дает Бен Леш, а именно:

«Функция, которая принимает наблюдателя и возвращает функцию»

На данный момент мы проигнорируем часть наблюдателя (мы скоро к этому вернемся) и сосредоточимся на идее, что Observable - это функция.

Как вы, возможно, догадались, это немного чрезмерное упрощение, но оно помогло мне развеять часть магии, которую я сначала приписал Observables, и, таким образом, сделало их более доступными. Чтобы получить немного больше контекста, взгляните на следующую диаграмму:

Как поясняется в источнике таблицы, Observables - это «отложенные push-коллекции нескольких значений». В этом смысле они похожи на Promises: оба являются производителями данных, которые предоставляют (отправляют) данные потребителям по своему усмотрению. Потребитель не контролирует получение данных. Это отличается от модели извлечения, в которой потребитель данных (то есть код, вызывающий функцию) диктует, когда он получает данные от производителя (то есть функции). Производитель не знает, когда данные будут запрошены (извлечены) потребителем.

Однако Observables критически отличаются от Promises. Как отмечено в таблице выше, обещания в конечном итоге возвращают только одно значение. Наблюдаемые могут возвращать значения от 0 до бесконечности. Значение, возвращаемое обещанием, всегда одно и то же, независимо от того, какой обратный вызов его получает. Однако Observable, как и функция, при каждом вызове возвращает разные значения. Два вызова функций возвращают два отдельных побочных эффекта, а два вызова Observable вызывают два отдельных побочных эффекта.

Еще одно ключевое отличие состоит в том, что обещания всегда асинхронны: выполнение кода будет продолжаться до тех пор, пока не будет выполнено обещание, после чего будут выполнены предоставленные им обратные вызовы. Наблюдаемые могут быть синхронными или асинхронными, в зависимости от того, что в них происходит. Скоро мы увидим это в действии.

Наконец, обещания можно рассматривать как «горячие» - они пытаются разрешиться сразу после создания, независимо от того, вызываете вы их then() или нет. С другой стороны, наблюдаемые «холодные» - они ничего не оценивают, пока вы их не вызовете. Пока этого не произошло, Observables просто сидят там, поэтому их называют ленивыми. Теперь - и это может сбить с толку - не все наблюдаемые объекты холодные. Их можно заставить стать горячими, например, если они созданы из обещания. Темы, которые унаследованы от Observable и к которым мы еще вернемся, также популярны. Однако пока давайте проигнорируем их. Все примеры в следующем разделе будут иметь дело с холодными Observables.

Наша первая наблюдаемая

Теперь, когда мы получили некоторое представление о том, что такое Observable и как он работает, давайте приступим и создадим его:

В строке 3 мы используем Observable.create(), который является просто псевдонимом для конструктора Observable. Он принимает один аргумент: функцию подписки, которая сама имеет аргумент observer. Код внутри этой функции подписки представляет выполнение Observable: ленивое вычисление, которое происходит только при вызове Observable. Выполнение предоставляет одно или несколько значений с течением времени и может происходить синхронно или асинхронно, как упоминалось в предыдущем разделе. Приведенный выше код демонстрирует это: строки 4–6 будут выполняться синхронно, а setTimeout в строке 7 - асинхронно.

Наблюдаемые исполнения могут выдавать 3 типа значений:

  • next() - передает значение (т.е. число, строку, объект и т. Д.)
  • error() - отправляет ошибку или исключение JavaScript
  • complete() - не отправляет значение, только сигнализирует о завершении Observable

Выше мы видим next() и complete(). Лучше всего заключить любой код в блок try/catch, который доставит уведомление об ошибке, если будет обнаружено исключение. Давай сделаем это сейчас:

Как вы могли догадаться, next() уведомления являются наиболее важным и распространенным типом, поскольку они представляют данные, доставляемые наблюдателю. При выполнении Observable может быть доставлено next() уведомлений от нуля до бесконечности. С другой стороны, error() и complete() уведомления могут появиться только один раз во время выполнения Observable, и может быть только одно из них. Если доставлено error() или complete() уведомление, то больше ничего не может быть доставлено впоследствии.

Вот и все! Наш первый Observable. А теперь давайте что-нибудь с этим сделаем. Помните - наблюдаемые ленивы. Тот, который мы создали выше, просто будет оставаться неисполненным, пока мы его не вызовем. Мы делаем это, позвонив по нему subscribe().

Подписка

Если subscribe() звучит знакомо, это потому, что это так: мы предоставляем subscribe() функцию конструктору Observable или его псевдониму create(). Хотя это может сбивать с толку, это не совпадение! Хотя в библиотеке Reactive они разные, концептуально они равны и должны рассматриваться как таковые для практических целей.

Идея состоит в том, что это помогает подчеркнуть, как subscribe() вызовы Observable, которые вызывают его выполнение, не совместно используются: при вызове observable.subscribe() функция subscribe(), предоставленная Observable.create(), запускается для что данный звонок observable.subscribe(). Другими словами: каждый вызов observable.subscribe() запускает собственное независимое выполнение Observable.

Для простоты я упустил ключевую часть подписки на Observable: Observer. Давайте сейчас погрузимся в это.

Наблюдатель

Мы установили, что Observables - это «ленивые push-коллекции нескольких значений», которые производят 3 типа значений (следующее, ошибка и завершение) и которые могут быть выполнены путем вызова subscribe() для них. Итак, теперь вам может быть интересно, как мы на самом деле получаем эти значения. Введите Observer - коллекцию обратных вызовов, которые прослушивают значения, доставленные Observable. Опять же, если кое-что из того, что следует ниже, кажется вам знакомым, вы не представляете себе вещи:

Как вы, возможно, помните, мы уже вызывали эти функции внутри выполнения Observable, когда создавали Observable. Когда мы подписываемся на Observer, мы просто определяем, что каждый из них будет делать после выполнения.

Хотя наш Observer выше включает обратный вызов для каждого типа значения, созданного Observable, мы также можем предоставить частичный Observer (т.е. только next и complete , или только next ). Наконец, мы также можем предоставить эти обратные вызовы в качестве отдельных аргументов для observable.subscribe() . Первый будет использоваться для next(), второй - для error(), а последний - для complete() . Вот почему иногда можно увидеть:

Собираем все вместе

Теперь мы рассмотрели все ключевые части определения, выполнения и получения значений из Observable. Давайте соберем их все вместе и проанализируем, что происходит.

Строки 1–23 должны выглядеть знакомо, поскольку они такие же, как и те, что мы обсуждали в предыдущих разделах, с добавлением console.log() внутри выполнения Observable, чтобы выделить, когда оно начинается.

Мы видим, что первый результат - это console.log() из строки 25, демонстрирующий, что наш наблюдаемый объект на самом деле является ленивым и еще не был выполнен в этот момент. После вызова subscribe() вызывается console.log() внутри выполнения Observer, и наши observer.next() методы регистрируют числа 1–3, предоставленные ему наблюдаемым. Все это происходит синхронно.

Затем мы видим, как Observable может выполняться асинхронно, если код внутри его выполнения является асинхронным, например setTimeout() в строке 10. Пока этот тайм-аут ожидает завершения, мы получаем console.log() из строки 27, и по истечении 1000 мс мы наконец, получите последнее значение next() и complete() из строк 11 и 12 . Et voila, наша первая наблюдаемая была успешно выполнена!

Удаление наблюдаемых исполнений

Если мы вернемся к определению наблюдаемого, которое дает Бен Леш - « функция, которая принимает наблюдателя и возвращает функцию» - вы можете заметить, что наш пример и разбивка выше описывают только первые две трети. Что насчет всего этого «возвращает функцию»? Как вы могли заметить, на самом деле мы фиксируем возвращаемое значение observable.subscribe() как переменную с именем subscription . Так какой в ​​этом смысл?

Поскольку Observable может возвращать значения от 0 до бесконечности, Observable API предоставляет способ прервать выполнение в течение некоторого конечного промежутка времени. Когда subscribe() вызывается для Observable, возвращается объект Subscription. Эта Подписка представляет собой текущее выполнение Observable и позволяет нам отменить это выполнение, вызвав метод unsubscribe() Подписки.

Давайте посмотрим, что произойдет, если мы вызовем subscription.unsubscribe() в нашем примере выше:

Как мы видим, когда мы вызываем unsubscribe() в строке 30, наблюдаемое выполнение отменяется до завершения setTimeout() внутри observable и имеет возможность доставить окончательное значение 4.

Каждый Observable должен определять, как избавляться от ресурсов для выполнения при его создании. Для нашего простого Observable выше было достаточно функции API по умолчанию unsubscribe() . Однако для более сложных Observables вы можете и должны возвращать пользовательскую функцию unsubscribe() из функции subscribe() , предоставленной для Observable.create(). Ниже приведен пример того, как это может выглядеть.

Функция subscribe() Observable.create() возвращает пользовательскую функцию unsubscribe(), которая содержит логику для очистки countInterval, установленного в рамках выполнения Observable. Если бы это было не так, и вместо этого мы полагались бы на функцию по умолчанию unsubscribe() , observable перестанет доставлять значения через 3 секунды, как это произошло выше. Однако программа никогда не завершится, потому что интервал продолжает тикать!

На этом мы подошли к концу нашего введения в Observable, основу реактивного программирования. Если вы дошли до этого момента, вы в отличной форме. Хотя на самом деле это только верхушка айсберга с точки зрения реактивного программирования, все зависит от твердого понимания Observables. Чем удобнее вы будете с ними работать, тем легче будет понять остальное, что делает реактивное программирование таким мощным.

Возможно, самый мощный аспект реактивного программирования - это его операторы. Итак, без лишних слов, давайте перейдем к тому, что это такое и как мы можем использовать их, чтобы вывести использование Observables на новый уровень.

Операторы

Определение операторов, которое вы найдете в Руководстве ReactiveX по RxJS:

«Операторы - это важные части [реактивного программирования], которые позволяют легко составить сложный асинхронный код декларативным способом»

Звучит довольно модно, но, к сожалению, затрагивает больше «почему», чем «что». Я думаю, что это важно включить сюда, поскольку это поможет объяснить некоторые особенности и поведение операторов по мере нашего продвижения вперед.

Так что же тогда операторы? Проще говоря, это функции, которые принимают исходный Observable, создают новый Observable на основе исходного Observable и возвращают новый Observable. Другими словами: Operator - это чистая функция, которая принимает один Observable в качестве входных данных и генерирует другой, новый Observable в качестве выходных данных. Если вы больше ничего не берете из этого раздела, пусть будет оно!

Как может выглядеть оператор с учетом этого определения? Давайте погрузимся в код и узнаем. Чтобы немного упростить работу, я представил новый способ создания Observable в приведенном ниже примере - не волнуйтесь, если он не совсем понятен, я объясню его в следующем разделе.

Во-первых, как и было обещано, давайте обратимся к новому причудливому способу создания Observable, который называется input в строке 17. Вместо того, чтобы использовать Observable.create() и передавать subscribe() функцию, которая императивно определяет, какие значения будет предоставлять Observable, я вместо этого использовался Observer.from(), который позволяет нам декларативно определять выходные значения Observer, передавая ему массив (или другой тип данных, который может быть преобразован в Observable методом from() , например в виде строки). Если это звучит пугающе похоже на первое определение Операторов "почему", данное выше, вы кое-что знаете. Как оказалось, Observable.create() и Observable.from() - оба оператора!

В частности, это статические операторы - чистые функции, прикрепленные к классу Observable. Большинство статических операторов, таких как create() и from(), используются для создания новых наблюдаемых объектов и поэтому называются операторами создания. Вы можете найти их полный список вместе с документацией здесь.

Переходим к нашему новому оператору square(). Как мы видим, он принимает один параметр input. Внутри функции мы используем Observable.create(), чтобы создать новый Observable с именем output, который возвращается в конце функции square(), таким образом выполняя наш контракт на чистоту. Настоящая магия этого Оператора и любого другого происходит внутри функции subscribe(), предоставленной Observable.create().

В строке 7 мы подписываемся на входной Observable и предоставляем ему объект Observer с обратным вызовом для next, error и complete, , как мы это делали в нашем первом примере Observable. Волшебство здесь в том, что каждый обратный вызов теперь также вызывает observer, предоставленный из нового вывода Observable, из строки 5. Таким образом, каждый раз, когда ввод Observable выдает значение next, то же самое делает и вывод Observable! Учитывая, что мы хотим, чтобы этот оператор возводил в квадрат значения, предоставленные ему входным Observable, мы умножаем значение, предоставленное обратному вызову next в строке 8, на себя в новом вызове observer.next() . Поскольку мы не хотим делать ничего особенного с уведомлениями error или complete , мы просто передаем их без изменений в строках 9 и 10.

Теперь, когда мы рассмотрели основы оператора, давайте посмотрим, как он работает. Обратите внимание, что когда мы subscribe() только наблюдаем input , он передает значения 1, 2, 3, 4, 5 простому наблюдателю, который мы предоставлено ему в строке 18. В строке 20 мы создаем новый Observable, output, который возвращается нашим square() operator, который принимает input в качестве параметра. Напомним, что output теперь является полностью новым Observable, поскольку square() - это чистая функция. Когда мы подписываемся на output ,, мы видим, что сначала получаем 1, затем 4, затем 9 и т. Д. Все квадраты значений, доставленных input!

Обратите внимание, что console.log() в строке 6, который указывает на начало выполнения Observable для square()'s output Observable, появляется только после того, как мы subscribe() до output в строке 22 , подтверждается тем фактом, что мы получаем 'подписку на вывод' сначала из строки 21. Это иллюстрирует ключевую концепцию: мы не подписываемся на вход Observable, пока не подписываемся на выход Observable. Таким образом, если мы «связываем» несколько операторов вместе, выполнение не происходит до тех пор, пока мы не подпишемся на окончательный вывод Observable этой цепочкой.

Другими словами: подписавшись на выходные данные, Observable также подписывается на входные данные Observable. Это обычно называется «цепочкой подписки оператора» и является одной из самых мощных частей реактивного программирования. Общие операторы, такие как filter и map (которые ведут себя аналогично функциям Javascript, из которых они берут своих тезок), могут быть объединены вместе, один за другим, для создания и составления новых Observables декларативно.

Теперь вы можете подумать: мы не можем по-настоящему «связать» Оператор, такой как square() ,, учитывая, что это всего лишь функция - и вы будете правы! Чаще всего, когда мы говорим об операторах, мы на самом деле имеем в виду операторы nstance, которые являются просто методами экземпляра Observable. Упомянутые выше операторы filter и map являются операторами экземпляра.

Принципы, лежащие в основе операторов экземпляра, аналогичны принципам оператора square() выше, но код выглядит немного иначе. Чтобы проиллюстрировать это, давайте создадим нашу собственную реализацию оператора map:

Как видите, вместо того, чтобы принимать input (или source) Observable в качестве параметра, мы можем просто использовать this ,, поскольку это будет относиться к тому наблюдаемому, которое является оператором. призвал. Причина, по которой я перешел на source, заключается в том, чтобы лучше соответствовать терминологии, которую вы найдете в официальной документации или исходном коде. В остальном все остальное остается таким же: мы создаем новый output Observable, подписываемся на source Observable в его выполнении Observable и, наконец, возвращаем его в конце метода. Поскольку это оператор map, мы берем функцию в качестве параметра, который будет применяться к каждому значению, предоставленному source Observable, прежде чем оно будет доставлено методом next() output Observable.

Как упоминалось ранее, операторы экземпляров позволяют нам создавать мощные цепочки подписок, которые позволяют нам легко манипулировать Observables:

Всего за 4 строки кода мы можем создать Observable, который доставляет значения от 0 до 9, фильтрует только те, которые являются четными, добавляет строку к каждому значению, а затем добавляет восклицательный знак к этим значениям. Это, конечно, глупый маленький пример, но, надеюсь, вы понимаете, насколько мощным он может быть в применении к проблемам реального мира. Особенно с учетом огромного количества операторов, включенных в RxJS.

Это почти завершает мое введение в Observable. Однако есть еще одна последняя концепция, которую нам нужно осветить, чтобы считать это введением полностью.

До сих пор мы рассмотрели только Observable, который, как упоминалось ранее, создает новое выполнение каждый раз, когда он subscribed(), точно так же, как функция. Хотя это удобно для изолирования наших Observables, оно оставляет их в некотором роде. Что, если мы хотим подписать несколько разных наблюдателей на одно и то же выполнение Observable? Из-за одноадресной природы Observable это, к сожалению, невозможно без использования некоторых причудливых операторов, таких как share() -, которые я, по общему признанию, еще даже не начал изучать.

К счастью для меня (и вас!), В реактивном программировании есть еще один базовый тип данных, который является многоадресным - это , он может доставлять значения нескольким подписанным наблюдателям одновременно. время. Это: Тема.

Предметы

Субъекты в реактивном программировании наследуются от Observable, поэтому на самом деле они являются просто «особыми» Observable, которые позволяют передавать значения одному или нескольким Observers. В этом смысле они очень похожи на EventEmitters: оба поддерживают реестр всех своих слушателей.

Как и Observables, Subject subscribed() связан с Observer'ом, который затем начнет получать значения, доставленные Subject. Наблюдатели ведут себя одинаково, независимо от того, подписаны ли они на Observable или Subject, и фактически не имеют «знания» о том, от Observable выполнение, от которого он получает значения, от Observable или Subject.

Однако, в отличие от Observables, подписка на Subject не вызывает новое выполнение, как это происходит для Observable. Вместо этого он просто регистрирует Observer в списке Observers, точно так же, как addListener() ведет себя для события в JavaScript. Когда субъект готов доставить значение, ошибку или полное уведомление, он просто перебирает свой список наблюдателей и передает значение / уведомление каждому из них.

Итак, как субъекты доставляют значения, если подписка не вызывает нового выполнения? Интересно, что каждый субъект также является наблюдателем -, что означает, что это объект с методами next(), error() и complete() . Чтобы предоставить новое значение или уведомление Субъекту и, следовательно, его Наблюдателям, просто вызовите Subject.next(value), и оно будет доставлено каждому Наблюдателю, зарегистрированному для него - отсюда и термин многоадресная рассылка.

Посмотрим, как это выглядит:

К настоящему времени все это должно выглядеть довольно знакомо. Единственное отличие состоит в том, что мы создаем новый subject только в строке 3, мы не предоставляем ему subscribe функцию или какой-либо код в рамках выполнения Observable. Кроме того, вызовы subscribe() в строках 6 и 11 не производят никакого вывода. Вместо этого наши наблюдатели вызывают свои next() методы только при вызове subject.next(). Когда это происходит, их обоих зовут!

Возможно, самым мощным аспектом Subjects является то, что, поскольку они также являются наблюдателями, они могут быть предоставлены функции subscribe() любого Observable. Это означает, что всякий раз, когда этот Observable доставляет значение или уведомление, любой Observer, подписанный на Subject, также получит значение или уведомление, предоставленное Observable:

Каждый раз, когда observable доставляет значение, оно вызывает subject.next() , потому что оно было предоставлено как Observer для subscribe() . В результате каждый Observer, зарегистрированный в subject, будет иметь свои next() методы, вызываемые как хорошо. В результате observable теперь эффективно использует многоадресную рассылку!

Последнее ключевое различие, которое следует проводить между Observables и Subject, заключается в том, что Subjects не могут использоваться повторно, в отличие от Observables. Как упоминалось ранее, когда Observable ошибается или завершается, его выполнение Observable завершается, и никакие дальнейшие значения не могут быть доставлены из него. Однако, поскольку Observable выполняются заново каждый раз, когда на них подписаны, Observable все еще может выполняться снова, если subscribe() вызывается на нем снова. Напротив, когда Тема ошибается или завершается - вот и все. Его больше нельзя использовать. Вызов next() по теме, которая содержит ошибки или завершена - и поэтому считается закрытой - ничего не даст. Субъект молча проигнорирует это. Бен Леш приводит пример такого поведения и того, как с ним бороться, в этом фантастическом сообщении блога о предметах. Для удобства я привел код, вдохновленный его примером ниже:

Обратите внимание, что вызовы next() в строках 7 и 8 ведут себя так, как ожидалось. Однако, когда subject.complete() вызывается в строке 9, последующий subject.next() игнорируется без каких-либо уведомлений или ошибок. Только когда мы явно unsubscribe() от субъекта, вызов next() вызывает ошибку.

Заключение

На этом мы завершаем введение в основную концепцию / тип данных реактивного программирования - Observable. В процессе мы рассмотрели не только Observable, но также Subscribe, Observers, unsubscribing, Operators и Subjects. Хотя на самом деле это лишь поверхностная часть всех этих тем, это должно, по крайней мере, дать вам знания, которые вам понадобятся для понимания некоторых официальных документов и изучения других статей по этим темам. До сих пор мне действительно нравилось реактивное программирование, несмотря на трудность его изучения, и я надеюсь, что этот пост сделает его чуть менее болезненным для вас! Это не только уникальный подход к программированию, но и очень мощный инструмент при правильном использовании.