Почему Flux.flatMap() не ждет завершения работы внутреннего издателя?

Не могли бы вы объяснить, что именно происходит во Flux/Mono, возвращаемом HttpClient.response() ? Я думал, что значение, сгенерированное http-клиентом, НЕ будет передаваться вниз по течению до тех пор, пока Mono не завершится, но я вижу, что генерируются тонны запросов, которые заканчиваются исключением reactor.netty.internal.shaded.reactor.pool.PoolAcquirePendingLimitException: Pending acquire queue has reached its maximum size of 8. Он работает, как и ожидалось (элементы обрабатываются один за другим), если я заменяю вызов testRequest() на Mono.fromCallable { }.

Что мне не хватает?

Тестовый код:

import org.asynchttpclient.netty.util.ByteBufUtils
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.netty.http.client.HttpClient
import reactor.netty.resources.ConnectionProvider

class Test {
    private val client = HttpClient.create(ConnectionProvider.create("meh", 4))

    fun main() {
        Flux.fromIterable(0..99)
                .flatMap { obj ->
                    println("Creating request for: $obj")
                    testRequest()
                            .doOnError { ex ->
                                println("Failed request for: $obj")
                                ex.printStackTrace()
                            }
                            .map { res ->
                                obj to res
                            }
                }
                .doOnNext { (obj, res) ->
                    println("Created request for: $obj ${res.length} characters")
                }
                .collectList().block()!!
    }

    fun testRequest(): Mono<String> {
        return client.get()
                .uri("https://projectreactor.io/docs/netty/release/reference/index.html#_connection_pool")
                .responseContent()
                .reduce(StringBuilder(), { sb, buf ->
                    val str= ByteBufUtils.byteBuf2String(Charsets.UTF_8, buf)
                    sb.append(str)
                })
                .map { it.toString() }
    }
}

person expert    schedule 15.08.2020    source источник


Ответы (1)


Когда вы создаете ConnectionProvider как этот ConnectionProvider.create("meh", 4), это означает пул соединений с максимальным количеством соединений 4 и максимальным количеством ожидающих запросов 8. См. здесь подробнее об этом.

Когда вы используете flatMap, это означает Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux through merging, which allow them to interleave См. здесь подробнее об этом.

Итак, что происходит, так это то, что вы пытаетесь выполнить все запросы одновременно.

Итак, у вас есть два варианта:

  • Если вы хотите использовать flatMap, увеличьте количество ожидающих запросов.
  • Если вы хотите сохранить количество ожидающих запросов, вы можете, например, использовать concatMap вместо flatMap, что означает Transform the elements emitted by this Flux asynchronously into Publishers, then flatten these inner publishers into a single Flux, sequentially and preserving order using concatenation. Подробнее здесь об этом.
person Violeta Georgieva    schedule 15.08.2020