RxJava: сохранить частичный список объектов, которые были вставлены в базу данных couchbase до того, как было сгенерировано исключение

 Observable
        .from(couchbaseDocuments)
        .subscribeOn(Schedulers.io())
        .flatMap(docToInsert->asyncBucket.insert(docToInsert))
        .retryWhen(RetryBuilder.anyOf(TemporaryFailureException.class).delay(Delay.exponential(TimeUnit.MILLISECONDS, 5)).max(3).build())
        .map(doc->convertToJava(JsonObject.fromJson(doc.content()),CouchbaseEntity.class).getId())
        .toBlocking()
        .forEach(id->insertedIds.add(id));

Req:

  1. Массовая вставка документов в диван определенного типа.
  2. Создайте еще один документ, в котором будут только идентификаторы документов, которые мы вставили на первом шаге.
  3. Если первые шаги завершаются неудачно на каком-то идентификаторе, который мы перестаем вставлять, тогда второй документ должен содержать только те идентификаторы, которые были вставлены до возникновения исключения.
  4. Звонок синхронный

Я новичок в RxJava. Я написал приведенный выше код реакции, но мне кажется, что я не совсем понял некоторые концепции. Моя идея заключалась в том, что forEach в конце всегда будет получать излучаемый элемент, и если произойдет исключение, я поймаю его, а затем позже использую список вставленныхIds для создания вторых документов. Однако в списке всегда есть все идентификаторы, которые не соответствуют моим требованиям.

Может ли кто-нибудь объяснить, что не так с кодом и как я могу достичь вышеуказанных требований?


person humbleCoder    schedule 23.07.2018    source источник


Ответы (1)


методы retry повторно подпишутся на восходящий поток Observable.

В вашем случае это означает подписку на couchbaseDocuments и потенциальную попытку повторно вставить уже успешно вставленные документы.

Вместо того, чтобы повторять попытку всего потока снова, вы можете просто повторить попытку вставки, которая не удалась:

 Observable
        .from(couchbaseDocuments)
        .subscribeOn(Schedulers.io())
        .flatMap(docToInsert->asyncBucket.insert(docToInsert).retryWhen(...))
        .map(doc->convertToJava(JsonObject.fromJson(doc.content()),CouchbaseEntity.class).getId())
        .toBlocking()
        .forEach(id->insertedIds.add(id));

По сути: вам нужно было переместить одинарную скобку.

person Andrew Gallasch    schedule 23.07.2018
comment
Спасибо, думаю, работает. Еще один вопрос вместо forEach, если я воспользуюсь подпиской, будет ли это иметь такой же эффект? С помощью подписки я также получаю метод onError, который может обрабатывать исключения. - person humbleCoder; 24.07.2018
comment
да. Более идиоматично использовать подписку. Если вы посмотрите на источник, forEach вызовет подписку. Ваш retryWhen поглотит любую ошибку, которую он не выдает повторно. Да, вы можете обрабатывать неперехваченные исключения в onError, однако вы не увидите никаких, которые обрабатываются в retryWhen, только необработанные исключения, например из самого кода повтора, перебрасывающего, convertToJava или других исключений. - person Andrew Gallasch; 24.07.2018
comment
Хорошо, я только что проверил, что подписка на BlockingObservable помечена как @Beta. В чем причина этого? - person humbleCoder; 25.07.2018
comment
Обновите RxJava до последней версии 1, а лучше 2. - person Andrew Gallasch; 25.07.2018