RxJS Observable forkJoin не выполняется параллельно

У меня есть следующий код, и, хоть убей, я не могу понять, почему запросы не выполняются одновременно. Я все еще новичок в RxJS и наблюдаемых, поэтому я буду очень признателен за любую помощь в улучшении кода ниже. По сути, я вызываю REST API на бэкэнде, чтобы получить данные. Затем для каждого элемента в этом массиве данных я делаю еще один запрос к другой конечной точке (следовательно, использую оператор forkJoin). Все запросы отправляются сразу, но кажется, что они выполняются один за другим, а не одновременно.

this.sites$.subscribe(data => {

    // data.forEach(element => {
    //     this.siteCaptureMap[element.id] = new CaptureData();
            
    //     this.sitesService.getCaptureData(element.nameOrNumber, element.owner.name).subscribe(data => {
    //         this.siteCaptureMap[element.id].count = data.length;
    //     });
    // });

    var obs: Observable<any>[] = [];
    for (var _i = 0; _i < data.length; _i++) {
        this.siteCaptureMap[data[_i].id] = new CaptureData();
        this.siteCaptureMap[data[_i].id].id = _i;
        obs.push(this.sitesService.getCaptureData(data[_i].nameOrNumber, data[_i].owner.name));
    }

    forkJoin(obs).subscribe(results => {
        for (var _i = 0; _i < results.length; _i++) {
            this.siteCaptureMap[data[_i].id].count = results[_i].length;
        }
    });


    this.dataSource.data = data;
    this.dataSource.filteredData = data;
});

Опять же, любая помощь будет принята с благодарностью. Если мне нужно что-то уточнить или предоставить дополнительные фрагменты кода, дайте мне знать! Спасибо!


person mitch.lotierzo    schedule 14.12.2020    source источник


Ответы (2)


Наличие вложенных subscribes может привести к утечкам памяти и может быть трудным для отмены подписки, поэтому всякий раз, когда у вас есть вложенные subscribe, подумайте о switchMap, concatMap и mergeMap.

Все они имеют небольшие вариации, но они переключаются на новую наблюдаемую из предыдущей наблюдаемой. В этом сообщении объясняются различия.

Для вас я бы попробовал сделать:

import { switchMap } from 'rxjs/operators';

...
this.sites$.pipe(
  switchMap(data => {
    let obs: Observable<any>[] = [];
    for (let _i = 0; _i < data.length; _i++) {
        this.siteCaptureMap[data[_i].id] = new CaptureData();
        this.siteCaptureMap[data[_i].id].id = _i;
        obs.push(this.sitesService.getCaptureData(data[_i].nameOrNumber, data[_i].owner.name));
    }

    return forkJoin(obs);
  }),
).subscribe(results => {
        for (let_i = 0; _i < results.length; _i++) {
            this.siteCaptureMap[data[_i].id].count = results[_i].length;
        }

        this.dataSource.data = data;
        this.dataSource.filteredData = data;
    });

Примечание: используйте let и const вместо var.

Кроме того, если вы видите, что все запросы отправляются одновременно, это все, на что вы можете надеяться. Если он возвращается последовательно, это может быть либо браузер, либо сервер, вызывающий это.

person AliF50    schedule 14.12.2020

Сначала я бы изменил код, сделав его более идиоматическим, удалив вложенные подписки и используя pipe, сделав его более функциональным. Встроенные комментарии пытаются объяснить изменения

this.sites$.pipe(
  // this is the rxjs map operator that transform the data received from
  // the rest api into something different
  map(data => {
    // I create the obs array using the map method of JS arrays - this is 
    // NOT the rxjs map operator
    obs = data.map((element, _i) => {
       this.siteCaptureMap[element.id] = new CaptureData();
       this.siteCaptureMap[element.id].id = _i;
       return this.sitesService.getCaptureData(element.nameOrNumber, element.owner.name)
    });
    // return both the array of observables and the data
    return [data, obs];
  }),
  // concatMap makes sure that you wait for the completion of the rest api
  // call and the emission of the data fetched before you execute another
  // async operation via forkJoin
  concatMap(([data, obs]) => forkJoin(obs).pipe(
    // this is the rxjs map operator used to return not only the results
    // received as result of forkJoin, but also the data received with the
    // first rest call - I use this operator to avoid having to define 
    // a variable where to store 'data' and keep the logic stateless 
    map(results => ([data, results]))
  ))
).subscribe(([data, results]) => {
  // here I use forEach to loop through the results and perform
  // your logic
  results.forEach((res, _i) => this.siteCaptureMap[data[_i].id].count = res.length)
})

Я думаю, что эта форма больше соответствует rxjs и его функциональному духу и должна быть эквивалентна вашей исходной форме.

В то же время мне было бы интересно узнать, почему вы говорите, что ваш код выполняет вызовы внутри forkJoin по одному за раз.

Кстати, если вам интересно посмотреть на общие шаблоны использования rxjs с http, вы можете прочтите эту статью.

person Picci    schedule 14.12.2020