Подписка RxJava на побочный эффект

У меня есть вопрос, это простой поток бизнес-логики:

проверьте, находится ли в кеше сотрудник из нескольких отделов, отношений между отделами и сотрудниками, сначала проверьте, существуют ли отношения в кеше, если они существуют, проверьте, принадлежит ли им сотрудник, в случае отсутствия в кеше, получите его из базы данных и проверьте отношения о сотруднике, а затем сохраните информацию об отделе в кеш.

это код:

public Observable  isEmployeeInDepartment(List<Long> departmentIds, long employeeId){

     //this observable will resolve twice, and cause unnecessary cache access
     Observable  departmentInfoExsitInCache= checkDepartmentInfoFromCache(...).share();   

     Observable  departInfoNotInCache = departmentInfoExsitInCache.filter(...);

     //this observable will resolve twice, and cause unnecessary database access
     Observable  departmentInfoFromDb=departInfoNotInCache.flatMap(departmentIds->checkFromDb()).share(); 

     Observable<Long> saveResult=departmentInfoFromDb.flatMap(departmentInfo->saveToCache());

     Observable<Long> departInfoInCache = departmentInfoExsitInCache.filter(...);

     return departInfoInCache.check(userId).merge( departmentInfoFromDb.check(userId)).doOnCompleted(saveResult.subscribe());
}

проблема в том, что отделInfoExsitInCache и saveResult будут разрешены дважды при подписке на клиентский метод.

Я обнаружил, что после удаления кода подписки сохранения .doOnCompleted(saveResult.subscribe()) он станет нормальным и разрешится только один раз. Что-то не так с этим кодом?


person junfei wang    schedule 27.04.2017    source источник


Ответы (1)


Вы неправильно используете общий ресурс.
Проблема в том, что share() в этом случае вам не поможет. share() на самом деле publish().autoConnect() предназначена для сохранения единой подписки на поток, поэтому повторный вызов subscribe не вызовет повторного вызова логики подписки, а просто подключит вас к существующему потоку.
Но после того, как все подписчики отпишутся из общего потока Observable откажется от подписки. Это означает, что при повторном вызове subscribe() вы вызовете логику подписки и снова вызовете DB/Cache.

Итак, вы снова подписываетесь на общего оператора после того, как он отписался. (в doOnCompleted()), это приведет к тому, что departmentInfoFromDb и departmentInfoExsitInCache снова подпишутся и перейдут к DB/Cache.

Рассмотрите возможность использования оператора cache()/reply() для сохранения извлеченного значения из БД/кэша между подписками.

person yosriz    schedule 27.04.2017
comment
на самом деле, я не знаю никакого метода, позволяющего разрешить saveResult, когда весь возвращаемый наблюдаемый объект получает подписку, кроме как поместить его в основной метод возвращаемого наблюдаемого объекта doOnCompleted(), чтобы он также был подписан. - person junfei wang; 27.04.2017
comment
если вы хотите, чтобы saveResult подписывался после объединенного Obesrvable в последней строке, вы можете использовать оператор concat(). но все равно это не решит этот вопрос - person yosriz; 27.04.2017