flatMapFirst: flatMap, который добавляет новую наблюдаемую только в том случае, если предыдущая закончилась

Как реализовать оператор flatMapFirst, похожий на flatMap, но добавляющий новый наблюдаемый только в том случае, если предыдущий закончился? Если предыдущий все еще работает, он просто игнорирует новый наблюдаемый. Как это реализовать в RxJava 2?

В беконе он уже есть — flatMapFirst И в кефире — flatMapFirst.

введите здесь описание изображения


person Dmitry Ryadnenko    schedule 24.01.2017    source источник


Ответы (3)


Вам нужен не новый оператор, а комбинация существующих:

source.onBackpressureLatest().flatMap(function, 1)

FlatMap будет запускать 1 внутренний источник одновременно, а onBackpressureLatest будет продолжать отбрасывать значения внешнего источника (кроме последних), если нет спроса, пока flatMap запускает 1 внутренний источник.

Если вы не хотите продолжать использовать последнюю версию, доступную из исходного кода, рассмотрите возможность использования вместо этого onBackpressureDrop.

person akarnokd    schedule 24.01.2017
comment
Кажется, он работает с onBackpressureDrop() вместо onBackpressureLatest(). Является ли onBackpressureDrop() правильным? - person Dmitry Ryadnenko; 24.01.2017
comment
Я собираюсь исправить это на onBackpressureDrop(), не стесняйтесь исправлять его обратно, если это ошибка - person Dmitry Ryadnenko; 24.01.2017
comment
Они имеют разное семантическое значение: с onBackpressureDrop у вас может быть пробел в обработке, если источник по какой-то причине остановился после всплеска. - person akarnokd; 24.01.2017
comment
@akarnokd, похоже, это работает только с Flowable-s. Есть ли какой-нибудь сокращенный аналог flatMapFirst для Observable без преобразования в Flowable? - person mekarthedev; 13.04.2018
comment
Нет, это эффект противодавления, доступный по дешевке через Flowable. Вам придется написать собственный оператор Observable. - person akarnokd; 13.04.2018

Дополнение к ответу Дэвида. Если вы ищете способ иметь flatMapFirst в RxJava 2 Observable, а не Flowable, вот быстрая реализация Kotlin:

fun <T, R> Observable<T>.flatMapFirst(transform: (T) -> Observable<R>) =
    toFlowable(BackpressureStrategy.DROP)
        .flatMap({ transform(it).toFlowable(BackpressureStrategy.BUFFER) }, 1)
        .toObservable()

UPD. Альтернативная реализация, основанная на предложении Дэвида Карнока:

fun <T, R> Observable<T>.flatMapFirst(transform: (T) -> Observable<R>) =
    Observable.defer {
        val busy = AtomicBoolean()
        return@defer this
                .filter { busy.compareAndSet(false, true) }
                .flatMap {
                    transform(it).doAfterTerminate { busy.set(false) }
                }
    }
person mekarthedev    schedule 13.04.2018
comment
BackpressureStrategy.MISSING — неправильная стратегия. Вам нужно применить оператор onBackpressureXXX, иначе flatMap, скорее всего, переполнит свою внутреннюю очередь. - person akarnokd; 13.04.2018
comment
@akarnokd, как я понял, .BUFFER будет разумным эквивалентом поведения SpscLinkedArrayQueue под капотом Observable.flatMap. Или я что-то упускаю? - person mekarthedev; 14.04.2018
comment
Вот и вопрос, должен ли там быть БУФЕР или что-то еще :) Отредактировал. - person mekarthedev; 14.04.2018
comment
Это может привести к избыточному использованию памяти. Вы можете просто отфильтровать элементы восходящего потока, пока не завершится внутренний источник: filter(v->atomicBoolean.compareAndSet(false, true)) и flatMap(v -> obs.doAfterTerminate(() -> atomicBoolean.set(false)). - person akarnokd; 14.04.2018

Мне удалось решить это с помощью этого:

/**
 * Flatmaps upstream items into [source] items.
 * Ignores upstream items if there is any [source] instance currently running.
 *
 * ```
 * upstream ----u-----u---u-------u---------------|-->
 *              ↓                 ↓               ↓
 * source       ---s-------|->    ---s-------|->  ↓
 *                 ↓                 ↓            ↓
 * result   -------s-----------------s------------|-->
 * ```
 */
fun <T, R> Observable<T>.flatMapWithDrop(source: Observable<R>): Observable<R> {
  return this.toFlowable(BackpressureStrategy.DROP)
    .flatMap({ source.toFlowable(BackpressureStrategy.MISSING) }, 1)
    .toObservable()
}
person Dmitry Ryadnenko    schedule 13.04.2018