Хранение клиентских сессий на сервере akka-http

Как в службе akka-http кэшировать некоторую информацию для каждого сеанса клиента? Это не совсем очевидно в документах. Например, я хотел бы создать актера для каждого соединения.

Где я должен создать актера и как мне получить ссылку на него внутри моих стадий?

Моя служба связана примерно так:

  val serverSource: Source[Http.IncomingConnection, Future[Http.ServerBinding]] =
    Http().bind(interface = bindAddress, port = bindPort)

  val bindingFuture: Future[Http.ServerBinding] =
    serverSource
      .to(Sink.foreach { connection => 
        connection.handleWithSyncHandler (requestHandler)
        // seems like I should set up some session state storage here,
        // such as my actor      
      })
      .run()

...

и позже:

  val packetProcessor: Flow[A, B, Unit] =  Flow[A]
    .map {
      case Something =>
        // can i use the actor here, or access my session state?
    }

Я подозреваю, что, возможно, неверно истолковываю всю парадигму, пытаясь подогнать ее. Я не могу сказать, есть ли что-то встроенное или сколько мне нужно реализовать вручную.


person Will I Am    schedule 26.01.2016    source источник


Ответы (2)


Я нашел Agent очень удобным механизмом для одновременного кэширования. .

Скажем, например, вы хотите сохранить работающими Set всех удаленных адресов, к которым вы были подключены. Вы можете настроить агент для хранения значений и Flow для записи в кеш:

import scala.concurrent.ExecutionContext.Implicits.global
import akka.agent.Agent

import scala.collection.immutable

val addressCache = Agent(immutable.Set.empty[java.net.InetSocketAddress])

import akka.stream.scaladsl.Flow

val cacheAddressFlow = Flow[IncomingConnection] map { conn =>
  addressCache send (_ + conn.remoteAddress) //updates the cache
  conn //forwards the connection to the rest of the stream
}

Затем этот поток можно сделать частью вашего потока:

val bindingFuture: Future[Http.ServerBinding] =
  serverSource.via(cacheAddressFlow)
              .to(Sink.foreach { connection => 
    connection.handleWithSyncHandler (requestHandler)
  })
  .run()

Затем вы можете «запросить» кеш полностью вне логики привязки:

def somewhereElseInTheCode = {
  val currentAddressSet = addressCache.get

  println(s"address count so far: ${currentAddressSet.size}")
}

Если ваша цель — отправить все значения IncomingConnection в Actor для обработки, это можно сделать с помощью Sink.actorRef:

object ConnectionStreamTerminated

class ConnectionActor extends Actor {
  override def receive = {
    case conn : IncomingConnection => ???
    case ConnectionStreamTerminated => ???
  }
}

val actorRef = actorSystem actorOf Props[ConnectionActor]

val actorSink = 
  Sink.actorRef[IncomingConnection](actorRef, ConnectionStreamTerminated)

val bindingFuture: Future[Http.ServerBinding] =
  serverSource.runWith(actorSink)
person Ramón J Romero y Vigil    schedule 26.01.2016
comment
Так что, по сути, свернуть мой собственный кеш для всей службы? Похоже, в Akka-HTTP чего-то не хватает. Я надеялся, что смогу добраться до какого-нибудь актера, определяющего связь, и начать оттуда. Возможно, мне придется пойти с подходом, который вы предлагаете, но сейчас, вероятно, гораздо больше кода для управления кешем. :( Спасибо! - person Will I Am; 26.01.2016
comment
@WillIAm Я добавил к своему ответу предложение, которое, похоже, больше соответствует тому, что вы ищете ... - person Ramón J Romero y Vigil; 26.01.2016
comment
Спасибо, похоже, что я действительно хочу. Однако я пытаюсь выяснить, не отправляет ли это все потоки на один ActorRef? Это потенциально может взорвать кеш Актера или, по крайней мере, замедлить все потоки. Может быть, я упускаю суть моего первоначального вопроса - как создать ConnectionActor для каждого соединения (или мне нужно вернуться к сворачиванию кеша?). - person Will I Am; 26.01.2016
comment
@WillIAm Добро пожаловать. Да, это отправляет все IncomingConnection одному и тому же Актеру. Но обычно это нормально, потому что вы обычно делаете что-то вроде conn.handleWith(...), которое материализуется в отдельный поток. Следовательно, ваш Актер на самом деле не делает много работы, кроме материализации других Актеров. Помните, что есть потоки двух уровней: (1) источник соединений, (2) источник HttpRequest для каждого соединения... - person Ramón J Romero y Vigil; 26.01.2016

По той причине, что предлагаемые агенты устарели. Я бы предложил использовать akka-http-session. Это гарантирует, что данные сеанса безопасны и не могут быть подделаны.

person Epicurist    schedule 19.02.2018