RxAndroid, как определить, закончилась ли эмиссия observable

Я пишу следующий фрагмент кода, чтобы получить список сохраненных продуктов из базы данных Firebase, а затем, используя этот список, я снова извлекаю отдельные сведения о продуктах из базы данных Firebase.

Следующий код работает нормально, за исключением того, что я не могу понять, как сообщить второй flatMap, что эмиссия первой flatMap завершена (все продукты были обработаны). Поэтому я не могу вызвать метод onCompleted(), поэтому не могу определить, когда весь процесс завершится.

Взгляните на комментарии в следующем фрагменте:

Observable.create<List<PersonalizedFood>> {

            FirebaseDTDatabase.getSavedDietFoodQuery(user.uid).addListenerForSingleValueEvent(object : ValueEventListener {
                override fun onCancelled(p0: DatabaseError?) {

                }

                override fun onDataChange(p0: DataSnapshot?) {
                    val list = ArrayList<PersonalizedFood>()
                    p0?.let {
                        for (dateObject in p0.children) {
                            for (foodItem in dateObject.children) {
                                val food = foodItem.getValue(FBPersonalizedFood::class.java) as FBPersonalizedFood
                                list.add(PersonalizedFood(food))
                            }
                        }
                    }
                    it.onNext(list)
                    it.onCompleted()
                }
            })
        }.subscribeOn(Schedulers.io()).flatMap {
            Observable.from(it) // returning a Observable that emits items of list ("it" is the list here) 
        }.observeOn(Schedulers.io()).flatMap {
        // How does this flatMap know that emission of all item has been finished so that onCompleted() method could be called.
            personalizedFood ->

            Observable.create<Boolean>{
                FirebaseDTDatabase.getFoodListReference(personalizedFood.foodId).addListenerForSingleValueEvent(object :ValueEventListener{
                    override fun onCancelled(p0: DatabaseError?) {
                        it.onError(p0?.toException())
                    }

                    override fun onDataChange(p0: DataSnapshot?) {
                        if(p0 != null) {
                            val food = p0.getValue(FBFood::class.java)!!
                            val repo = LocalFoodRepository()
                            doAsync {
                                repo.insertFood(this@LoginActivity, Food(food.foodId, food.foodName, food.foodDesc))
                                repo.insertServingDetails(this@LoginActivity, food.servingList.map { it.component2() })
                                repo.saveFood(this@LoginActivity, personalizedFood)
                                it.onNext(true)
                            }

                        }else {
                            it.onNext(false)
                        }
                    }

                })
            }
        }.observeOn(Schedulers.io()).doOnCompleted{
            dismissProgressDialog()
            finish()
        }.doOnError{
            it.printStackTrace()
            dismissProgressDialog()
            finish()
        }.subscribe()

Спасибо.


person chandil03    schedule 19.07.2017    source источник
comment
Вы используете фаерабс? Есть некоторые сторонние RxFirebase, которые могут дать вам хорошую оболочку для базы данных Firebase.   -  person Phoenix Wang    schedule 19.07.2017
comment
@PhoenixWang Это другое дело. на самом деле я учусь, поэтому хочу знать, как этого добиться без использования какой-либо сторонней оболочки. И я не сторонник использования сторонних библиотек, если это достаточно просто сделать самостоятельно.   -  person chandil03    schedule 19.07.2017
comment
Ну, на самом деле вам не нужно. В первом наблюдаемом вы извлекаете все элементы PersonalizedFood и выдаете их в виде списка, а затем завершаете поток. Затем вы преобразуете его в набор элементов, каждый из которых обрабатывается внутри второго flatMap. Ключевым моментом является то, что каждый onCompleted передается по потоку, поэтому второй flatMap знает, что элементов больше не будет, и завершает себя. Я бы посоветовал проверить безопасность и согласованность кода, потому что некоторые моменты кажутся мне неправильными.   -  person MightySeal    schedule 19.07.2017


Ответы (1)


Observable из flatMap знает, «когда все элементы были завершены», когда все наблюдаемые, испускаемые им, вызвали onCompleted(). Второй flatMap в вашем коде никогда не вызывает onCompleted(), потому что ни один из наблюдаемых, которые он создает, не вызывает onCompleted().

Вы должны вызвать onCompleted() в своем методе onDataChange(). Поскольку каждый из наблюдаемых объектов, созданных в flatMap, генерирует только один элемент, его можно вызывать непосредственно после метода onNext():

override fun onDataChange(p0: DataSnapshot?) {
    if(p0 != null) {
        val food = p0.getValue(FBFood::class.java)!!
        val repo = LocalFoodRepository()
        doAsync {
            repo.insertFood(this@LoginActivity, Food(food.foodId, food.foodName, food.foodDesc))
            repo.insertServingDetails(this@LoginActivity, food.servingList.map { it.component2() })
            repo.saveFood(this@LoginActivity, personalizedFood)
            it.onNext(true)
            it.onCompleted()
        }
    } else {
        it.onNext(false)
        it.onCompleted()
    }
}
person Bryan    schedule 19.07.2017
comment
Это не соответствует моему требованию. Я хочу завершить действие, когда все данные будут получены. Вот почему я не могу позвонить onComplete на onDataChange(). - person chandil03; 20.07.2017
comment
@ chandi03 Chandil03 Это был бы метод doOnCompleted(), не так ли? Вы вызываете finish() внутри этого метода, поэтому добавление onComplete() должно привести к завершению действия. - person Bryan; 20.07.2017
comment
Я знаю, что если вы внимательно проверите мой код, я добавил вызов метода finish() в doOnCompleted(). если я позвоню onComplete() там, где вы мне предложите, doOnComplete method will be called as many times as much items are emitted by observable. Таким образом, finish() будет вызываться несколько раз, что приведет к сбою приложения, поскольку активность будет нулевой. - person chandil03; 20.07.2017
comment
@ chandil03 Это неверно; doOnCompleted() будет вызываться столько раз, сколько flatMap наблюдаемых вызовов onCompleted(). Метод flatMap создает Observable<Boolean> для каждого переданного ему List<PersonalizedFood>, а затем объединяет их в один Observable<Boolean>. Этот единственный наблюдаемый объект вызывает onCompleted() только после того, как все исходные наблюдаемые вызовут onCompleted(). Метод onOnCompleted() следует вызывать только один раз. Ты это пробовал? - person Bryan; 20.07.2017
comment
Круто, это сработало .., Но все еще ломаю голову, чтобы понять, onComplete() вызывается несколько раз, но почему он запускается только один раз? - person chandil03; 20.07.2017
comment
@chandil03 Chandil03 Потому что flatMap создает несколько наблюдаемых, а затем объединяет их в один наблюдаемый. Эта единственная наблюдаемая вызывает onComplete() только после того, как каждая из объединенных наблюдаемых вызывает onComplete(). - person Bryan; 20.07.2017