Не могли бы вы объяснить, что именно происходит во Flux/Mono, возвращаемом HttpClient.response()
? Я думал, что значение, сгенерированное http-клиентом, НЕ будет передаваться вниз по течению до тех пор, пока Mono не завершится, но я вижу, что генерируются тонны запросов, которые заканчиваются исключением reactor.netty.internal.shaded.reactor.pool.PoolAcquirePendingLimitException: Pending acquire queue has reached its maximum size of 8
. Он работает, как и ожидалось (элементы обрабатываются один за другим), если я заменяю вызов testRequest()
на Mono.fromCallable { }
.
Что мне не хватает?
Тестовый код:
import org.asynchttpclient.netty.util.ByteBufUtils
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.netty.http.client.HttpClient
import reactor.netty.resources.ConnectionProvider
class Test {
private val client = HttpClient.create(ConnectionProvider.create("meh", 4))
fun main() {
Flux.fromIterable(0..99)
.flatMap { obj ->
println("Creating request for: $obj")
testRequest()
.doOnError { ex ->
println("Failed request for: $obj")
ex.printStackTrace()
}
.map { res ->
obj to res
}
}
.doOnNext { (obj, res) ->
println("Created request for: $obj ${res.length} characters")
}
.collectList().block()!!
}
fun testRequest(): Mono<String> {
return client.get()
.uri("https://projectreactor.io/docs/netty/release/reference/index.html#_connection_pool")
.responseContent()
.reduce(StringBuilder(), { sb, buf ->
val str= ByteBufUtils.byteBuf2String(Charsets.UTF_8, buf)
sb.append(str)
})
.map { it.toString() }
}
}