Совокупность сообщений актера Akka

Следующий актер выполняет деление после того, как получены и числитель, и знаменатель:

package funnelTest

import akka.actor.{Actor, ActorSystem, Props}

object Main extends App {

  val system1 = ActorSystem("funnelTest")
  val input  = system1.actorOf(Props[Funnel], "input")

  input ! 3
  input ! 2.718

}

case object Run

class Funnel extends Actor {

  var i: Option[Int] = None
  var d: Option[Double] = None

  def isReady = i.nonEmpty && d.nonEmpty

  def receive = {
    case v: Int    => i = Some(v) ; if (isReady) self ! Run
    case v: Double => d = Some(v) ; if (isReady) self ! Run
    case Run       => println(s"aggregated, $d / $i = " + d.get/i.get)
    case _         =>  
  }
}

Есть ли более масштабируемый способ агрегирования всех сообщений?


person echo    schedule 30.01.2017    source источник
comment
почему вы думаете, что ваша реализация не масштабируема? (его можно немного улучшить, замените ! Run вызовом нового метода run(), который делает то, что сейчас делает case Run).   -  person Alexei Kaigorodov    schedule 30.01.2017


Ответы (1)


Уникальный идентификатор, идентифицирующий запрос, является одним из способов решения проблемы. Карта (calcRegistry) внутри Актера содержит ранее прибывшие FractionComponent (либо Numerator, либо Denominator). Как только появится вторая часть пары, мы сможем начать расчет, как вы уже сделали.

Реализация по-прежнему не решает проблему утечки памяти, из-за которой вторая пара не будет получена, а карта будет продолжать расти.

import akka.actor.{Actor, ActorSystem, Props}

object Main extends App {

  import Funnel._

  val system1 = ActorSystem("funnelTest")
  val input = system1.actorOf(Props[Funnel], "input")

  (1 to 10) foreach { number =>

    val id = java.util.UUID.randomUUID().toString
    input ! Numerator(id, value = number + 2)
    input ! Denominator(id, value = number + 1)
  }

  system1.awaitTermination()

}

class Funnel extends Actor {

  import Funnel._
  import scala.collection._

  val calcRegistry = mutable.Map[String, FractionComponent]()

  def saveToRegistry(comp: FractionComponent) = calcRegistry(comp.id) = comp

  def printValue(num: Numerator, den: Denominator) = println(s"aggregated, ${num.value} / ${den.value} = ${num.value / den.value}")

  def receive = {
    case num@Numerator(id, _) =>
      if (calcRegistry contains id)
        self ! Run(num, calcRegistry(id).asInstanceOf[Denominator])
      else saveToRegistry(num)
    case den@Denominator(id, _) =>
      if (calcRegistry contains id)
        self ! Run(calcRegistry(id).asInstanceOf[Numerator], den)
      else saveToRegistry(den)
    case Run(num: Numerator, den: Denominator) =>
      calcRegistry.remove(num.id)
      printValue(num, den)
    case _ =>
  }
}

object Funnel {

  sealed trait FractionComponent {
    def id: String
  }

  case class Numerator(override val id: String, value: Double) extends FractionComponent

  case class Denominator(override val id: String, value: Integer) extends FractionComponent

  case class Run(num: Numerator, denominator: Denominator)

}

Пример вывода:

aggregated, 3.0 / 2 = 1.5
aggregated, 4.0 / 3 = 1.3333333333333333
aggregated, 5.0 / 4 = 1.25
aggregated, 6.0 / 5 = 1.2
aggregated, 7.0 / 6 = 1.1666666666666667
aggregated, 8.0 / 7 = 1.1428571428571428
aggregated, 9.0 / 8 = 1.125
aggregated, 10.0 / 9 = 1.1111111111111112
aggregated, 11.0 / 10 = 1.1
aggregated, 12.0 / 11 = 1.0909090909090908

Ссылка: шаблоны реактивного обмена сообщениями с акторной моделью

person Arun Manivannan    schedule 30.01.2017