Обратное давление между службами Lagom с использованием потоковой передачи ServiceCall не работает

Лагом: 1.5.4

Рассмотрите возможность использования ServiceCall (пример):

def stream: ServiceCall[NotUsed, Source[Int, NotUsed]] = ServiceCall { _ =>
  Future.successful(
    Source(1.to(1000)).wireTap(msg => log.info(s"sending $msg"))
  )
}

Когда другая служба (пример) использует этот ServiceCall, например:

val ticker = Source.tick(1.second, 100.millis, true)
helloWorldStreamService.stream.invoke().flatMap(_.zip(ticker).map {
  case (msg, _) =>
    log.info(s"received $msg")
    msg
}.runWith(Sink.seq))

Можно было бы ожидать, что искусственно замедляющий потребитель замедлит производителя. Глядя на журналы, кажется, что это не так:

sending 1
sending 2
sending 3
[...]
sending 1000

[1 second pause]

received 1
[100ms pause]
received 2
[100ms pause]
received 3
[...]

Я пропустил какие-либо скрытые буферы?

Пример кода:
https://github.com/an-tex/lagom-backpressure< /а>

беги sbt runAll

а затем выполнить curl 127.0.0.1:[port of hello-world-stream-client service]/api/test

увидеть эффект


person antex    schedule 15.11.2019    source источник


Ответы (1)


Системные буферы превышают тестовый размер. В Mac OS буфер размером 128 КБ (пакет 512 КБ). Вне буферов обратное давление работает как шарм. Я обновил репозиторий github, указав больший тестовый размер на случай, если кто-то захочет поиграть около.

Кредит принадлежит TimMoore, который ответил на этот вопрос на Lightbend Discuess

person antex    schedule 26.11.2019