Наблюдаемый против FSharpx asyncSeq

У меня есть следующий класс потока твитов. У него есть событие TweetReceived, которое можно использовать с другими компонентами моей системы.

Кажется, это работает нормально, но у меня такое ощущение, что это сложнее, чем должно быть.

Существуют ли какие-либо инструменты, которые дают мне эту функциональность без необходимости самостоятельно реализовывать механизм mbox/event?

Также вы бы порекомендовали использовать asyncSeq вместо IObservable?

Спасибо!

type TweetStream ( cfg:oauth.Config) =
    let token = TwitterToken.Token (cfg.accessToken,
                                    cfg.accessTokenSecret,
                                    cfg.appKey, 
                                    cfg.appSecret)

    let stream = new SimpleStream("https://stream.twitter.com/1.1/statuses/sample.json")

    let event = new Event<_>()

    let agent = MailboxProcessor.Start(fun (mbox) ->
        let rec loop () =
            async {
                let! msg = mbox.Receive()
                do event.Trigger(msg)
                return! loop()
            }
        loop ()) 

    member x.TweetReceived = event.Publish

    member x.Start () =
        Task.Factory.StartNew(fun () -> stream.StartStream(token, agent.Post))
        |> ignore

    member x.Stop = stream.StopStream

ОБНОВЛЕНИЕ: Спасибо Томасу за быстрый (как всегда) ответ на второй вопрос.

Мой первый вопрос может быть немного неясным, поэтому я реорганизовал код, чтобы сделать класс AgentEvent видимым, и перефразирую первый вопрос: есть ли способ упростить реализацию логики в AgentEvent? Эта логика уже где-то реализована?

Я спрашиваю об этом, потому что это похоже на общий шаблон использования.

type AgentEvent<'t>()=
    let event = new Event<'t>()

    let agent = MailboxProcessor.Start(fun (mbox) ->
        let rec loop () =
            async {
                let! msg = mbox.Receive()
                do event.Trigger(msg)
                return! loop()
            }
        loop ()) 
    member x.Event = event.Publish
    member x.Post = agent.Post

type TweetStream ( cfg:oauth.Config) =
    let token = TwitterToken.Token (cfg.accessToken,
                                    cfg.accessTokenSecret,
                                    cfg.appKey, 
                                    cfg.appSecret)

    let stream = new SimpleStream("https://stream.twitter.com/1.1/statuses/sample.json")

    let agentEvent = AgentEvent()

    member x.TweetReceived = agentEvent.Event

    member x.Start () =
        Task.Factory.StartNew(fun () -> stream.StartStream(token, agentEvent.Post))
        |> ignore

    member x.Stop = stream.StopStream

person vidi    schedule 11.01.2014    source источник


Ответы (1)


Я думаю, что IObservable — правильная абстракция для публикации событий. Что касается их обработки, я бы использовал либо реактивные расширения, либо агенты F# (MailboxProcessor), в зависимости от того, что вы хотите сделать.

Обратите внимание, что F# автоматически представляет события как значения IObservable (на самом деле IEvent, но это наследуется от observable), поэтому вы можете использовать реактивные расширения непосредственно на TweetReceived.

Что такое правильное представление?

  • Суть asyncSeq в том, что он позволяет вам контролировать скорость генерации данных — он похож на async тем, что вам нужно запустить его, чтобы фактически выполнить работу и получить значение — так что это полезно, если вы можете начать какую-то операцию ( например загрузить следующие несколько байтов), чтобы получить следующее значение

  • IObservable полезен, когда у вас нет контроля над источником данных — когда он просто продолжает создавать значения, и у вас нет возможности приостановить его — это кажется более подходящим для твитов.

Что касается обработки, я думаю, что Reactive Extensions хороши, когда они уже реализуют нужные вам операции. Когда вам нужно написать некоторую пользовательскую логику (которую нелегко выразить в Rx), использование Agent — отличный способ написать свои собственные Rx-подобные функции.

person Tomas Petricek    schedule 11.01.2014
comment
Спасибо за быстрый ответ. Я перефразировал первую часть вопроса. Можете ли вы ответить и на это? Спасибо! - person vidi; 12.01.2014
comment
Другая проблема, с которой я столкнулся при использовании Observable, заключается в том, что функции подписчика возвращают unit вместо Async<unit>. Создать адаптер с помощью BlockingQueueAgent достаточно просто, но это уменьшает детерминизм. - person eulerfx; 13.01.2014