Превратите этот вызов Clojure в ленивую последовательность

Я работаю с набором инструментов для обмена сообщениями (это Spread, но я не знаю, важны ли детали ). Для получения сообщений из этого инструментария требуется некоторый шаблон:

  1. Создайте соединение с демоном.
  2. Присоединяйтесь к группе.
  3. Получите одно или несколько сообщений.
  4. Покиньте группу.
  5. Отключиться от демона.

Следуя некоторым идиомам, которые я видел в в другом месте, я смог создать несколько рабочих функций. используя Java API Spread и формы взаимодействия Clojure:

(defn connect-to-daemon
  "Open a connection"
  [daemon-spec]
  (let [connection (SpreadConnection.)
        {:keys [host port user]} daemon-spec]
    (doto connection
      (.connect (InetAddress/getByName host) port user false false))))

(defn join-group
  "Join a group on a connection"
  [cxn group-name]
  (doto (SpreadGroup.)
    (.join cxn group-name)))

(defn with-daemon*
  "Execute a function with a connection to the specified daemon"
  [daemon-spec func]
  (let [daemon (merge *spread-daemon* daemon-spec)
        cxn (connect-to-daemon daemon-spec)]
    (try
     (binding [*spread-daemon* (assoc daemon :connection cxn)]
       (func))
     (finally
      (.disconnect cxn)))))

(defn with-group*
  "Execute a function while joined to a group"
  [group-name func]
  (let [cxn (:connection *spread-daemon*)
        grp (join-group cxn group-name)]
    (try
     (binding [*spread-group* grp]
       (func))
     (finally
      (.leave grp)))))

(defn receive-message
  "Receive a single message. If none are available, this will block indefinitely."
  []
  (let [cxn (:connection *spread-daemon*)]
    (.receive cxn)))

(По сути, та же идиома, что и with-open, только класс SpreadConnection использует disconnect вместо close. Грр. Кроме того, я пропустил некоторые макросы, которые не имеют отношения к структурному вопросу здесь.)

Это работает достаточно хорошо. Я могу вызвать получение сообщения изнутри структуры, например:

(with-daemon {:host "localhost" :port 4803}
  (with-group "aGroup"
    (... looping ...
      (let [msg (receive-message)] 
        ...))))

Мне приходит в голову, что receive-message было бы чище в использовании, если бы это была бесконечная ленивая последовательность, производящая сообщения. Итак, если бы я хотел присоединиться к группе и получать сообщения, код вызова должен выглядеть примерно так:

(def message-seq (messages-from {:host "localhost" :port 4803} "aGroup"))
(take 5 message-seq)

Я видел множество примеров ленивых последовательностей без очистки, это не так уж сложно. Загвоздка в шагах № 4 и 5 сверху: выход из группы и отключение от демона. Как привязать состояние соединения и группы к последовательности и запустить необходимый код очистки, когда последовательность больше не нужна?


person Community    schedule 27.10.2009    source источник


Ответы (2)


В этой статье описывается, как сделать именно то, что использует clojure-contrib fill-queue. Что касается очистки - интересная особенность fill-queue заключается в том, что вы можете предоставить блокирующую функцию, которая очищает себя, если возникает ошибка или достигается какое-то условие. Вы также можете хранить ссылку на ресурс, чтобы управлять им извне. Последовательность просто завершится. Поэтому в зависимости от ваших семантических требований вам придется выбрать подходящую стратегию.

person Timothy Pratley    schedule 27.10.2009

Попробуй это:

(ns your-namespace
  (:use clojure.contrib.seq-utils))

(defn messages-from [daemon-spec group-name]
  (let [cnx (connect-to-deamon daemon-spec))
        group (connect-to-group cnx group-name)]
    (fill-queue (fn [fill]
                  (if done? 
                      (do
                        (.leave group)
                        (.disconnect cnx)
                        (throw (RuntimeException. "Finished messages"))
                      (fill (.receive cnx))))))

Готово? значение true, если вы хотите завершить список. Кроме того, любые исключения, созданные в (.receive cnx), также завершат список.

person Eric Normand    schedule 28.10.2009