Проблема загрузки параллельного файла Android Retrofit

В моем приложении есть вариант использования для загрузки двух zip-файлов с сервера. Для этого я использовал модификацию + rxjava (создал две отдельные службы модификации). Для параллельного выполнения я подписывался на услугу модернизации в новом потоке, а затем объединял ее с помощью оператора zip. Это работает нормально. Но позже я добавил оператор карты в обе службы для операции распаковки, но он не выполняет код, написанный в операторе карты, и управление передается непосредственно операции zip. Я не знаю, как с этим справиться, и я новичок в реактивном мире.

Что я пробовал до сих пор

    Observable<Response<ResponseBody>> dFileObservable = dbDownloadApi.downloadDealerData(WebServiceConstants.ACTION_DEALER_DATA,
            params.getDealerNumber(),params.getUserId(),params.getClientId(), params.getSessionId()).subscribeOn(Schedulers.newThread());
    dFileObservable.map(new Function<Response<ResponseBody>, String>() {
        @Override
        public String apply(Response<ResponseBody> responseBody) throws Exception {
            String header = responseBody.headers().get("Content-Disposition");
            String filename = header.replace("attachment; filename=", "");
            String downloadFolderPath = fileManager.makeAndGetDownloadFolderPath();
            String dealerZipPath = fileManager.makeFolder(downloadFolderPath, StrConstants.DEALER_FOLDER_NAME);
            fileManager.writeDownloadedFileToDisk(dealerZipPath,filename, responseBody.body().source());
            String dealerFilePath = dealerZipPath+File.separator+filename;
            unzipUtility.unzip(dealerFilePath, fileManager.makeAndGetDownloadFolderPath()+File.separator+ StrConstants.GENERAL_FOLDER_NAME);
            return dealerFilePath;
        }
    });

   Observable<Response<ResponseBody>> generalFileObservable = dbDownloadApi.downloadGeneralData(WebServiceConstants.ACTION_GENERAL_DATA,
            params.getDealerNumber(),params.getUserId(),params.getClientId(), params.getSessionId()).subscribeOn(Schedulers.newThread());;
    generalFileObservable.map(new Function<Response<ResponseBody>, String>() {
        @Override
        public String apply(Response<ResponseBody> responseBody) throws Exception {
            String header = responseBody.headers().get("Content-Disposition");
            String filename = header.replace("attachment; filename=", "");
            String downloadFolderPath = fileManager.makeAndGetDownloadFolderPath();
            String generalZipPath = fileManager.makeFolder(downloadFolderPath, StrConstants.GENERAL_FOLDER_NAME);
            fileManager.writeDownloadedFileToDisk(generalZipPath,filename, responseBody.body().source());
            String generalFilePath = generalZipPath+File.separator+filename;
            unzipUtility.unzip(generalFilePath, fileManager.makeAndGetDownloadFolderPath()+File.separator+ StrConstants.GENERAL_FOLDER_NAME);
            return generalFilePath;
        }
    });

   Observable<String> zipped = Observable.zip(dealerFileObservable, generalFileObservable, new BiFunction<Response<ResponseBody>, Response<ResponseBody>, String>() {
        @Override
        public String apply(Response<ResponseBody> responseBodyResponse, Response<ResponseBody> responseBodyResponse2) throws Exception {
            System.out.println("zipped yess");
            return null;
        }
    }).observeOn(Schedulers.io());

    zipped.subscribe(getObserver());

и функция getObserver()

    private Observer<String> getObserver(){

    return new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {

        }

        @Override
        public void onNext(String value) {

            System.out.println("------------total time-----------");
            System.out.println("result value-->"+value);
        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onComplete() {

        }
    };
}

Когда код выполняется, управление передается функции apply() в zip-операторе, а оператор карты в обоих наблюдаемых не выполняется.

И есть еще вопрос

Я объединяю/архивирую два наблюдаемых объекта, и оператору передается тип Response‹"ResponseBody">. На самом деле мне нужен путь к загруженному файлу (строковый тип), и что для этого мне делать?

**

Обновил решение, как описано @Yaroslav Stavnichiy, и теперь оно работает.

**

    Observable<String> deObservable =  dbDownloadApi.downloaddData(WebServiceConstants.ACTION_DATA,
            params.getNumber(),params.getId(),params.getCtId(), params.getSessionId())
            .flatMap(new Function<Response<ResponseBody>, ObservableSource<String>>() {
                @Override
                public ObservableSource<String> apply(Response<ResponseBody> responseBody) throws Exception {
                    String zipPath = fileManager.processDownloadedFile(StrConstants.FOLDER_NAME,
                            StrConstants.FILE_NAME,responseBody.body().source());
                    return Observable.just(zipPath);
                }
            }).map(new Function<String, String>() {
                @Override
                public String apply(String filePath) throws Exception {
                    String unzipDestinationPath = fileManager.makeAndGetDownloadFolderPath()+
                            File.separator+ StrConstants.FOLDER_NAME;
                    unzipUtility.unzip(filePath, unzipDestinationPath);
                    return unzipDestinationPath;
                }
            }).subscribeOn(Schedulers.newThread());

person Sunny    schedule 11.01.2017    source источник


Ответы (1)


То, что вы эффективно делаете, это:

Observable a = ...;
Observable b = ...;
a.map(...);
b.map(...);
Observable.zip(a, b).subscribe(f);

map() (как и все остальные rx-операторы) не мутирует источник. Он возвращает новую наблюдаемую, которую вы можете использовать в дальнейших вычислениях. В вашем коде вы игнорируете эти возвращаемые объекты. Вы архивируете исходные наблюдаемые, а не сопоставленные, поэтому функции сопоставления не вызываются.

Я думаю, вы хотели сделать следующее:

Observable a = ... .map(...);
Observable b = ... .map(...);
Observable.zip(a, b).subscribe(f);
person Yaroslav Stavnichiy    schedule 11.01.2017
comment
Вы хотите написать определение во время объявления (например, написать карту и весь оператор) - person Sunny; 11.01.2017
comment
@Sunny Я имею в виду не потерять возвращаемое значение вызова map(), потому что это наблюдаемое, которое вы действительно хотите заархивировать. В настоящее время вы архивируете исходные наблюдаемые данные, игнорируя результат map(). - person Yaroslav Stavnichiy; 11.01.2017