Как представить, что актор Akka находится в длительной операции, используя FSM-s

Мой актор, описанный с помощью FSM, ожидает триггера (в состоянии ожидания). Когда он получает это, он начинает обрабатывать некоторые данные (и переходит в состояние выполнения), а когда это делается, он возвращается в состояние ожидания.

Если я правильно понимаю модель FSM, с этой точки зрения было два события: начало обработки (Idle->Running) и обработка завершена (Running->Idle).

Но с актерской точки зрения было только одно сообщение.

Одна из возможностей состоит в том, чтобы делегировать саму обработку другому актеру. Таким образом, я могу перенаправить инициирующее событие и перейти в состояние «Работает», а затем, получив результат, перейти в режим ожидания. Его преимущество состоит в том, что сам FSM может быстро реагировать на запросы (например, спрашивать, каково текущее состояние), но это усложняет конструкцию.

Еще один — отправить себе завершенное сообщение, когда актор завершит обработку, что вызовет переход Running -> Idle, но для меня это выглядит немного странно.

Какие еще варианты у меня есть?

Примечание: есть несколько других состояний с различными переходами, поэтому я хотел бы придерживаться модели FSM.


person Sandor Murakozi    schedule 30.09.2013    source источник


Ответы (1)


Поскольку кажется, что у вас есть актор, которому нужно выполнять обработку и выполнять переходы в FSM, я предлагаю вам следовать этим рекомендациям (некоторые основные наброски кода следуют за этим списком):

  • Актер A получает рассматриваемое сообщение, которое запускает некоторую обработку.
  • Актор A переводит отдельный FSM-актор, скажем F, (akka.actor.FSM) в соответствующее состояние. Актер A при запуске будет порождать конкретный FSM для отслеживания состояния для соответствующего контекста (например, для всех транзакций или состояния для каждой транзакции или некоторого другого контекста). Схема кода ниже использует всю обработку или завершенные транзакции в качестве контекста для примера, но, возможно, потребуется изменить ее.
  • Затем актор A запускает любую обработку, которая должна быть запущена для сообщения. Помните, что актеры обычно не должны блокироваться, но вот ответ, который содержит дополнительные рекомендации о том, когда актер Akka может блокировать.
  • Альтернатива: если вы можете инициировать длительную обработку без блокировки и гарантировать, что вы получаете необходимые события после этапов обработки другой стороной, тогда вы можете исключить фронтальный актор A и просто использовать FSM-актор F. В этом случае вы должны посмотреть на onTransition .

Итак, мое предложение по наброску кода основано на том, что я понял из вопроса:

/* Events */
sealed trait MyEvents
case class ProcessingStarted(txnId: Long) extends MyEvents
case class ProcessingFinished(txnId: Long, result: Result) extends MyEvents

/* Valid states for your FSM */
sealed trait MyStates 
case object Idle extends MyStates
/* Constructor arguments could be anything, I randomly chose a Long for a transaction ID which may be specific to a job */
case object Processing extends MyStates
/* define more case classes or objects depending on the rest of the states */

/* Valid internal state data types for FSM */
sealed trait MyDataTypes
case object Uninitialized extends MyDataTypes
case class StateData(processingIds: Seq[Long], resultMap: Map[Long, Result]) extends MyDataTypes

import akka.actor.{ Actor, ActorRef, FSM }
import scala.concurrent.duration._

 class ActorF extends FSM[MyStates, MyDataTypes] {
   startWith(Idle, Uninitialized)

   when(Idle) {
     case Event(ProcessingStarted(txnId), Uninitialized) =>
       goto(Processing) using StateData(Seq(txnId), Map.empty[Long, Result])
     case Event(ProcessingStarted(txnId), StateData(emptyIds, resultMap)) =>
       goto(Processing) using StateData(Seq(txnId), resultMap)
   }

   when(Processing) {
     case Event(ProcessingFinished(txnId, result), StateData(processingIds, resultMap)) => {
       val remainingIds = processingIds diff Seq(txnId)
       val newResultMap = resultMap + (txnId -> result)
       if (remainingIds.isEmpty) {
         goto(Idle) using StateData(remainingIds, newResultMap)
       } else {
         stay using StateData(remainingIds, newResultMap)
       }
     }
   }

   initialize()
 }

 // inside Actor A do something like this for creating the FSM actor (just like other actors)
 val f = system.actorOf(Props(classOf[ActorF], this))

 // send an event message to it just like other types of actors
 f ! ProcessingStarted(txnId)

Если вы решите инициировать неблокирующие запросы обработки к другим частям вашего кода, вы можете использовать onTransition, чтобы добавить код триггера по мере необходимости. Вы также можете изменить классы дел MyEvents на другое время. Именование события использовалось выше, чтобы показать, что что-то еще было ответственно за его срабатывание (например, актер А получил начальное сообщение, чтобы сделать некоторые вещи).

Также обратите внимание на возможности контроля Akka, которые могут использоваться здесь для наблюдения за рассматриваемыми актерами.

Для получения более подробной информации прочитайте следующее, что может помочь в построении вашего FSM, его тестировании, использовании неблокирующих методов для запуска длительной обработки извне. Все это может быть полезно для ваших нужд:

person Susan Potter    schedule 30.09.2013
comment
Спасибо за ваш подробный ответ. Кажется, это очень похоже на мое текущее решение, которое я описал как первое альтернативное решение. В этом FSM создает рабочего (который существует только для обработки этого запроса и использует отдельный диспетчер, как рекомендовано на связанной странице), и FSM контролирует его жизненный цикл. Есть ли у вашего решения (конечный автомат — управляемый актор) какие-то преимущества по сравнению с этим? - person Sandor Murakozi; 01.10.2013
comment
На мой взгляд, внешний участник в моем предложенном решении может быть более крупнозернистым и очень легко управлять жизненными циклами более мелкозернистых FSM. например внешний субъект, который получает внешнее сообщение, может принимать сообщения системного уровня, но управлять отдельными FSM для каждого пользователя или заказа (более детальный контекст). Альтернатива, которую я предложил в своем решении, является вашей альтернативой: актор FSM, который порождает/управляет неблокирующими частями обработки (которые могут быть акторами или нет). Решение, которое вы выберете, зависит от ваших потребностей, которые не были ясно указаны в вопросе. HTH. - person Susan Potter; 01.10.2013
comment
Важной частью приведенного выше решения является список соображений и ссылка на другой ответ о том, когда актеры могут блокировать. Это только вопросы, на которые вы можете ответить сами, но которые не были ясно изложены в исходном вопросе. Набросок кода - это просто. Я предложил решение, не зная ваших потребностей в демонстрации FSM в Akka, но, как упоминалось выше, может потребоваться некоторая настройка, чтобы соответствовать вашим реальным потребностям, как вы их понимаете. - person Susan Potter; 01.10.2013