rxjs: объединить результат наблюдаемых при отображении первого с помощью async pipe

Каков наилучший способ в angular с rxjs уже отображать результат первого наблюдаемого и объединять данные, когда другие наблюдаемые завершены?

Пример:

@Component({
    selector: 'app-component',
    template: `
<div *ngFor="let group of groups$ | async">
    <div *ngFor="let thisItem of group.theseItems">
        ...
    </div>
    <div *ngFor="let thatItem of group.thoseItems">
        ...
    </div>
</div>
`
})
export class AppComponent implements OnInit {
    ...
    ngOnInit() {
        this.groups$ = this.http.get<IThisItem[]>('api/theseItems').pipe(
            map(theseItems => {
                return theseItems.groupBy('groupCode');
            })
        );

        // combine these results? This operation can take 5 seconds
        this.groups$$ = this.http.get<IThatItem[]>('api/thoseItems').pipe(
            map(thoseItems => {
                return thoseItems.groupBy('groupCode');
            })
        );
    }
}

Я понимаю, что это можно сделать, подписавшись на оба и объединив результаты. Но можно ли использовать для этого конвейерные операторы, а также канал async?


person devqon    schedule 02.12.2019    source источник
comment
Чтобы прояснить, вы хотите получить одно излучение с этими элементами, а после того, как вы хотите, чтобы одно излучение с этими элементами было объединено с этими элементами?   -  person MoxxiManagarm    schedule 02.12.2019
comment
Да, но теоретически могло быть и наоборот; Это thoseItems испускается до theseItems испускается   -  person devqon    schedule 02.12.2019


Ответы (3)


Я думаю, вы могли бы использовать оператор combineLatest rxjs. Это может означать, что вы также немного измените обработку в своем шаблоне.

Я не мог использовать ваш пример, поскольку я не знаю ваших функций get, но в основном применяется тот же принцип.

Проверьте stackblitz здесь, пример:

export class AppComponent  {

  private firstObservable = of([{name: 'name1'}]).pipe(startWith([]));
  private secondObservable = of([{name: 'name2'}]).pipe(startWith([]));

  combined = combineLatest(this.firstObservable, this.secondObservable).pipe(
        map(([firstResult, secondResult]) => {
          return [].concat(firstResult).concat(secondResult)
        }) 
   );
}

Вывод в формате HTML:

<span *ngFor="let item of combined | async">{{item.name}}<span>
person Athanasios Kataras    schedule 02.12.2019
comment
Разве этот метод не ждет, пока оба наблюдаемых объекта испускаются, прежде чем отобразить их вместе? - person devqon; 02.12.2019
comment
Нет, это оператор forkJoin (который ожидает завершения наблюдаемого). Комбайн будет излучать каждый раз, когда какой-нибудь наблюдаемый выдает значение. Конечно, вы всегда должны проверять, есть ли какое-то значение для объединения. - person Athanasios Kataras; 02.12.2019
comment
@devqon правильно говорит о том, что оба Observable должны излучать перед объединенными излучениями, но это можно решить с помощью простого startsWith. Взято из rxjs docs: Имейте в виду, что combLatest не будет генерировать начальное значение до тех пор, пока каждый наблюдаемый объект не выдаст хотя бы одно значение. - person Daniel B; 02.12.2019
comment
@AthanasiosKataras, похоже, это не работает, см. этот раздвоенный стек. Значения отображаются только после разрешения обеих наблюдаемых - person devqon; 02.12.2019
comment
Это потому, что вы применяете задержку к первому значению. Проверьте обновленный stackblitz с правильно примененной задержкой. - person Athanasios Kataras; 02.12.2019
comment
Задержка применяется ко всему потоку, а не только к первому значению, поэтому он не будет имитировать истинный пример. Я отредактировал ссылку в ответе, если вы хотите по ней перейти. - person Athanasios Kataras; 02.12.2019
comment
Замечательно, кажется, я действительно неправильно использовал delay - person devqon; 02.12.2019

Вы можете использовать слияние и сканирование.

  first$: Observable<Post[]> = this.http.get<Post[]>('https://jsonplaceholder.typicode.com/posts?userId=1');
  second$: Observable<Post[]>  = this.http.get<Post[]>('https://jsonplaceholder.typicode.com/posts?userId=2');
  combinedPosts$: Observable<Post[]> = merge(this.first$, this.second$).pipe(
    scan((acc: Post[], curr: Post[]) => [...acc, ...curr], [])
  )

https://www.learnrxjs.io/operators/combination/merge.html Сделайте одно наблюдаемым из многих.

https://www.learnrxjs.io/operators/transformation/scan.html Сканирование похоже на array.reduce ... вы можете накапливать результаты каждого наблюдаемого излучения.

Рабочий пример: https://stackblitz.com/edit/angular-lrwwxw

Оператор combineLatest менее идеален, поскольку он требует, чтобы каждый наблюдаемый объект излучал, прежде чем объединенный объект наблюдения может излучать: https://www.learnrxjs.io/operators/combination/combinelatest.html

Имейте в виду, чтоcommonLatest не будет генерировать начальное значение, пока каждый наблюдаемый объект не испускает хотя бы одно значение.

person spierala    schedule 02.12.2019
comment
В отличие от других ответов, похоже, это именно то, что я хочу, спасибо :) - person devqon; 02.12.2019

Асинхронный канал - это просто подписчик на наблюдаемый ... Чтобы ответить на ваш вопрос, вы можете использовать любые возможные способы ... Например:

<div *ngFor="let group of groups$ | async as groups">
    <div *ngFor="let thisItem of group.theseItems">
        ...
    </div>
</div>

public groups$: Observable<type> = this.http.get<IThatItem[]>.pipe(
  startWith(INITIAL_VALUE)
);

or

public groups$: Observable<type> = combineLatest(
  of(INITIAL_VALUE),
  this.http.get<IThatItem[]>
)
person Timothy    schedule 02.12.2019