Реактивные потоки в JavaScript с RSocket Flowable

Потоковая передача сообщений, обратное давление, отмена и асинхронное программирование на JavaScript с rsocket-flowable.

При программировании асинхронных операций разработчики JavaScript обычно по крайней мере знакомы с обещаниями, async / await и обратными вызовами. Однако новые асинхронные модели, такие как ReactiveX (RxJS в JavaScript), не так популярны. В этой статье мы углубляемся в нечто похожее, но немного отличающееся от RxJS; реализация реактивных потоков под названием RSocket Flowable (rsocket-flowable на npm).

Основные концепции

В основе RSocket Flowable лежит шаблон наблюдателя, реализованный в rsocket-flowable через интерфейсы Single и Flowable.

Шаблон наблюдателя - это шаблон разработки программного обеспечения, в котором объект, называемый субъектом, поддерживает список своих зависимых элементов, называемых наблюдателями, и автоматически уведомляет их о любых изменениях состояния, обычно вызывая один из их методов. Https://en.wikipedia.org/wiki/Observer_pattern

Единые и плавные интерфейсы

Single - это наблюдаемый интерфейс, который поддерживает следующие взаимодействия:

  • передать одно значение через обратный вызов subscriber.onComplete
  • передать значение ошибки через обратный вызов subscriber.onError
  • отмена через обратный вызов cancel, переданный наблюдателям через обратный вызов onSubscribe

Помимо отмены, эти операции должны казаться вам знакомыми, поскольку они в основном аналогичны взаимодействию с обещаниями, поскольку обещания могут только разрешаться или отклоняться.

Единственный пример

Практическим примером использования Единого интерфейса может быть упаковка API / операции обещания, например fetch API. В приведенном ниже случае мы делаем именно это; мы создаем новый экземпляр Single, который при подписке на него будет вызывать API Starwars для получения данных о Люке Скайуокере.

Flowable - это наблюдаемый интерфейс, который поддерживает следующие взаимодействия:

  • передать одно значение через обратный вызов subscriber.onComplete
  • испускать одно или несколько значений с помощью обратного вызова subscriber.onNext при вызове подписки обратного вызова запроса
  • передать одно или несколько значений ошибки через обратный вызов subscriber.onError
  • отмена через обратный вызов cancel, переданный наблюдателям через обратный вызов onSubscribe

Flowable отличается от Single на фундаментальном уровне тем, что мы ожидаем, что Flowable будет выдавать одно или несколько значений. Single всегда должен выдавать только одно значение или не передавать его. Кроме того, Flowable поддерживает концепцию противодавления.

Из реактивного манифеста:

… обратное давление - важный механизм обратной связи, который позволяет системам изящно реагировать на нагрузку, а не разрушаться под ней https://www.reactivemanifesto.org/glossary#Back-Pressure

Концепция противодавления не совсем уникальна для rsocket-flowable, но она проще по сравнению с поддержкой противодавления, предоставляемой через RxJS. Проще говоря, поддержка противодавления Flowable позволяет наблюдателю контролировать скорость, с которой наблюдаемое излучает или «публикует» значения. Для поддержки этого интерфейс Flowable принимает подписчика, который должен реализовать метод запроса. Этот метод запроса действует как обратный вызов, который отвечает за «публикацию» значений по запросу наблюдателя.

Метод запроса отвечает за публикацию данных со скоростью, запрошенной наблюдателем, при этом наблюдатель управляет потоком данных, передавая значение типа int, представляющее количество событий, которые он может обработать.

В этом примере вызов sub.request(3) приведет к вызову onNext() со значениями 0, 1, 2.

Для более сложного «реального» примера использования прочтите подробное объяснение алгоритма, использующего Flowable с пометкой «Flowable Code Example Explanation», или перейдите прямо к соответствующему образцу кода с пометкой «Flowable Code Example».

Ниже мы реализовали издателя Flowable, который будет выдавать данные, полученные из Starwars API, для каждого фильма, в котором есть персонаж Люк Скайуокер. Для этого мы реализуем метод запроса объекта подписки, переданного filmsSubscriber.onSubscribe(), который примерно следует следующему алгоритму:

Когда метод запроса вызывается впервые:

  • Получите данные о Люке Скайуокере из API Звездных войн и разрушьте массив фильмов из ответа. Сохраните коллекцию фильмов в переменной pendingFilms, чтобы мы могли ссылаться на нее при последующих вызовах request.

Когда метод запроса вызывается в первый раз и при каждом последующем вызове запроса:

  • Прокрутите каждый URL в массиве pendingFilms.
  • Разорвите цикл, если мы запросили количество фильмов, запрошенных наблюдателем (requestedFilmsCount).
  • Разорвите цикл, если загружены данные для всех фильмов.
  • Получите новый URL-адрес фильма из списка pendingFilms.
  • Получите данные о фильме, удаленном из списка pendingFilms, и добавьте полученное обещание в массив невыполненных обещаний (fetches).
  • Как только обещание разрешится, передайте полученные данные в filmsSubscriber.onNext(filmData).
  • Если обещание отклоняется, передайте полученную ошибку filmsSubscriber.onError(err).
  • После того, как все обещания, сохраненные в массиве невыполненных обещаний (fetches), будут урегулированы, проверьте, есть ли у нас еще фильмы, для которых мы еще не загрузили данные.
  • Если есть фильмы, для которых еще не загружены данные, ничего не делайте и подождите, пока наблюдатель выполнит следующий вызов request по своей подписке.
  • Если больше нет фильмов, ожидающих загрузки, вызовите filmsSubscriber.onComplete(), который будет означать для наблюдателя, что все возможные данные загружены.

Этот алгоритм значительно сложнее, чем более простой случай использования Single для пересылки результата Promise. Однако поддержка управления скоростью, с которой мы извлекаем дополнительные данные, наряду с поддержкой отмены (с небольшими изменениями), делает добавленную сложность Flowable достойным компромиссом.

Ленивые наблюдаемые

Наблюдаемые интерфейсы, реализованные с помощью rsocket-flowable, являются «ленивыми», что означает, что никакая «работа» не начинается, пока наблюдатель не подпишется на наблюдаемое. Эти наблюдаемые также можно назвать «холодными наблюдаемыми», что в отличие от «горячих наблюдаемых». При работе с горячим наблюдаемым объект может испускать значения независимо от присутствия каких-либо наблюдателей.

Напротив, вы, возможно, уже знакомы с концепцией «нетерпеливых» или «горячих» интерфейсов в форме обещаний. Для обещаний обратный вызов, переданный конструктору Promise, вызывается сразу после создания экземпляра Promise (или при следующем тике цикла событий, если вы хотите конкретизировать).

Если бы дерево, падающее в лесу, можно было наблюдать горячо, оно бы издавало звук независимо от того, был ли кто-нибудь поблизости, чтобы его услышать или нет.

В приведенном выше примере метод setTimeout в обратном вызове, переданном конструктору Promise, вызывается независимо от того, вызывается ли метод прототипа .then() или нет. Вы можете убедиться в этом сами, скопировав приведенный выше пример в консоль инструментов разработчика вашего браузера, где вы увидите, что строка журнала консоли распечатывается сразу, а затем примерно через одну секунду следует случайное значение int.

Аннулирование

Отмена - мощная функция наблюдаемых интерфейсов, таких как rsocket-flowable. Отмена позволяет наблюдателю указать наблюдаемому, что он больше не интересуется результатом каких-либо текущих операций. Отмена полезна при программировании пользовательских интерфейсов с такими фреймворками, как ReactJS, где возможность отмены асинхронных операций на лету важна для очистки состояния, чтобы избежать побочных эффектов при размонтировании компонентов. Поддержка отмены также полезна при реализации веб-сервисов с отслеживанием состояния с такими протоколами, как WebSockets, когда клиент может разорвать свое соединение в любое время, и продолжение выполнения операций от его имени после отключения, вероятно, не имеет смысла.

В приведенном ниже примере мы создаем экземпляр Flowable, который будет выдавать целочисленное значение до тех пор, пока не будет отменен, при этом подписчик запрашивает случайное количество целых чисел каждые 500 миллисекунд (полсекунды). Абонент дополнительно отменит поток int через три секунды. Этот пример аналогичен тому, как вы могли бы реализовать тайм-аут для асинхронной операции, такой как сетевой запрос или чтение файла.

Важно понимать, что отмена наблюдаемого потока только указывает наблюдаемому, что подписчик больше не заботится о получении обновлений, он не отменяет автоматически какие-либо операции, которые, возможно, выполнял издатель. Если для наблюдаемого жизненно важно реагировать на отмену, вы можете реализовать обратный вызов subscription.cancel для выполнения очистки по мере необходимости.

Наконец, с [email protected], чтобы избежать ошибки TypeError ниже, вы должны реализовать обратный вызов отмены на издателе, если вы намереваетесь вызвать отмену от подписчика.

Будущее RSocket Flowable

В предыдущей статье Состояние RSocket в JavaScript мы рассмотрели, как будущее rsocket-js может быть неопределенным, и с сопровождающим rsocket-js, заявившим, что RSocket Flowable может быть заменен в будущем новым проектом. То же самое и с RSocket Flowable. Однако репозиторий GitHub для предлагаемой замены не получал значимых вкладов в течение более 16 месяцев, поэтому трудно сказать, произойдет ли это. Конечно, не исключено, что проект будет развиваться за кулисами как проект с закрытым исходным кодом, который в будущем будет выпущен как OSS.

Последние мысли

Такие достижения, как RSocket, поддерживают постоянное внедрение наблюдаемых паттернов. RSocket моделирует поток сообщений, плавно перемещающихся по системе, каждый из которых использует ReactiveX или реализацию React Streams. Реактивные потоки и ReactiveX (реализованные как RxJS в JavaScript) выросли на мне после того, как я открыл для себя RSocket, и я верю, что мы продолжим видеть рост и принятие этих шаблонов в течение следующих нескольких лет. В экосистеме Java уже ясно, что реактивные потоки стали более распространенными благодаря популярному Project Reactor.

Я рекомендую углубиться в эти концепции, если вас заинтриговало создание высокомасштабируемых реактивных микросервисов и интерфейсов реального времени.

Первоначально опубликовано на https://viglucci.io/reactive-streams-in-javascript-with-rsocket-flowable