Получить планировщик цикла событий webflux

Я использую webflux с netty и jdbc, поэтому блокирую операцию jdbc следующим образом:

static <T> Mono<T> fromOne(Callable<T> blockingOperation) {
    return Mono.fromCallable(blockingOperation)
        .subscribeOn(jdbcScheduler)
        .publishOn(Schedulers.parallel());
}

Операция блокировки будет обрабатываться jdbcScheduler, и я хочу, чтобы другой конвейер обрабатывался планировщиком цикла событий webflux.

Как получить планировщик цикла событий webflux?


person Andrey Dorohovich    schedule 04.10.2018    source источник


Ответы (2)


Я настоятельно рекомендую пересмотреть варианты технологии. Если вы собираетесь использовать jdbc, который все еще блокирует, вам не следует использовать webflux. Это связано с тем, что webflux будет сиять в неблокирующем стеке, но в сочетании с Jdbc он будет действовать как узкое место. Производительность фактически снизится.

person Vikram Rawat    schedule 06.10.2018
comment
Спасибо, я понимаю это, из-за этого мы оборачиваем блокирующие операции так, как я описываю в вопросе. - person Andrey Dorohovich; 08.10.2018
comment
Да, но общая производительность все равно значительно снизится. - person Vikram Rawat; 08.10.2018

Я согласен с @Vikram Rawat. Использование jdbc очень опасно в основном потому, что jdbc - это блокирующий API ввода-вывода, а использование реактивной модели цикла событий очень опасно, потому что в основном очень легко заблокировать весь сервер.

Однако, даже если это экспериментальная попытка, я предлагаю вам оставаться в курсе проекта R2DBC, который может использовать API без блокировки для sql, я использовал его для всплеска, и он очень элегантен.

Я могу привести пример, взятый из моего домашнего проекта на github на основе sprign boot 2.1 и kotlin:

веб-слой

@Configuration
class ReservationRoutesConfig {

    @Bean
    fun reservationRoutes(@Value("\${baseServer:http://localhost:8080}") baseServer: String,
                          reservationRepository: ReservationRepository) =
            router {
                POST("/reservation") {
                    it.bodyToMono(ReservationRepresentation::class.java)
                            .flatMap { Mono.just(ReservationRepresentation.toDomain(reservationRepresentation = it)) }
                            .flatMap { reservationRepository.save(it).toMono() }
                            .flatMap { ServerResponse.created(URI("$baseServer/reservation/${it.reservationId}")).build() }

                }

                GET("/reservation/{reservationId}") {
                    reservationRepository.findOne(it.pathVariable("reservationId")).toMono()
                            .flatMap { Mono.just(ReservationRepresentation.toRepresentation(it)) }
                            .flatMap { ok().body(BodyInserters.fromObject(it)) }
                }

                DELETE("/reservation/{reservationId}") {
                    reservationRepository.delete(it.pathVariable("reservationId")).toMono()
                            .then(noContent().build())
                }
            }
}

уровень репозитория:

class ReactiveReservationRepository(private val databaseClient: TransactionalDatabaseClient,
                                    private val customerRepository: CustomerRepository) : ReservationRepository {

    override fun findOne(reservationId: String): Publisher<Reservation> =
            databaseClient.inTransaction {
                customerRepository.find(reservationId).toMono()
                        .flatMap { customer ->
                            it.execute().sql("SELECT * FROM reservation WHERE reservation_id=$1")
                                    .bind("$1", reservationId)
                                    .exchange()
                                    .flatMap { sqlRowMap ->
                                        sqlRowMap.extract { t, u ->
                                            Reservation(t.get("reservation_id", String::class.java)!!,
                                                    t.get("restaurant_name", String::class.java)!!,
                                                    customer, t.get("date", LocalDateTime::class.java)!!)
                                        }.one()
                                    }
                        }
            }

    override fun save(reservation: Reservation): Publisher<Reservation> =
            databaseClient.inTransaction {
                customerRepository.save(reservation.reservationId, reservation.customer).toMono()
                        .then(it.execute().sql("INSERT INTO reservation (reservation_id, restaurant_name, date) VALUES ($1, $2, $3)")
                                .bind("$1", reservation.reservationId)
                                .bind("$2", reservation.restaurantName)
                                .bind("$3", reservation.date)
                                .fetch().rowsUpdated())
            }.then(Mono.just(reservation))


    override fun delete(reservationId: String): Publisher<Void> =
            databaseClient.inTransaction {
                customerRepository.delete(reservationId).toMono()
                        .then(it.execute().sql("DELETE FROM reservation WHERE reservation_id = $1")
                                .bind("$1", reservationId)
                                .fetch().rowsUpdated())
            }.then(Mono.empty())

}

Я надеюсь это поможет тебе

person Valerio Vaudi    schedule 25.01.2019