Лагом: 1.5.4
Рассмотрите возможность использования ServiceCall (пример):
def stream: ServiceCall[NotUsed, Source[Int, NotUsed]] = ServiceCall { _ =>
Future.successful(
Source(1.to(1000)).wireTap(msg => log.info(s"sending $msg"))
)
}
Когда другая служба (пример) использует этот ServiceCall, например:
val ticker = Source.tick(1.second, 100.millis, true)
helloWorldStreamService.stream.invoke().flatMap(_.zip(ticker).map {
case (msg, _) =>
log.info(s"received $msg")
msg
}.runWith(Sink.seq))
Можно было бы ожидать, что искусственно замедляющий потребитель замедлит производителя. Глядя на журналы, кажется, что это не так:
sending 1
sending 2
sending 3
[...]
sending 1000
[1 second pause]
received 1
[100ms pause]
received 2
[100ms pause]
received 3
[...]
Я пропустил какие-либо скрытые буферы?
Пример кода:
https://github.com/an-tex/lagom-backpressure< /а>
беги sbt runAll
а затем выполнить curl 127.0.0.1:[port of hello-world-stream-client service]/api/test
увидеть эффект