не может сохранить документ на диванную базу, используя реактивный подход

Я пытаюсь сохранить некоторую дату, сгенерированную планировщиком, а затем прочитать их через конечную точку отдыха в потоковом режиме. У меня очень простая модель, Feed содержащая только uuid и некоторый текст. У меня также есть репозиторий, который расширяет ReactiveCouchbaseRepository<Feed, String>.

Сохранение работает, когда я использую файл CouchbaseRepository<Feed, String>. вот мой Feed:

@Document
public class Feed {
    @Id
    private String id;
    @Field
    private String text;
    ...

и репозиторий:

@N1qlPrimaryIndexed
@ViewIndexed(designDoc = "feed")
public interface FeedRepository  extends ReactiveCouchbaseRepository<Feed, String> {
}

а также планировщик:

    ....
    @Autowired
    private FeedRepository feedRepository;

    @Scheduled(fixedRate = 1000)
    public void generate(){
        feedRepository.save(new Feed(UUID.randomUUID().toString(), String.valueOf(System.currentTimeMillis())));
    ...

вот моя конечная точка:

@RestController
public class MainController {

    @Autowired
    private FeedRepository feedRepository;

    @GetMapping(value = "/get/feed/live", produces = "text/event-stream")
    public Flux<Feed> getLiveFeeds(){
        return feedRepository.findAll();
    }
}

когда я вызываю конечную точку, я получаю это:

Tue Apr 09 15:12:02 CEST 2019
There was an unexpected error (type=Internal Server Error, status=500).
View feed/all does not exist.
com.couchbase.client.java.error.ViewDoesNotExistException: View feed/all does not exist.
    at com.couchbase.client.java.view.ViewQueryResponseMapper$BuildViewResult.call(ViewQueryResponseMapper.java:211)
    at com.couchbase.client.java.view.ViewQueryResponseMapper$BuildViewResult.call(ViewQueryResponseMapper.java:185)
    at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:69)
    at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:77)
    at rx.internal.producers.SingleProducer.request(SingleProducer.java:65)
    at rx.Subscriber.setProducer(Subscriber.java:211)
    at rx.internal.operators.OnSubscribeMap$MapSubscriber.setProducer(OnSubscribeMap.java:102)
    at rx.internal.operators.OnSubscribeMap$MapSubscriber.setProducer(OnSubscribeMap.java:102)
    at rx.internal.operators.OperatorSingle$ParentSubscriber.onCompleted(OperatorSingle.java:113)
    at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.checkTerminated(OperatorObserveOn.java:281)
    at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.call(OperatorObserveOn.java:216)
    at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.FluxDefer] :
    reactor.core.publisher.Flux.defer(Flux.java:805)
    org.springframework.data.repository.util.ReactiveWrapperConverters$RxJava1ObservableToFluxConverter.convert(ReactiveWrapperConverters.java:682)
Error has been observed by the following operator(s):
    |_  Flux.defer ⇢ org.springframework.data.repository.util.ReactiveWrapperConverters$RxJava1ObservableToFluxConverter.convert(ReactiveWrapperConverters.java:682)
    |_  Flux.map ⇢ org.springframework.http.codec.ServerSentEventHttpMessageWriter.encode(ServerSentEventHttpMessageWriter.java:119)
    |_  Mono.doOnError ⇢ org.springframework.http.server.reactive.AbstractServerHttpResponse.writeAndFlushWith(AbstractServerHttpResponse.java:186)
    |_  Mono.flatMap ⇢ org.springframework.web.reactive.DispatcherHandler.lambda$handleResult$5(DispatcherHandler.java:175)
    |_  Mono.onErrorResume ⇢ org.springframework.web.reactive.DispatcherHandler.handleResult(DispatcherHandler.java:175)
    |_  Mono.flatMap ⇢ org.springframework.web.reactive.DispatcherHandler.handle(DispatcherHandler.java:152)
    |_  Mono.defer ⇢ org.springframework.web.server.handler.DefaultWebFilterChain.filter(DefaultWebFilterChain.java:119)
    |_  Mono.doAfterSuccessOrError ⇢ org.springframework.boot.actuate.web.trace.reactive.HttpTraceWebFilter.filter(HttpTraceWebFilter.java:99)
    |_  Mono.flatMap ⇢ org.springframework.boot.actuate.web.trace.reactive.HttpTraceWebFilter.filter(HttpTraceWebFilter.java:82)
    |_  Mono.defer ⇢ org.springframework.web.server.handler.DefaultWebFilterChain.filter(DefaultWebFilterChain.java:119)
    |_  Mono.defer ⇢ org.springframework.web.server.handler.DefaultWebFilterChain.filter(DefaultWebFilterChain.java:119)
    |_  Mono.doOnSuccess ⇢ org.springframework.boot.actuate.metrics.web.reactive.server.MetricsWebFilter.filter(MetricsWebFilter.java:84)
    |_  Mono.doOnError ⇢ org.springframework.boot.actuate.metrics.web.reactive.server.MetricsWebFilter.filter(MetricsWebFilter.java:84)
    |_  Mono.compose ⇢ org.springframework.boot.actuate.metrics.web.reactive.server.MetricsWebFilter.filter(MetricsWebFilter.java:76)
    |_  Mono.defer ⇢ org.springframework.web.server.handler.DefaultWebFilterChain.filter(DefaultWebFilterChain.java:119)

Caused by: rx.exceptions.OnErrorThrowable$OnNextValue: OnError while emitting onNext value: com.couchbase.client.java.document.json.JsonObject.class
    at rx.exceptions.OnErrorThrowable.addValueAsLastCause(OnErrorThrowable.java:118)
    at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:73)
    ... 16 more

  1. Почему дата не сохраняется при использовании реактивного репозитория
  2. Почему я не могу получить хотя бы те данные, которые уже были сохранены, используя обычный репозиторий Couchbase?

Спасибо за помощь.

РЕДАКТИРОВАТЬ 1: было три вещи, которые были неправильными/отсутствующими, чтобы сохранить:

  1. @EnableReactiveCouchbaseRepositories отсутствовал (указано в одном из ответов)
  2. Конфиг не расширял AbstractReactiveCouchbaseConfiguration, а обычный
    3.В конце вызова feedRepository.save(new Feed(UUID.randomUUID().toString(), String.valueOf(System.currentTimeMillis()))) отсутствовал .subscribe().

Теперь постоянная часть работает. Все еще есть проблема с получением.

редактировать 2

только что обнаружили это отладочное сообщение в журнале при вызове конечной точки:
2019-04-10 16:27:06.467 DEBUG 11172 --- [-computations-3] c.c.client.java.view.ViewRetryHandler : Design document not found, error is {"errors":[{"error":"not_found","reason":"Design document _design/feed not found"}


person mooor    schedule 09.04.2019    source источник
comment
Можете ли вы поделиться кодом где-нибудь? потому что все выглядит нормально для меня.   -  person deniswsrosa    schedule 09.04.2019
comment
вот вам github.com/mooras/reactiveDemo   -  person mooor    schedule 09.04.2019
comment
только что ответил, дайте мне знать, если это сработает для вас   -  person deniswsrosa    schedule 09.04.2019


Ответы (3)


Похоже, вы забыли включить реактивные репозитории с помощью аннотации @EnableReactiveCouchbaseRepositories:

@SpringBootApplication
@EnableScheduling
@EnableReactiveCouchbaseRepositories
public class ReactiveDemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(ReactiveDemoApplication.class, args);
    }
}

Вы можете прочитать больше об этом здесь:

https://docs.spring.io/spring-data/couchbase/docs/current/reference/html/#couchbase.reactiverepository.usage

person deniswsrosa    schedule 09.04.2019
comment
Я добавил аннотацию, но в корзине не появилось новых элементов (есть только начальные 49, которые мне удалось сохранить с помощью обычного CouchbaseRepository) - person mooor; 09.04.2019
comment
в документе говорится, что класс конфигурации кушетки должен расширять AbstractReactiveCouchbaseConfiguration, чего раньше не было. Смена ничего не изменила. - person mooor; 10.04.2019

Похоже, в вашем интерфейсе FeedRepository отсутствует имя представления, которое используется для выполнения поиска.

Попробуйте использовать следующее имя представления:

@N1qlPrimaryIndexed
@ViewIndexed(designDoc = "feed", viewName = "all")
public interface FeedRepository  extends ReactiveCouchbaseRepository<Feed, String> {
}

Для меня это сработало после добавления имени представления.

person V Rao    schedule 19.04.2019

немного поздно, но, надеюсь, это может помочь другим. В реактивном программировании вам нужно подписаться на своих издателей, иначе поток информации не запустится. В вашем примере feedRepository.save(***) никогда не выполняется, потому что вам нужно что-то вроде feedRepository.save(**).subscribe().

Что касается второго вопроса, поскольку вы запрашиваете все элементы, вам нужно указать представление, например {item}/all, для более новых версий Couchbase вы можете указать PrimaryKey в качестве обязательного индекса для получения всех элементов.

person Hernán Tenjo    schedule 06.07.2020