Как заставить общий наблюдаемый объект выдавать новое значение из .startWith() каждый раз, когда общий поток получает нового подписчика?

Мне нужна общая наблюдаемая, чтобы выдавать новое значение .startWith() каждый раз, когда подписывается общий поток (счетчик подписчиков идет от 0 до 1). К сожалению, значение, возвращаемое .startWith(), повторно используется в течение всего времени существования общего наблюдаемого экземпляра, даже после того, как у этого общего потока нет подписчиков.

В идеале была бы перегрузка .startWith(), которая принимает функцию в качестве параметра и повторно выполняет ее каждый раз, когда количество подписчиков изменяется с 0 на 1.

var count: Int = 0

@Before
fun setUp() {
    count = 0 //reset
}

fun getTheCount(): Int {
    count++
    return count
}

@Test
fun startWithDefaultValue() {
    val relay = PublishRelay.create<Int>()

    val instance by lazy {
        relay
                .startWith(getTheCount())
                .share()
    }

    val disposable1 = instance.subscribe {
        println(it) //should print 1, and does!
    }

    disposable1.dispose() //subscriber count on shared instance drops from 1 to 0

    //should print 2, but prints 1. getTheCount() is not called again on this subscription
    val disposable2 = instance.subscribe {
        println(it) 
    }
}

person ZakTaccardi    schedule 26.04.2017    source источник


Ответы (1)


Я предполагаю, что val instance by lazy означает инициализацию один раз, и поэтому у вас есть один вызов метода getTheCount(). (Я также надеюсь, что вы понимаете, что Observable.just(getTheCount()) будет выдавать одно и то же значение каждому наблюдателю и больше никогда не будет «вызывать» getTheCount().)

Вы должны либо отложить ленивые внутренности:

Observable.defer(() -> relay.startWith(getTheCount()))
.share();

Или используйте fromCallable и concatWith:

relay.concatWith(Observable.fromCallable(() -> getTheCount()))
.share();
person akarnokd    schedule 26.04.2017
comment
обратите внимание, я пошел с .startWith(Observable.fromCallable{ getTheCount }) - person ZakTaccardi; 03.05.2017