Абстракции более высокого уровня для асинхронных вещей в Clojure

В этой статье я собираюсь исследовать библиотеку Manifold, чтобы взглянуть на функции Manifold Deferred, Stream и Event Bus. Это действительно удобный набор инструментов для всех, кто занимается асинхронным программированием на Clojure [Script]. Мы можем использовать отложенные значения, чтобы помочь нам управлять и абстрагироваться от некоторых сложностей работы со значениями, которые могут поступать асинхронно, потоками для соединения всех видов вещей вместе и шиной событий для обеспечения простой архитектуры обмена сообщениями, управляемой событиями.

Ближе к концу статьи мы также рассмотрим, как эти инструменты могут быть объединены для выполнения асинхронной связи между передней и задней частью нашего приложения с помощью веб-сокета. Хотя мы не будем вдаваться в подробности какой-либо темы, было бы полезно получить обзор библиотеки и того, что она может предложить.

Что такое манифольд?

Manifold был первоначально разработан Заком Теллманом и описывает себя как библиотеку, которая предоставляет основные строительные блоки для асинхронного программирования, которые можно использовать в качестве уровня трансляции между библиотеками, использующими похожие, но несовместимые абстракции.

Мне нравится думать о Manifold как о более высоком уровне абстракции для нашего асинхронного программирования. Мы можем использовать ее, как любую другую асинхронную библиотеку. Однако круто то, что он предлагает гораздо большую совместимость и компоновку, чем если бы мы работали с другими библиотеками.

Вот краткий обзор трех ключевых частей библиотеки:

Manifold.deferred
Методы для создания, преобразования и взаимодействия с асинхронными значениями.

Manifold.streams
Потоки Manifold предоставляют механизмы для асинхронных операций ввода и вывода, тайм-аутов и противодавления. Они совместимы с BlockingQueues Java, ленивыми последовательностями Clojure и каналами core.async. Предоставляются методы преобразования в каждый и обратно.

Manifold.bus
Manifold также предоставляет простую шину событий, которую можно использовать для связи между публикациями и подписками.

Вы можете скачать библиотеку здесь:



Прежде чем мы продолжим, Зак также сделал несколько интересных докладов по асинхронному программированию, которые стоит посмотреть:

Двумя основными компонентами Manifold являются отложенные для одиночных асинхронных значений и потоки для упорядоченной последовательности асинхронных значений. Я буду беззастенчиво заимствовать из документации, чтобы понять тему, поэтому я не могу взять на себя ответственность за примеры, которые вы можете найти в библиотеке.

Отложенный вариант немного похож на обещание, и мы можем определить его следующим образом:

(def mydeferredvar (d/deferred)

Когда у нас есть отложенный объект, мы можем вернуть значение успеха или выдать ошибку:

(d/success! mydeferredvar :result)
(d/error! mydeferredvar (Exception. "Error Message")

Затем мы можем разыменовать отложенный объект с помощью оператора @:

@mydeferredvar
; Exception: Error Message

Мы также можем получить deferred, чтобы делать что-то, когда происходят события, путем присоединения функций обратного вызова. Это отражает другое поведение, похожее на обещание, путем передачи функции успеха, за которой следует функция ошибки.

(d/on-realized d
    (fn [x] (println "success!" x))
    (fn [x] (println "error!" x)))

Однако на самом деле вам не следует использовать обработчик обратного вызова после реализации, поскольку обратный вызов - плохой способ выполнения асинхронного программирования. Вместо этого в Manifold отложены функции гораздо более эффективных способов обработки значений.

Цепочка и композиция

Одна из интересных особенностей библиотеки Deferred - это возможность объединять несколько функций в цепочку для обработки результата, что немного похоже на макрос многопоточности. Цепочка будет запускаться последовательно, когда deferred будет присвоено значение с помощью функции (d / success!). Однако, поскольку сами цепные функции могут содержать отложенные значения или значения, подобные отложенным, такие как Futures Clojrue, Futures Java и обещания Clojure. Если какая-либо из функций в цепочке имеет эти значения, они приостановят обработку в цепочке до тех пор, пока указанный deferred не даст значение.

(d/chain mydeferred processfn1 processfn2 #(println "result: " %))

На английском это означает, что мы можем скомпоновать несколько функций вместе, используя цепочку, и выполнить их последовательно по порядку, но только тогда, когда отложенные значения каждой функции готовы для следующего шага. Если какая-либо из функций в цепочке выдает ошибку, оставшиеся шаги пропускаются, а deferred, возвращаемый функцией цепочки, возвращает сообщение об ошибке. С ними можно справиться с помощью manfold.deferred / catch с помощью макроса многопоточности Clojure.

(-> d
    (d/chain dec #(/ 1 %))
    (d/catch Exception #(println "whoops, that didn't work:" %)))

Мы также можем объединить несколько отложенных значений в одно отложенное значение с помощью оператора zip. Это может быть полезно, если у нас есть асинхронные значения, которые мы хотим передать другой функции, когда все они разрешены.

(d/zip (future 1) (future 2) (future 3))
; (1 2 3)

Мы также можем создать отложенные значения, срок действия которых истекает через x мс, используя d / timeout! функция.

@(d/timeout!
     (d/future (Thread/sleep 1000) :deferredval)
     100
     :timedoutvalue)
; :timedoutvalue

Мы также можем использовать let-flow, какое-то умное волшебство, которое позволяет нам обрабатывать отложенное значение, как если бы оно уже существует:

(defn deferred-sum []
  (let-flow [a (call-service-a)
             b (call-service-b)]
    (+ a b)))

Вуаля, теперь в функцию передается несколько асинхронных отложенных операций, которые не будут выполняться, пока не будут представлены оба значения. Мы также можем использовать асинхронные циклы:

(defn my-consume [f stream]
  (d/loop []
    (d/chain (s/take! stream ::drained)

      ;; if we got a message, run it through `f`
      (fn [msg]
        (if (identical? ::drained msg)
          ::drained
          (f msg)))

      ;; wait for the result from `f` to be realized, and
      ;; recur, unless the stream is already drained
      (fn [result]
        (when-not (identical? ::drained result)
          (d/recur))))))

Цикл волшебным образом обрабатывает отложенные значения, только когда они реализованы, поэтому мы получаем программирование в синхронном стиле с асинхронными значениями.

Представляем Manifold Streams

Потоки Manifold обеспечивают уровень абстракции для работы с BlockingQueues Java, ленивыми последовательностями Clojure и каналами core.async. Одним из основных преимуществ Manifold является то, что мы можем легко конвертировать между ними, что дает нам большую гибкость, если мы хотим изменить нашу архитектуру в будущем или добавить к ней функциональные возможности.

Мы можем создать поток так же, как и канал Core.async. Мы тоже можем поставить! ценности и бери! значения из потока. ставить! и возьми! return deferred’s, с которыми мы можем работать, используя отложенные библиотечные функции Manifold (например, цепочку / цикл).

Ключевые концепции:

  • Все асинхронные значения и операции в Manifold представлены как отложенные.
  • Потоки могут быть источниками сообщений (потребителями) или приемниками (производителями) или и тем, и другим.
  • Сообщения из всего, что является источником, могут быть переданы по конвейеру во все, что можно принять, с помощью manifest.stream/connect

Если нам нужна буферизация, мы также можем создать буфер, используя (размер буфера потока) или вызов (размер буфера потока) для создания буфера нисходящего потока. Сообщения могут быть ограничены по скорости с помощью (ограничение максимальной скорости потока).

(def s (s/stream))
(s/put! s "hello")
(s/take! s)
; "hello"

Основное преимущество заключается в том, что мы также можем использовать другие библиотеки с нашими потоками Manifold:

; Core async
(def c (async/chan))
; Call the ->source fn in manifold on the channel to connect it. 
(def s (s/->source c))

Теперь, когда мы подключили канал Core.async в качестве источника, мы можем отправлять в него сообщения с помощью основного оператора .async put (заключенного в блок go):

(async/go (async/>! c 1))

Самое интересное то, что теперь мы можем отменить ссылку на наш поток коллектора и вызвать его, чтобы получить значение:

@(s/take! s)
; 1

Мы также можем сделать то же самое в обратном порядке, вызвав - ›раковина

(def s (s/->sink c))
@(s/put! s "hello")
(async/go (println (async/<! c)))
;"hello"

Функция подключения позволяет нам устанавливать мосты между разными потоками:

> (def s (s/stream))
#'s
> (def c (a/chan))
#'c
> (s/connect s c)
nil
> (s/put! s 1)
<< true >>
> (a/<!! c)
1

Если у нас есть источник, который больше никогда не будет выдавать сообщений, мы считаем, что источник истощен. Мы можем проверить это, используя слитый? и зарегистрируйте обратный вызов по истечении срока службы. брать! на истощенном источнике по умолчанию возвращает nil, но мы можем изменить это, передав другое значение, если захотим:

> @(s/take! s ::drained)
::drained

Мы также можем установить временные ограничения на то, как долго мы готовы ждать завершения нашего ввода или вывода.

> (def s (s/stream))
> @(s/try-put! s :foo 1000 ::timeout)
> @(s/try-take! s ::drained 1000 ::timeout)

Если поместить: foo в наш поток не удалось в течение 1000 мс, мы возвращаем :: timeout. Точно если брать! поток истощен, мы возвращаем :: drained или возвращаем значение :: timeout через 1000 мс.

Потоковые операции

Мы можем мгновенно потреблять сообщения, используя s / consumer:

(s/consume #(prn 'message! %) s)

Мы также можем создавать производные потоки:

(->> [1 2 3]
    (s/map inc)
    s/stream->seq)
; (2 3 4)

В приведенном выше примере мы передаем вектор в источник, сопоставляем функцию (inc) с каждым значением, используя s/map, а затем конвертируем вывод обратно в последовательность. За кулисами Manifold также вызывает - ›источник последовательности для нас. Если бы мы не использовали карту, нам, возможно, пришлось бы быть более точным:

> (->> [1 2 3]
    s/->source
    s/stream->seq
    (map inc))
(2 3 4)

Здесь важно помнить, что (s / map inc) автоматически вызывает (s / - ›source) для нас, если он применяется к последовательности, и мы можем преобразовать поток в последовательность, используя s / stream-› seq.

Что, если бы мы попытались взять новый поток Manifold и определили две нижестоящие функции, которые действуют на значения:

(def sourcestream (s/stream))
; Downstream derived stream
(def a (s/map inc sourcestream))
(def b (s/map dec sourcestream))
@(s/put! sourcestream 0)
@(s/take! a)
; 1
@(s/take! b)
; -1

Мы также можем использовать manifestold.stream/transform с любым преобразователем Clojure, если операция не поддерживается.

(->> [1 2 3]
    (s/transform (map inc))
    s/stream->seq)
(2 3 4)

Соединение ручьев и кормушек

Мы можем соединить два потока вместе с помощью функции s / connect. Не забудьте отменить ссылку на возвращаемое значение как s / put! и с / бери! вернуть отложенный манифольд. Connect берет источник и передает все сообщения в приемник.

> (def a (s/stream))
#'a
> (def b (s/stream))
#'b
> (s/connect a b)
true
> @(s/put! a 1)
true
> @(s/take! b)
1

Connect также использует карту опций:

  • вниз по течению? - Закрывает раковину, если источник закрывается
  • вверх по течению? - закроет ли закрытие приемника источник, даже если есть другие приемники ниже по потоку (по умолчанию false)
  • timeout - максимальное время, затраченное на ожидание передачи сообщения в приемник, прежде чем соединение будет разорвано (по умолчанию ноль)
  • description - Описание связи между источником и приемником
> (def a (s/stream))
#'a
> (def b (s/stream))
#'b
> (s/connect a b {:description "a connection"})
nil
> (s/description a)
{:pending-puts 0, :drained? false, :buffer-size 0, :permanent? false, ...}
> (s/downstream a)
(["a connection" << stream: ... >>])

Мы также можем взять два потока и применить между ними функцию:

> (def a (s/stream))
#'a
> (def b (s/stream))
#'b
> (s/connect-via a #(s/put! b (inc %)) b)
nil

Значение, возвращаемое обратным вызовом для сквозного соединения, обеспечивает противодавление, как если бы было возвращено отложенное значение, дальнейшие сообщения не будут передаваться до тех пор, пока отложенное значение не будет реализовано.

Представляем шину событий Manifold

Шина событий - это простой механизм публикации / подписки для тем. Мы можем использовать его для передачи наших событий по нашему приложению. Части нашего приложения, которые интересуются определенными событиями, могут зарегистрироваться в шине событий с помощью функции подписки. Затем шина событий знает, что нужно пересылать события этим подписчикам, когда они прибывают. По умолчанию это происходит автоматически, если есть зарегистрированные подписчики, но мы также можем использовать буфер для хранения определенного количества сообщений.

Шины событий великолепны, потому что они просты в использовании и помогают разделить код нашего приложения. Когда мы публикуем событие, нам не нужно знать, кто подписаны или что будет с нашим сообщением. Мы просто кладем его в автобус, и он уезжает. Думайте об этом как о покупке билета на автобус для дедушки. В билете указано примерное место назначения, например, Огайо. Мы покупаем билет для дедушки, сажаем его в автобус и верим, что автобус доставит его туда, куда ему нужно. Мы ничего не знаем о маршруте автобуса или о том, на скольких остановках в Огайо может остановиться наш дедушка. Мы просто посадили его в автобус и надеемся, что кто-нибудь расскажет нам, когда он прибудет в Огайо.

В автобусе Manifold Event нам также нужно знать, когда Gramps посетил все пункты назначения. Мы создаем новый пункт назначения для Gramps, используя нашу функцию подписки. Когда мы отправляем дедушку (публикуем), нам нужно дождаться обратной связи, чтобы убедиться, что он благополучно прибыл во все пункты назначения (подписчики). По умолчанию мы сидим у телефона и ждем подтверждения (приостановленная беседа), прежде чем приступить к своим делам. Это поведение может измениться, если мы предоставим буфер. Имея буфер из двух бабушек и дедушек, мы могли бы отправить дедушку и бабушку и не беспокоиться о том, что они ответят. Однако, если бы мы хотели отправить в автобус третьего дедушку и бабушку, нам нужно было бы дождаться, пока одна из наших первых бабушек и дедушек будет успешно доставлена. Буферы меняют то, насколько нам нужно беспокоиться о доставке. Мы можем немедленно возобновить выполнение других обязанностей, как только высадим дедушку, потому что мы знаем, что он находится в очереди, ожидая обработки.

Мы можем создать простую шину событий следующим образом:

(def bus (event-bus))

Чтобы использовать нашу шину событий, мы можем публиковать в темах. Тема сродни логическому каналу для наших сообщений. Когда мы создаем тему, мы можем разделить наши сообщения о событиях на события, которые связаны исключительно с этой темой. Это дает нам возможность:

  • Убедитесь, что наши события обрабатываются только тогда, когда
  • По-разному маршрутизируйте определенные сообщения.
  • Увеличьте масштаб нашего оборудования или распределенных систем, чтобы справиться с увеличивающейся нагрузкой на сообщения.

Вот как мы можем опубликовать сообщение в теме:

(publish! bus topic msg)

Призывы публиковать! вернуть Manifold deferred, который не будет реализован, пока все потоки не примут сообщение. Вы должны знать, что призывы к публикации! будет ждать ответа от всех подписчиков, чтобы убедиться, что сообщение принято. Если у вас медленный потребитель, вы можете использовать буфер или соединение с тайм-аутом, чтобы обойти это.

Также мы можем подписаться на темы:

(subscribe bus topic)

По умолчанию subscribe возвращает небуферизованный поток, но мы можем изменить это при необходимости. Manifold также предоставляет несколько дополнительных служебных функций:

(active? bus topic) Возвращает истину, если на тему есть подписчики.

(downstream bus topic) Возвращает список всех потоков, подписанных на тему.

Соединяя вещи вместе

В этой статье мы рассмотрели множество тем, но ключевой вопрос заключается в том, как на самом деле использовать вышеизложенное для выполнения реальной работы? В качестве примера, вот как мы можем инициализировать и использовать соединение с веб-сокетом (поток) и использовать API Manifold Streams для подключения его к шине Manifold Event:

(ddo [ws (http/websocket-connection req)]
      (stream/consume up-consumer ws)
      (stream/connect (bus/subscribe usermsg-bus topic)
                      ws))))

В приведенном выше примере мы вызываем Consumer и передаем обратный вызов up-consumer с веб-сокетом в качестве источника. Обратный вызов восходящего потребителя затем будет получать (потреблять) сообщения от веб-сокета по мере их поступления. Мы также можем вызвать stream/connect для передачи любых сообщений в отдельную подписку на шину событий, которую мы создали. В приведенном выше коде мы подписываем шину событий usermsg-bus на нашу тему, и теперь шина будет получать поток данных из нашего веб-сокета. Теперь мы можем call (bus/publish! usermsg-bus topic message) для отправки сообщений в наш веб-сокет.

Круто то, что Manifold очень умен и позволяет нам использовать шину событий с его архитектурой, управляемой событиями pub / sub, и подключать ее напрямую к веб-сокету. Затем мы можем подписаться на любые темы и сообщения, которые поступают в этот сокет из нашего интерфейсного приложения.

В нашем интерфейсном приложении мы могли бы создать соединение с веб-сокетом, используя библиотеку веб-сокетов, такую ​​как аккорд, следующим образом:

(go
     (let [{ws :ws-channel err :error} 
       (<! (ws-ch (str "ws://localhost:2200/websockets/" user-id)
                  {:format :json-kw}))]
(if-not err
         (do
           (reset! user-websocket ws)
           (rx-messages ws)
           (js/console.log "subscribed: " user-id))

Здесь мы видим, что вызов (формат URL-адреса ws-ch) предоставит нам соединение с веб-сокетом, которое мы можем использовать для отправки сообщений в формате json-kw. Затем мы можем отправить сообщение следующим образом:

(defn tx-message
  [ws msg]
  (go (let [r (>! ws msg)]
        (js/console.log "tx-message" msg r))))

Наша функция tx-message принимает наш веб-сокет ws и сообщение ms и использует core.async put для отправки сообщения в соединение с веб-сокетом. Затем эти сообщения будут переданы на нашу шину событий, которую мы настроили на сервере.

Мы также можем прослушивать сообщения следующим образом:

(defn rx-messages
  [ws]
  (go-loop [{content :message :as msg} (<! ws)]
    (when msg
      (js/console.log "rx-messages" content)
      (re-frame/dispatch
       [::rx-message content])
      (recur (<! ws)))))

Мы используем макрос go-loop, чтобы постоянно получать значения из нашего веб-сокета.

(<! ws) - это фрагмент кода, который возвращает значение, которое затем деструктурируется в нашу привязку let. Значение content содержит полезное содержимое: message, а: msg содержит весь конверт сообщения. Затем мы можем обрабатывать каждое сообщение по мере его поступления и отправлять события изменения кадра, когда они получены. В нашем примере мы отправим :: rx-message с содержимым сообщения. Затем мы продолжим цикл бесконечно, пока соединение с ws не будет разорвано.

Цикл go-loop помогает нам сделать наш код более читабельным. Это позволяет нам связывать значения и создавать цикл в функции Core.async go без слишком большого количества шаблонов. Нам нужен макрос go для выполнения тела и возврата управления без блокировки.

Вы можете узнать больше о core.async в моей статье об асинхронном Clojure: