Как правильно использовать TryScan в F#

Я пытался найти пример использования TryScan, но ничего не нашел, не могли бы вы мне помочь?

Что я хотел бы сделать (весьма упрощенный пример): у меня есть MailboxProcessor, который принимает два типа сообщений.

  • Первый GetState возвращает текущее состояние. GetState сообщения отправляются довольно часто

  • Другой UpdateState очень дорогой (отнимает много времени) - например. загружая что-то из Интернета, а затем соответствующим образом обновляя состояние. UpdateState вызывается очень редко.

Моя проблема в том, что сообщения GetState блокируются и ждут, пока не будут обслужены предыдущие UpdateState. Вот почему я пытался использовать TryScan для обработки всех GetState сообщений, но безуспешно.

Мой пример кода:

type Msg = GetState  of AsyncReplyChannel<int> | UpdateState
let mbox = MailboxProcessor.Start(fun mbox ->
             let rec loop state = async {
                // this TryScan doesn't work as expected
                // it should process GetState messages and then continue
                mbox.TryScan(fun m ->
                    match m with 
                    | GetState(chnl) -> 
                        printfn "G processing TryScan"
                        chnl.Reply(state)
                        Some(async { return! loop state})
                    | _ -> None
                ) |> ignore

                let! msg = mbox.Receive()
                match msg with
                | UpdateState ->
                    printfn "U processing"
                    // something very time consuming here...
                    async { do! Async.Sleep(1000) } |> Async.RunSynchronously
                    return! loop (state+1)
                | GetState(chnl) ->
                    printfn "G processing"
                    chnl.Reply(state)
                    return! loop state
             }
             loop 0
)

[async { for i in 1..10 do 
          printfn " U"
          mbox.Post(UpdateState)
          async { do! Async.Sleep(200) } |> Async.RunSynchronously
};
async { // wait some time so that several `UpdateState` messages are fired
        async { do! Async.Sleep(500) } |> Async.RunSynchronously
        for i in 1..20 do 
          printfn "G"
          printfn "%d" (mbox.PostAndReply(GetState))
}] |> Async.Parallel |> Async.RunSynchronously

Если вы попытаетесь запустить код, то увидите, что сообщение GetState практически не обрабатывается, так как ждет результата. С другой стороны, UpdateState работает только по принципу "запустил и забыл", что блокирует эффективное получение состояния.

Изменить

Текущее решение, которое работает для меня, это:

type Msg = GetState  of AsyncReplyChannel<int> | UpdateState
let mbox = MailboxProcessor.Start(fun mbox ->
             let rec loop state = async {
                // this TryScan doesn't work as expected
                // it should process GetState messages and then continue
                let! res = mbox.TryScan((function
                    | GetState(chnl) -> Some(async {
                            chnl.Reply(state)
                            return state
                        })
                    | _ -> None
                ), 5)

                match res with
                | None ->
                    let! msg = mbox.Receive()
                    match msg with
                        | UpdateState ->
                            async { do! Async.Sleep(1000) } |> Async.RunSynchronously
                            return! loop (state+1)
                        | _ -> return! loop state
                | Some n -> return! loop n
             }
             loop 0
)

Реакции на комментарии: идея с другими MailboxProcessor или ThreadPool, выполняющими UpdateState параллельно, хороша, но сейчас мне это не нужно. Все, что я хотел сделать, это обработать все сообщения GetState, а затем и остальные. Меня не волнует, что при обработке UpdateState агент заблокирован.

Я покажу вам, в чем была проблема на выходе:

// GetState messages are delayed 500 ms - see do! Async.Sleep(500)
// each UpdateState is sent after 200ms
// each GetState is sent immediatelly! (not real example, but illustrates the problem)
 U            200ms   <-- issue UpdateState
U processing          <-- process UpdateState, it takes 1sec, so other 
 U            200ms       5 requests are sent; sent means, that it is
 U            200ms       fire-and-forget message - it doesn't wait for any result
                          and therefore it can send every 200ms one UpdateState message
G                     <-- first GetState sent, but waiting for reply - so all 
                          previous UpdateState messages have to be processed! = 3 seconds
                          and AFTER all the UpdateState messages are processed, result
                          is returned and new GetState can be sent. 
 U            200ms
 U            200ms       because each UpdateState takes 1 second
 U            200ms
U processing
 U
 U
 U
 U
U processing
G processing          <-- now first GetState is processed! so late? uh..
U processing          <-- takes 1sec
3
G
U processing          <-- takes 1sec
U processing          <-- takes 1sec
U processing          <-- takes 1sec
U processing          <-- takes 1sec
U processing          <-- takes 1sec
U processing          <-- takes 1sec
G processing          <-- after MANY seconds, second GetState is processed!
10
G
G processing
// from this line, only GetState are issued and processed, because 
// there is no UpdateState message in the queue, neither it is sent

person stej    schedule 02.02.2011    source источник
comment
Тот факт, что у вас есть |> ignore после вызова TryScan, должен предупредить вас о том, что вы неправильно используете API. (Сейчас нет времени на полный ответ, надеюсь, кто-нибудь опередит меня.)   -  person Brian    schedule 03.02.2011
comment
Я знаю, что использую его неправильно. Но я не нашел ни одного поста о его использовании вообще.   -  person stej    schedule 03.02.2011
comment
Я думаю, что смысл TryScan и Scan заключается в том, чтобы дождаться сообщения и тайм-аута, если оно не получено. Единственная разница между ними заключается в том, что по тайм-ауту TryScan возвращает параметр, а Scan вызывает исключение.   -  person gradbot    schedule 03.02.2011
comment
Ваш отредактированный код все еще ожидает завершения обновления, прежде чем он ответит на вызов GetState. Единственный способ исправить это — запустить Update в другом потоке, чтобы MailboxProcessor все еще мог получать и обрабатывать сообщения GetState.   -  person gradbot    schedule 03.02.2011


Ответы (3)


Я не думаю, что метод TryScan поможет вам в этом случае. Это позволяет вам указать тайм-аут, который будет использоваться во время ожидания сообщений. Как только какое-то сообщение будет получено, оно начнет обрабатывать сообщение (игнорируя тайм-аут).

Например, если вы хотите дождаться какого-то определенного сообщения, но выполнять какие-то другие проверки каждую секунду (во время ожидания), вы можете написать:

let loop () = async {
  let! res = mbox.TryScan(function
    | ImportantMessage -> Some(async { 
          // process message 
          return 0
        })
    | _ -> None)
  match res with
  | None -> 
       // perform some check & continue waiting
       return! loop ()
  | Some n -> 
       // ImportantMessage was received and processed 
}

Что можно сделать, чтобы избежать блокировки процессора почтового ящика при обработке сообщения UpdateState? Обработчик почтового ящика (логически) однопоточный — вы, вероятно, не хотите отменять обработку UpdateState сообщения, поэтому лучший вариант — запустить его обработку в фоновом режиме и дождаться завершения обработки. Код, который обрабатывает UpdateState, может затем отправить какое-то сообщение обратно в почтовый ящик (например, UpdateStateCompleted).

Вот набросок, как это может выглядеть:

let rec loop (state) = async {
  let! msg = mbox.Receive()
  match msg with
  | GetState(repl) -> 
      repl.Reply(state)
      return! scanning state
  | UpdateState -> 
      async { 
        // complex calculation (runs in parallel)
        mbox.Post(UpdateStateCompleted newState) }
      |> Async.Start
  | UpdateStateCompleted newState ->
      // Received new state from background workflow
      return! loop newState }

Теперь, когда фоновая задача выполняется параллельно, вам нужно быть осторожным с изменяемым состоянием. Кроме того, если вы отправляете UpdateState сообщений быстрее, чем успеваете их обработать, у вас будут проблемы. Это можно исправить, например, игнорированием или постановкой в ​​очередь запросов, когда вы уже обрабатываете предыдущий.

person Tomas Petricek    schedule 02.02.2011
comment
Спасибо, Томаш. Я все же попробую использовать TryScan с тайм-аутом, а если возникнут проблемы, есть возможность использовать другие MailboxProcessor. - person stej; 03.02.2011
comment
@stej Просто имейте в виду, что тайм-аут не отменит обработку сообщения UpdateState, если оно уже запущено (и что во время обработки сообщения вы не можете обрабатывать другие сообщения параллельно). - person Tomas Petricek; 03.02.2011
comment
Да, я понимаю это. Я не хочу отменять UpdateState. На самом деле мне не нужен тайм-аут — все, что я хочу, это обработать все сообщения GetState в очереди, а после этого обработать другие (в настоящее время только UpdateState) сообщения. Я немного обновлю вопрос. - person stej; 03.02.2011

НЕ ИСПОЛЬЗУЙТЕ TRYSCAN!!!

К сожалению, функция TryScan в текущей версии F# не работает по двум причинам. Во-первых, все дело в том, чтобы указать тайм-аут, но реализация фактически не соблюдает его. В частности, нерелевантные сообщения сбрасывают таймер. Во-вторых, как и в случае с другой функцией Scan, очередь сообщений проверяется под замком, который предотвращает отправку сообщений любым другим потокам на время сканирования, которое может занимать сколь угодно долгое время. Следовательно, сама функция TryScan имеет тенденцию блокировать параллельные системы и даже может создавать тупиковые ситуации, поскольку код вызывающей стороны оценивается внутри блокировки (например, отправка аргумента функции в Scan или TryScan может привести к зависанию агента, когда код под блокировкой блокирует ожидание). приобрести замок, под которым он уже находится).

Я использовал TryScan в раннем прототипе моего производственного кода, и это вызвало множество проблем. Тем не менее, мне удалось спроектировать вокруг него, и получившаяся архитектура была на самом деле лучше. По сути, я с нетерпением Receive все сообщения и фильтрую, используя свою собственную локальную очередь.

person J D    schedule 03.02.2011
comment
Довольно интересно. Однако, если честно, я ожидал, что очередь будет заблокирована во время поиска целевого сообщения. В любом случае, нужно знать, что это правда и что тайм-аут может работать странным образом. Если бы вы могли показать какой-нибудь код, использующий локальную очередь (вы упомянули в последнем абзаце), было бы здорово. - person stej; 04.02.2011
comment
Я ожидаю, что реализация почтовых ящиков (и TPL) будет без ожидания. Я добавлю код в свой ответ... - person J D; 04.02.2011

Как упомянул Томас, MailboxProcessor является однопоточным. Вам понадобится еще один MailboxProcessor для запуска обновлений в отдельном потоке от получателя состояния.

#nowarn "40"

type Msg = 
    | GetState of AsyncReplyChannel<int> 
    | UpdateState

let runner_UpdateState = MailboxProcessor.Start(fun mbox ->
    let rec loop = async {
        let! state = mbox.Receive()
        printfn "U start processing %d" !state
        // something very time consuming here...
        do! Async.Sleep 100
        printfn "U done processing %d" !state
        state := !state + 1
        do! loop
    }
    loop
)

let mbox = MailboxProcessor.Start(fun mbox ->
    // we need a mutiple state if another thread can change it at any time
    let state = ref 0

    let rec loop = async {
        let! msg = mbox.Receive()

        match msg with
        | UpdateState -> runner_UpdateState.Post state
        | GetState chnl -> chnl.Reply !state

        return! loop 
    }
    loop)

[
    async { 
        for i in 1..10 do 
            mbox.Post UpdateState
            do! Async.Sleep 200
    };
    async { 
        // wait some time so that several `UpdateState` messages are fired
        do! Async.Sleep 1000

        for i in 1..20 do 
            printfn "G %d" (mbox.PostAndReply GetState)
            do! Async.Sleep 50
    }
] 
|> Async.Parallel 
|> Async.RunSynchronously
|> ignore

System.Console.ReadLine() |> ignore

выход:

U start processing 0
U done processing 0
U start processing 1
U done processing 1
U start processing 2
U done processing 2
U start processing 3
U done processing 3
U start processing 4
U done processing 4
G 5
U start processing 5
G 5
U done processing 5
G 5
G 6
U start processing 6
G 6
G 6
U done processing 6
G 7
U start processing 7
G 7
G 7
U done processing 7
G 8
G U start processing 8
8
G 8
U done processing 8
G 9
G 9
U start processing 9
G 9
U done processing 9
G 9
G 10
G 10
G 10
G 10

Вы также можете использовать ThreadPool.

open System.Threading

type Msg = 
    | GetState of AsyncReplyChannel<int> 
    | SetState of int
    | UpdateState

let mbox = MailboxProcessor.Start(fun mbox ->
    let rec loop state = async {
        let! msg = mbox.Receive()

        match msg with
        | UpdateState -> 
            ThreadPool.QueueUserWorkItem((fun obj -> 
                let state = obj :?> int

                printfn "U start processing %d" state
                Async.Sleep 100 |> Async.RunSynchronously
                printfn "U done processing %d" state
                mbox.Post(SetState(state + 1))

                ), state)
            |> ignore
        | GetState chnl -> 
            chnl.Reply state
        | SetState newState ->
            return! loop newState
        return! loop state
    }
    loop 0)

[
    async { 
        for i in 1..10 do 
            mbox.Post UpdateState
            do! Async.Sleep 200
    };
    async { 
        // wait some time so that several `UpdateState` messages are fired
        do! Async.Sleep 1000

        for i in 1..20 do 
            printfn "G %d" (mbox.PostAndReply GetState)
            do! Async.Sleep 50
    }
] 
|> Async.Parallel 
|> Async.RunSynchronously
|> ignore

System.Console.ReadLine() |> игнорировать

person gradbot    schedule 03.02.2011
comment
Является ли оператор ! в (state := !state + 1 ) атомарным? Я использую MailboxProcessor, чтобы избежать изменяемых переменных, и в этом случае я думаю, что у меня могут быть проблемы. Пул потоков звучит хорошо, спасибо. - person stej; 03.02.2011
comment
Это не атомарно. Используйте дополнительные SetState и ThreadPool, если вы хотите неизменное состояние. - person gradbot; 03.02.2011