Как создать подписку rxJS с динамическим массивом?

  • Есть функция, которая вызывается из любого места в приложении, которому передается объект.
  • Что нам нужно сделать, так это вызвать серверный API с этим элементом как Post
    Body.
  • Однако я бы хотел сопоставить эти обновления в течение определенного периода времени, а затем отправить их вместе, чтобы
    уменьшить количество обращений к бэкэнду.

Итак, я думаю о добавлении этих элементов в массив. Создайте наблюдаемый объект с этим массивом и выделите bufferTime для буферизации значений в течение определенного периода времени перед их отправкой.

Я создал Stackblitz - https://stackblitz.com/edit/typescript-fzezep?file=index.ts&devtoolsheight=100, и если вы видите результат - выводятся первые 20 значений, те, которые добавляются перед подпиской. Но последние 20 значений никогда не выдаются.

Итак, подписка завершена, но тогда как создать подписку с динамическим массивом?


person Roshan Khandelwal    schedule 23.10.2020    source источник


Ответы (1)


Это довольно распространенная ошибка rxjs, которую вы там сделали.

Итак, сначала давайте пойдем по вашему примеру построчно

// You create an empty array
const dataArray = [];

// You create an observable that will point to the created dataArray
const bufferBy = from(dataArray);
let myBufferedInterval;

// You fill the dataArray with 20 elements
for (var i = 0; i <= 20; i++) {
  dataArray.push(i);
}
// You create an observable that has will have for it's source from(dataArray)
myBufferedInterval = bufferBy.pipe(bufferCount(5));
// You create a SUBSCRIPTION on myBufferInterval which will go upstream to the observable that points to the dataArray with 20 element and make everything alive
const buffered = myBufferedInterval.subscribe(val => console.log(val));

// Here you add more element to the dataArray, but they will not result with new values in your subscription as your subscription points to the instance of dataArray with 20 elements (so each subscribe makes a individual subscription pointing to it's own instance of the dataArray)
for (var i = 21; i < 40; i++) {
  dataArray.push(i);
}

console.log(dataArray)

// If you create new subscriton here it will have as reference an array that holds 40 elements. so you will be able to see the rest of the numbers (20+)

myBufferedInterval.subscribe(val => console.log(val));

rxjs способ обработки этого будет примерно таким, как в следующем фрагменте. Разница в том, что вместо использования array im используется объект rxjs Subject, который поддерживает метод next.

Итак, то, что я делаю, - это создать subscriton для этого Subject, направить его через buffer, чтобы получить партии из 5 элементов, и я начинаю ждать заполнения буфера.

После этого всякий раз, когда я использую метод next, Subject будет выдавать новое значение своим подписчикам, и в нашем случае он начнет заполнять наш буфер.

let { from, interval, Subject } = rxjs;
let { bufferCount, bufferTime } = rxjs.operators;

const requestHolder = new Subject();
requestHolder.pipe(bufferCount(5)).subscribe(val => console.log(val));

for (var i = 0; i <= 10; i++) {
  requestHolder.next(i);
}

for (var i = 11; i < 20; i++) {
  requestHolder.next(i);
}
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.6.3/rxjs.umd.min.js"></script>

person Християн Христов    schedule 23.10.2020