Реактивный R2DBC для простых вызовов RESTful

Поскольку R2DBC является реактивным и неблокирующим, я хотел бы понять преимущества использования R2DBC в простой службе RESTful CRUD.

Предположим, что приложение весенней загрузки предоставляет службу RESTful, используя репозиторий ниже.

public interface CustomerRepository extends ReactiveCrudRepository<Customer, Long> {

    @Query("SELECT * FROM customer WHERE last_name = :lastname")
    Flux<Customer> findByLastName(String lastName);

}

Этот репозиторий вызывается из службы, и результаты должны быть преобразованы в службе перед возвратом в контроллер.

Flux<Customer> customers = repository.findAll();

Чтобы получить доступ к полному списку клиентов, мне нужно вызвать blockLast() на Flux, что делает его блокирующим и лишает цели использования реактивных компонентов.

Означает ли это, что в этом простом примере нет никакой пользы от использования R2DBC? Я что-то упускаю ?

Можно ли использовать Flux только для асинхронной подписки, когда обработка коллекций Flux происходит в другом потоке?


person lives    schedule 09.07.2021    source источник


Ответы (2)


  1. Поскольку метод повторно настраивает Flux, он возвращает обещание, которое может завершиться когда-нибудь в будущем.
  2. Поток, вызывающий этот метод, не должен блокироваться — в этом весь смысл реактивности. Если вы используете .blockLast(), вы сделаете блок вызывающего потока. Как сказал Майкл в этом ответе, это следует делать только тогда, когда вы объединяете реактивный и нереактивный код вместе.
  3. Вот диаграмма, которая объясняет, как async. вычисления рабочие (реактивные, CompletableFuture и т.д.). Надеюсь, это развеет ваши сомнения;

введите здесь описание изображения

Как вы можете видеть на диаграмме, если вы делаете .blockLast(), вы теряете настоящую неблокировку. В идеале вызывающий поток должен немедленно освободиться, чтобы он мог выполнять другую работу.

  1. Правильно написанное реактивное приложение будет представлять собой единую цепочку с подпиской, происходящей на самом верхнем уровне. Все приложение должно быть асинхронным конвейером. Вы должны использовать такие приложения, как BlockHound, чтобы проверить, блокируются ли ваши потоки где-либо.

Можно ли использовать Flux только для асинхронной подписки, когда обработка коллекций Flux происходит в другом потоке? - Flux - это не что иное, как обещание, в котором говорится, что он может закончиться когда-то в будущем. Как показано на диаграмме, обратные вызовы будут выполняться в потоке, в котором работает async. вычисление завершено. Вы можете переключить эту тему, используя .publishOn().

person Prashant Pandey    schedule 12.07.2021

Чтобы получить доступ к полному списку клиентов, мне нужно вызвать blockLast() для Flux, что делает его блокирующим и лишает цели использования реактивных компонентов.

Вам нужно вызвать blockLast() только в том случае, если вы действительно хотите получить ссылку на List<Customer>, но, как вы говорите, если вы это сделаете, вы потеряете все реактивные преимущества. (Единственный случай, когда вам действительно следует подумать о блокировке IMHO, - это если вы переходите на реактивную систему и устанавливаете реактивные библиотеки, но еще не готовы сделать всю систему реактивной.)

Если вы просто хотите сразу получить доступ к полному списку клиентов, вы можете разумно позвонить collectList(), чтобы получить Mono<List<Customer>. Таким образом, вы остаетесь в реактивном контексте, но в любом реактивном операторе вам доступен весь список.

Единственное, на что следует обращать внимание, — это объем памяти: если вы просто обрабатываете Flux<Customer> как есть, вам никогда не нужно хранить их группу в памяти, поэтому на самом деле не имеет значения, сколько их (можно даже быть бесконечным.) Если вы сначала соберете его в список, тогда все эти клиенты должны быть сохранены в памяти одновременно.

Является ли это проблемой, зависит от вашего варианта использования. Если вы говорите о 10 или около того клиентах, вообще никаких проблем. Если вы говорите о миллиардах, то, скорее всего, это не будет разумным решением.

person Michael Berry    schedule 09.07.2021
comment
Спасибо Михаил за подробное разъяснение. Есть ли какая-либо польза от использования Mono‹List‹Customer›› в приведенном выше примере, кроме того факта, что код может оставаться в реактивном контексте? - person lives; 09.07.2021
comment
Соглашаться. Если есть миллион записей, то использование Flux является правильным вариантом, поскольку записи можно обрабатывать по мере их доступности, вместо того, чтобы ждать, пока будет построен огромный список. Это поможет эффективно использовать память. Но я проверил этот пример и обнаружил, что потребитель в этом случае обрабатывает в том же основном потоке. Это так же хорошо, как сценарий блокировки. Я разместил пример кода здесь. stackoverflow.com/questions/68320273/ Дайте мне знать, если я неправильно понимаю - person lives; 09.07.2021
comment
@lives Я не уверен, что понимаю вашу точку зрения - это определенно не эквивалентно решению блокировки только потому, что оно использует один поток. Обработка потока не имеет большого значения - смысл реактивного решения заключается в том, что вы можете иметь несколько неблокирующих операций, выполняемых одновременно в одном потоке, что позволяет избежать накладных расходов на переключение контекста между разными потоками. - person Michael Berry; 09.07.2021
comment
Спасибо, Майк. Не могли бы вы поделиться какой-либо ссылкой, объясняющей несколько неблокирующих операций, выполняемых одновременно в одном потоке? - person lives; 10.07.2021
comment
@lives Лучший ответ здесь был бы хорошим началом - по сути, это концепция однопоточного цикла событий: stackoverflow.com/questions/34855352/ - person Michael Berry; 10.07.2021