Как суммировать внутреннее свойство вложенных наблюдаемых массивов с помощью Angular / RXJS?

У меня возникли проблемы с получением суммы (или любого уменьшения) внутреннего числового свойства Observable в другом Observable.

У меня есть наблюдаемый массив объектов Account (Observable<AppAccount[]>).

export interface AppAccount {
    _id?: string;
    name: string;
}

И наблюдаемый массив объектов Balance, каждый из которых имеет accountId. с учетной записью может быть связано много балансов (отсортировано / отфильтровано по дате, но эта часть удалена для краткости)

export interface AccountBalance {
    _id?: string;
    accountId: string;
    amount: number;
}

У меня есть вспомогательный метод, который возвращает только последнюю сумму объекта Balance для данной учетной записи.

getLastAmount(account: AppAccount): Observable<number> {
    return this.balanceService.balances$.pipe(
      map(balances => {
        let last = balances.filter(balance => {
          return balance.accountId === account._id;
        }).sort().pop();
        //console.log(last)
        return last ? last.amount : 0;
      }),
      tap(amount => console.log(`getLastAmount() => ${amount}`)),
    );
  }

Теперь я пытаюсь написать метод, который будет перебирать учетные записи, вызывать getLastAmount () для каждого, а затем суммировать их все и возвращать Observable. Вот что мне удалось до сих пор:

getTotalBalance(accounts$: Observable<AppAccount[]>): Observable<number> {
    return accounts$.pipe(
      map(accounts => from(accounts)),
      mergeAll(),
      mergeMap(account => this.getLastAmount(account)),
      reduce((sum, current) => {
        console.log(`${sum} + ${current}`);
        return sum + current;
      }, 0)
    );
  }

Но это, кажется, никогда не вернется и застревает в бесконечном цикле ??

Имея только одну учетную запись и один связанный баланс, с балансом, имеющим «сумму» «10», я получаю это из моего журнала консоли: «0 + 10» снова и снова, и сетевой журнал также подтверждает, что он вызывает getBalances ( ) непрерывно.

Я на правильном пути? Есть ли способ лучше? Почему этот канал RXJS застревает в петле?

ИЗМЕНИТЬ: я внес некоторые изменения на основе предложений Пиччи:

getTotalBalance(accounts$: Observable<AppAccount[]>): Observable<number> {
    return accounts$.pipe(
      map(accounts => accounts.map(account => this.getLastAmount(account))),
      concatMap(balances$ => { console.log('balances$', balances$); return forkJoin(balances$); }),
      tap(balances => console.log('balances', balances)),
      map(balances => balances.reduce(
        (amountSum, amount) => {
          console.log(`${amountSum} + ${amount}`)
          amountSum = amountSum + amount;
          return amountSum
        }, 0))
    );
  }

Но это все равно не возвращается, или труба не достраивает? Я сделал stackblitz здесь: https://stackblitz.com/edit/angular-rxjs-nested-obsv Если вы проверите вывод консоли, он, похоже, не продвинется дальше вызова forkJoin ...


person Jonathon Hoaglin    schedule 21.07.2020    source источник
comment
Не уверен насчет бесконечного цикла, но я думаю, что он не возвращается, потому что вы используете reduce, который отправит уменьшенное значение, когда источник (accounts$ в данном случае) завершится. Если вы хотите получать значения на каждой итерации уменьшения, вы можете использовать scan.   -  person Andrei Gătej    schedule 21.07.2020


Ответы (2)


Если я правильно понимаю, вы можете поступить так

// somehow you start with the observable which returns the array of accounts
const accounts$: Observable<AppAccount[]> = getAccounts$()
// you also set the date you are interested in
const myDate: Moment = getDate()

// now you build the Observable<number> which will emit the sum of the last balance amounts
const amountSum$: Observable<number> = accounts$.pipe(
  // you transform an array of accounts in an array of Observable<number> representing the last Balance amount
  map((accounts: Account[]) => {
    // use the getLastAmount function you have coded
    return accounts.map(account => getLastAmount(account, myDate))
  }),
  // now we trigger the execution of the Observable in parallel using concatMap, which basically mean wait for the source Observable to complete
  // and forkJoin which actually executes the Observables in parallel
  concatMap(accounts$ => forkJoin(accounts$)),
  // now that we have an array of balances, we reduce them to the sum using the Array reduce method
  map(balances => balances.reduce(
    (amountSum, amount) => {
      amountSum = amountSum + amount;
      return amountSum
    }, 0)
  )
)

// eventually you subscribe to the amountSum$ Observable to get the result
amountSum$.subscribe({
  next: amountSum => console.log(`The sum of the last balances is: ${amountSum}`),
  error: console.err,
  complete: () => console.log("I am done")
})

Могут быть и другие комбинации, которые приводят к тому же результату, но, похоже, это работает, и его можно проверить в этом stackblitz.

Если вас интересуют некоторые частые шаблоны RxJS с HTTP-вызовами, вы можете прочитать этот блог.

person Picci    schedule 22.07.2020
comment
строка concatMap(accounts$ => forkJoin(accounts$)), - полезный трюк, но канал, похоже, не проходит мимо этой функции rxjs. Я создал stackblitz здесь: stackblitz.com/edit/angular-rxjs-nested- obsv - person Jonathon Hoaglin; 22.07.2020
comment
Вы моделируете http с помощью BehaviorSubject, а не с помощью of функции RxJS. Это вызывает проблему. Вот объяснение. forkJoin излучает, когда все входные Observables завершены. Observable, возвращаемый http-клиентом, выдает одно единственное значение, а затем завершает, так же как и функция of. Subject никогда не завершается. Итак, если вы передадите Subject в forkJoin, результат никогда не будет выдан, и канал не будет продолжен. Замените get balances$() { return this._balances$.asObservable(); } на get balances$() { return of(this.balances); } в AccountBalanceService, и все заработает. - person Picci; 23.07.2020
comment
Ага! проблема в том, что BehaviourSubjects никогда не завершались. Я не хотел менять структуру своего сервиса, но из этого намёка я обнаружил, что просто использование combineLatest вместо forkJoin отлично работает! - person Jonathon Hoaglin; 24.07.2020

Ну, во-первых, я не думаю, что вам следует использовать такие наблюдаемые.

Если вам нужен только totalBalance, вы можете использовать что-то вроде этого (:

  private appAcount$ = from<AppAccount[]>([
    { _id: '1', name: 'user-1' },
    { _id: '2', name: 'user-2' },
    { _id: '3', name: 'user-3' },
  ]);

  // this would be your http call
  public getBalances(accountId: string): Observable<AccountBalance[]> {
    const ab = [
      { _id: '1', accountId: '1', amount: 100 },
      { _id: '2', accountId: '2', amount: 200 },
      { _id: '3', accountId: '2', amount: 300 },
      { _id: '4', accountId: '3', amount: 400 },
      { _id: '5', accountId: '3', amount: 500 },
      { _id: '6', accountId: '3', amount: 600 },
    ];

    return of(ab.filter(x => x.accountId === accountId));
  }

  lastAmounts$: Observable<AccountBalance[]> = this.appAcount$
    .pipe(
      switchMap(appAccount => 
        this.getBalances(appAccount._id)
          .pipe(
            // replace this with your date filter
            map(balances => [balances[balances.length - 1]]) 
          )
      ),
      scan((acc, c) => [ ...acc, ...c ])
    );

  totalBalance$ = this.lastAmounts$
    .pipe(
      map(x => x.reduce((p, c) => p += c.amount, 0))
    )

Если вам нужен только общий баланс, вы можете просто подписаться на наблюдаемый totalBalance $.

Однако позвольте мне сказать, что я бы не рекомендовал выполнять HTTP-вызов для каждого appAccount, если вы можете выполнить пакетную выборку всех AccountBalance из всех ваших appAccounts - таким образом вы можете просто использовать combineLatest как для appAccounts$, так и для balances$.

person arvil    schedule 21.07.2020
comment
В итоге я последовал совету пакетной выборки балансов, и хитроумное использование combLatest помогло мне добиться цели. Спасибо! - person Jonathon Hoaglin; 24.07.2020