Отправка ответа за пределы системы актеров akka

У меня есть приложение play (2.4.2 с akka 2.4.18), в котором я использую актеров akka для загрузки файла. У меня есть родительский супервизор Актер с такой иерархией

UploadSupervisor ---child---> UploadActor ---child--->
DataWriteActor & MetaWriteActor

Листовые акторы MetaWriteActor и DataWriteActor выполняют фактическую запись. Очень упрощенная версия моего кода приведена ниже:

Сначала у меня есть супервизор актера:

class UploadSupervisor extends Actor {
  val uploadActor = context.actorOf(Props(new UploadActor), "UploadActor") 
 override def supervisorStrategy = OneForOneStrategy() {
    case _: Throwable => Restart
 }

override def receive: Receive = {
  case data: Data => uploadActor ! data
  case meta: MetaInfo => uploadActor ! meta
  //How do I send response outside of actor system?
  case dataSuccess: DataUploadResponse => ??? //Line 10
  case metaSuccess: MetaUploadResponse => ??? //Line 11

}

object UploadSupervisor {
  val uploadSupervisor = Akka.system
    .actorOf(Props(new UploadSupervisor), "UploadSupervisor")
}
//Request & Response case classes
case class Data(content: String)
case class MetaInfo(id: String, createdDate: Timestamp)

case class DataUploadResponse(location: String)
case class MetaUploadResponse(location: String)

ЗагрузитьАктер:-

class UploadActor extends Actor {  
val dataWriteActor = context.actorOf(Props(new DataWriteActor), "dataWriteActor")  
val metaWriteActor = context.actorOf(Props(new MetaWriteActor), "UploadActor")

override def receive = {   
case data: Data => dataWriteActor ! data   
case meta: MetaInfo => metaWriteActor ! meta   
case dataResp: DataUploadResponse => context.parent ! dataResp   
case metaResp: MetaUploadResponse => context.parent ! metaResp 

 }
}

Актер записи данных:

class DataWriteActor extends Actor {
  case data: Data => //Do the writing 
                     println("data write completed")
                     sender() ! DataUploadResponse("someLocation")  

}

MetaWriteActor

class MetaWriteActor extends Actor {
  case meta: MetaInfo=> //Do the writing 
                     println(" meta info writing completed")
                     sender() ! MetaUploadResponse("someOtherLocation")  

}

Где-то за пределами системы Актера: -

implicit val timeout = Timeout(10 seconds)
val f1 = UploadSupervisor.uploadSupervisor ? Data("Hello Akka").mapTo(implicitly[scala.reflect.ClassTag[DataUploadResponse]])

val f2 = UploadSupervisor.uploadSupervisor ? MetaInfo("1234", new Timestamp(new Date().getTime).mapTo(implicitly[scala.reflect.ClassTag[MetaUploadResponse]])

//Do something with futures

Вопрос в том, как отправить ответ вне системы акторов? Потому что в строках 10 и 11 я не могу использовать sender ! msg, поскольку текущим отправителем является UploadActor.


person Aiden    schedule 13.08.2017    source источник
comment
Вопрос в том, как отправить ответ вне системы акторов? что вы имеете в виду под внешней системой? Одним из способов было бы использовать промежуточное ПО для обмена сообщениями и отправить туда сообщение от вашего актера. Потребитель подписывается на него и потребляет оттуда. Akka также использует аналогичную концепцию, но, насколько я знаю, она ограничена только актерами.   -  person Imran    schedule 13.08.2017
comment
@Imran Я имею в виду от UploadSupervisor (строки 10 и 11) к основному потоку, где я использую? (спросить), чтобы получить значения   -  person Aiden    schedule 13.08.2017
comment
@Imran Не могли бы вы предложить одно такое промежуточное ПО для обмена сообщениями, которое я могу изучить? Кроме того, предполагая, что я не планирую создавать несколько экземпляров этих актеров. Как вы относитесь к созданию синглтона UploadActor (класс-объект). Затем я могу позвонить UploadActor.dataWriteActor ? Data("Hello Akka") и т. д.   -  person Aiden    schedule 13.08.2017


Ответы (1)


Вы можете сохранить UploadSupervisor ссылок на первоначальных отправителей:

class UploadSupervisor extends Actor {
  val uploadActor = context.actorOf(Props[UploadActor], "UploadActor")

  override val supervisorStrategy = OneForOneStrategy() {
    case _ => Restart
  }

  var dataSender: Option[ActorRef] = None
  var metaSender: Option[ActorRef] = None

  def receive = {
    case data: Data =>
      val s = sender
      dataSender = Option(s)
      uploadActor ! data
    case meta: MetaInfo =>
      val s = sender
      metaSender = Option(s)
      uploadActor ! meta
    case dataSuccess: DataUploadResponse =>
      dataSender.foreach(_ ! dataSuccess)
    case metaSuccess: MetaUploadResponse =>
      metaSender.foreach(_ ! metaSuccess)
  }
}

Чтобы отправить сообщение UploadSupervisor:

implicit val timeout = Timeout(10 seconds)

val f1 = (UploadSupervisor.uploadSupervisor ? Data("Hello Akka")).mapTo[DataUploadResponse]

val f2 = (UploadSupervisor.uploadSupervisor ? MetaInfo("1234", new Timestamp(new Date().getTime)).mapTo[MetaUploadResponse]

Вышеприведенное предполагает, что вы одновременно отправляете одно сообщение Data и одно сообщение MetaInfo пользователю UploadSupervisor. Этот подход не работает, если вы отправляете несколько сообщений Data и MetaInfo и ожидаете одновременных ответов. Более общее решение состоит в том, чтобы включить ссылку на исходного отправителя в дополнительные классы дел, которые обертывают ваши существующие классы дел, передавая эту ссылку через вашу иерархию акторов:

case class DataMsg(data: Data, target: ActorRef)
case class MetaInfoMsg(metaInfo: MetaInfo, target: ActorRef)

case class DataUploadMsg(response: DataUploadResponse, target: ActorRef)
case class MetaUploadMsg(response: MetaUploadResponse, target: ActorRef)

class UploadSupervisor extends Actor {
  val uploadActor = context.actorOf(Props[UploadActor], "UploadActor")

  override val supervisorStrategy = OneForOneStrategy() {
    case _ => Restart
  }

  def receive = {
    case data: Data =>
      val s = sender
      uploadActor ! DataMsg(data, s)
    case meta: MetaInfo =>
      val s = sender
      uploadActor ! MetaInfoMsg(meta, s)
    case DataUploadMsg(response, target) =>
      target ! response
    case MetaUploadMsg(response, target) =>
      target ! response
  }
}

UploadActor:

class UploadActor extends Actor {  
  val dataWriteActor = context.actorOf(Props[DataWriteActor], "dataWriteActor")  
  val metaWriteActor = context.actorOf(Props[MetaWriteActor], "UploadActor")

  def receive = {   
    case data: DataMsg => dataWriteActor ! data   
    case meta: MetaInfoMsg => metaWriteActor ! meta   
    case dataResp: DataUploadMsg => context.parent ! dataResp   
    case metaResp: MetaUploadMsg => context.parent ! metaResp 
  }
}

Сценаристы:

class DataWriteActor extends Actor {
  def receive = {
    case DataMsg(data, target) =>
      // do the writing 
      println("data write completed")
      sender ! DataUploadMsg(DataUploadResponse("someLocation"), target)
  }
}

class MetaWriteActor extends Actor {
  def receive = {
    case MetaInfoMsg(meta, target) =>
      // do the writing 
      println("meta info writing completed")
      sender ! MetaUploadMsg(MetaUploadResponse("someOtherLocation"), target) 
  }
}
person Jeffrey Chung    schedule 13.08.2017
comment
Второй подход понравился больше. Хотя это заставляет меня создать пару дополнительных классов case, но я бы не прочь это сделать. Спасибо за помощь. Принятие ответа. - person Aiden; 14.08.2017