RxJava и долгоиграющий слушатель

Итак, я новичок в RxJava2 (ну, я тоже не знаю RxJava) и пытаюсь разработать приложение для Android, используя структуру RxJava2 и MVP.

В этом приложении я делаю асинхронные вызовы к библиотеке, которая использует прослушиватели. Я устанавливаю слушателя, используя «стандартный» метод setListener/registerListener.

Один из методов возвращает значения «в реальном времени» -> я вызываю метод start() моей библиотеки, а затем буду получать уведомления от моего слушателя при каждой модификации списка (когда происходит добавление/удаление элементов).

Я действительно не понимаю, как я могу добиться такого поведения с помощью RxJava, поскольку слушатель подписан в определении эмиттера/подписчика? Где я должен объявить слушателя? Где мне отписаться? Какой объект я должен использовать?

Я начал разработку с помощью Nucleus, но могу переключиться на другой шаблон или сделать его самостоятельно.

Вот некоторый псевдокод, иллюстрирующий мой вопрос:

До

@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    ...

    mMyLib.setListener(this);
    mMyLib.startDiscovery();
}

@Override
public void itemListChanged(List<Dummy> items) {
    // update the UI with the list items
}

@Override
protected void onDestroy() {
    super.onDestroy();
    mMyLib.setListener(null);
}

Использование Nucleus в моем презентере

Где я должен отказаться от подписки, если я хочу получать изменения в списке, пока мои действия/ведущие активны? Использую ли я правильный синтаксис/объекты?

private static final int REQUEST_ITEMS = 1;
private PublishSubject<Integer> pageRequests = PublishSubject.create();

...

@Override
public void onCreate(Bundle savedState) {
    super.onCreate(savedState);

    restartableReplay(REQUEST_ITEMS,
            () -> Observable.create(e ->
                    {
                        mMyLib.setListener(new MyLib.Listener() {
                            @Override
                            public void itemListChanged(List<Dummy> items) {
                                Log.d(TAG, "meh itemListChanged");
                                e.onNext(items);
                                e.onComplete();
                            }
                        });
                        mMyLib.startDiscovery();
                    }
            )
            ,
            MainFragment::onItems,
            MainFragment::onNetworkError);
}

void request() {
    start(REQUEST_ITEMS);
}

person w00ly    schedule 12.04.2017    source источник


Ответы (1)


Вы в правильном направлении, это правильный способ обернуть асинхронные обратные вызовы с помощью RxJava, несколько комментариев:

  • Вы вызываете e.onComplete() на itemListChanged, это неправильно, так как это завершит вашу последовательность Observable, вам может вообще не понадобиться вызывать onComplete(), так как он никогда не завершает уведомления из внешнего источника, или вызывать его один раз после реального окончания уведомлений ( внешний источник заканчивает производить предметы), не путайте это с отпиской от источника.
  • Чтобы логика отписки что-то делала, вы должны определить ее в create(), вызвать e.setCancellable() с логикой отмены (mMyLib.setListener(null) или любым дополнительным кодом ресурсов очистки)
  • в этом случае кажется, что у вас есть только 1 подписчик, но в противном случае рассмотрите возможность использования share(), чтобы иметь «горячий» Observable, который может выполнять многоадресную рассылку нескольким подписчикам.
  • что касается библиотеки Nucleus, насколько я помню restartableReplay, будет кэшировать ваши Observable и воспроизведение испущенных элементов, это может быть неправильно с таким «горячим» потоком событий (если вам не нужно воспроизвести, возможно, последний выпуск или что-то в этом роде), кроме того, кэш будет проблематичным, так как вы потеряете возможность отписки, поэтому обязательно используйте Nucleus прямо здесь.

Где я должен отписаться, если я хочу получать изменения списка, пока моя активность/ведущий жив? Использую ли я правильный синтаксис/объекты?

Вам просто нужно отказаться от подписки везде, где вы хотите перестать получать уведомления, будь то onStop() или onDestory() зависит от вас.

person yosriz    schedule 13.04.2017
comment
Хороший ответ, спасибо :) Как вы думаете, было бы лучше для меня избавиться от Nucleus и просто реализовать свой собственный MVP (я нашел много шаблонов вокруг)? Я быстро отредактирую свой вопрос с предложенным исправлением. - person w00ly; 13.04.2017
comment
ну, я плохо знаком с Nucleus, это может быть полезно для использования MVP, просто будьте осторожны, когда вы используете его возможности повторного подключения к функциям активности/фрагмента в вашем случае - person yosriz; 13.04.2017