Непрерывное потребление REST с использованием потока Akka и http

Я пытаюсь создать систему на основе akka, которая будет периодически (каждые 15 секунд) отправлять запрос REST, выполнять некоторую фильтрацию, некоторую очистку данных и проверку полученных данных и сохранять их в HDFS.

Ниже приведен код, который я написал.

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse, StatusCodes}
import akka.actor.Props
import akka.event.Logging
import akka.actor.Actor

import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try
import akka.http.scaladsl.client.RequestBuilding._

/**
  * Created by rabbanerjee on 4/6/2017.
  */
class MyActor extends Actor {
  val log = Logging(context.system, this)

  import scala.concurrent.ExecutionContext.Implicits.global
  def receive = {
    case j:HttpResponse => log.info("received" +j)
    case k:AnyRef      => log.info("received unknown message"+k)
  }
}

object STest extends App{

  implicit val system = ActorSystem("Sys")
  import system.dispatcher

  implicit val materializer = ActorMaterializer()

  val ss = system.actorOf(Props[MyActor])
  val httpClient = Http().outgoingConnection(host = "rest_server.com", port = 8080)
  val filterSuccess = Flow[HttpResponse].filter(_.status.isSuccess())

    val runnnn = Source.tick(
            FiniteDuration(1,TimeUnit.SECONDS),
            FiniteDuration(15,TimeUnit.SECONDS),
            Get("/"))
        .via(httpClient)
        .via(filterSuccess)
        .to(Sink.actorRef(ss,onCompleteMessage = "done"))

  runnnn.run()
} 

Проблема, с которой я сейчас сталкиваюсь, заключается в том,

Несмотря на то, что я использовал повтор/галочку с источником, я могу увидеть результат только один раз. Это не повторяющийся запрос.

Я также пытаюсь сгруппировать результат, скажем, 50 таких запросов, потому что, поскольку я буду писать его в hadoop, я не могу писать каждый запрос, так как он будет заливать HDFS несколькими файлами.


person RBanerjee    schedule 06.04.2017    source источник


Ответы (1)


Вы не потребляете ответы, полученные от HTTP-вызова. Обязательно использовать байты объектов, возвращаемые Akka HTTP, даже если они вам не интересны.

Подробнее об этом можно узнать в документы.

В вашем примере, поскольку вы не используете объект ответа, вы можете просто отбросить его байты. См. пример ниже:

val runnnn = Source.tick(FiniteDuration(1,TimeUnit.SECONDS),FiniteDuration(15,TimeUnit.SECONDS),Get("/"))
    .via(httpClient)
    .map{resp => resp.discardEntityBytes(); resp}
    .via(filterSuccess)
    .to(Sink.actorRef(ss,onCompleteMessage = "done"))
person Stefano Bonetti    schedule 06.04.2017