одноадресная передача в платформе Play и SSE (scala): как узнать, в какой поток отправлять?

в моем приложении перечислены хосты, и этот список является динамическим и изменяющимся. он основан на акторах Akka и событиях, отправленных сервером. когда подключается новый клиент, ему необходимо отобразить текущий список. но я не хочу отправлять список всем клиентам каждый раз, когда подключается новый. Итак, последовал примеру эластичного поиска в реальном времени и эмулировал одноадресную рассылку, создав (Enumerator, Channel) для Connect () и присвоив ему UUID. когда мне нужно транслировать, я буду отображать все и обновлять их, чтобы иметь возможность делать одноадресную рассылку клиентам (а их должно быть очень мало).

Моя проблема в том, как мне получить новому клиенту его UUID, чтобы он мог его использовать? поток, который я ищу: - клиент запрашивает EventStream - сервер создает новый (перечислитель, канал) с UUID и возвращает перечислитель и UUID клиенту - клиент запрашивает таблицу, используя uuid - сервер отправляет таблицу только на канал, соответствующий uuid

Итак, как клиент узнает об UUID? если бы это был веб-сокет, отправка запроса должна была бы дать желаемый результат, так как он достиг бы своего собственного канала. но в SSE клиент → сервер выполняется на другом канале. какие решения для этого?

фрагменты кода:

case class Connected(uuid: UUID, enumerator: Enumerator[ JsValue ] )

trait MyActor extends Actor{
  var channelMap = new HashMap[UUID,(Enumerator[JsValue], Channel[JsValue])]

  def connect() = {
    val con = Concurrent.broadcast[JsValue]
    val uuid = UUID.randomUUID()
    channelMap += (uuid -> con)
    Connected(uuid, con._1)
  }
...
}

object HostsActor extends MyActor {
...
  override def receive = {
    case Connect => {
      sender ! connect
    }
...
}

object Actors {
  def hostsStream = {
    getStream(getActor("hosts", Props (HostsActor)))
  }

  def getActor(actorPath: String, actorProps : Props): Future[ActorRef] = {
    /* some regular code to create a new actor if the path does not exist, or return the existing one else */
  }


  def getStream(far: Future[ActorRef]) = {
    far flatMap {ar =>
      (ar ? Connect).mapTo[Connected].map { stream =>
        stream
      }
    }
  }
...
}

object AppController extends Controller {
  def getHostsStream = Action.async {
    Actors.hostsStream map { ac =>
************************************
**  how do i use the UUID here??  **
************************************
      Ok.feed(ac.enumerator &> EventSource()).as("text/event-stream")
    }
  }

person Ehud Kaldor    schedule 16.06.2014    source источник


Ответы (1)


Мне удалось решить эту проблему, асинхронно нажав uuid после возврата канала, с некоторым промежутком времени между ними:

  override def receive = {
    case Connect => {
      val con = connect()
      sender ! con
      import scala.concurrent.ExecutionContext.Implicits.global 
      context.system.scheduler.scheduleOnce(0.1 seconds){
        unicast(
          con.uuid, 
          JsObject (
            Seq (
              "uuid" -> JsString(con.uuid.toString)   
            )    
          ) 
        )
      }
    }

это достигло своей цели - клиент получил UUID и смог кэшировать и использовать его для отправки getHostsList на сервер:

  @stream = new EventSource("/streams/hosts")
  @stream.addEventListener "message", (event) =>
    data = JSON.parse(event.data)
    if data.uuid
      @uuid = data.uuid
      $.ajax
        type: 'POST',
        url: "/streams/hosts/" + @uuid + "/sendlist"
        success: (data) ->
          console.log("sent hosts request to server successfully")
        error: () ->
          console.log("failed sending hosts request to server")
    else
      ****************************
      *                          *
      *  handle parsing hosts    *
      *                          *
      *                          *
      ****************************
      @view.render()

пока это работает, я должен сказать, что мне это не нравится. введение искусственной задержки, чтобы клиент мог получить канал и начать прослушивание (я пытался без задержки, и клиент не получил uuid) опасно, так как он все равно может пропустить, если система станет более загруженной, но делает это слишком долго вредит аспекту реактивности.

Если у кого-то есть решение, в котором это можно сделать синхронно - с возвращением uuid как части исходного запроса eventSource - я был бы более чем счастлив понизить свое решение.

person Ehud Kaldor    schedule 19.06.2014