Observable
.from(couchbaseDocuments)
.subscribeOn(Schedulers.io())
.flatMap(docToInsert->asyncBucket.insert(docToInsert))
.retryWhen(RetryBuilder.anyOf(TemporaryFailureException.class).delay(Delay.exponential(TimeUnit.MILLISECONDS, 5)).max(3).build())
.map(doc->convertToJava(JsonObject.fromJson(doc.content()),CouchbaseEntity.class).getId())
.toBlocking()
.forEach(id->insertedIds.add(id));
Req:
- Массовая вставка документов в диван определенного типа.
- Создайте еще один документ, в котором будут только идентификаторы документов, которые мы вставили на первом шаге.
- Если первые шаги завершаются неудачно на каком-то идентификаторе, который мы перестаем вставлять, тогда второй документ должен содержать только те идентификаторы, которые были вставлены до возникновения исключения.
- Звонок синхронный
Я новичок в RxJava. Я написал приведенный выше код реакции, но мне кажется, что я не совсем понял некоторые концепции. Моя идея заключалась в том, что forEach в конце всегда будет получать излучаемый элемент, и если произойдет исключение, я поймаю его, а затем позже использую список вставленныхIds для создания вторых документов. Однако в списке всегда есть все идентификаторы, которые не соответствуют моим требованиям.
Может ли кто-нибудь объяснить, что не так с кодом и как я могу достичь вышеуказанных требований?