Rx: как буферизовать элементы до тех пор, пока не будет выполнено условие

У меня есть поток элементов, и я хочу их буферизовать, пока один из элементов не будет соответствовать условию. После выполнения условия буферизованные элементы должны быть доставлены подписчику.

Если наблюдаемый источник завершается, но условие еще не выполнено, я хочу, чтобы возникла ошибка.

Можно ли этого добиться с помощью операторов по умолчанию в RxJava или другом языке семейства Rx?


person Kirill Rakhman    schedule 10.02.2016    source источник
comment
Вы можете ввести тему, чтобы узнать поведение, это приемлемо?   -  person Whymarrh    schedule 11.02.2016
comment
Любое работающее решение приемлемо, однако я не очень хорошо разбираюсь в предметах.   -  person Kirill Rakhman    schedule 11.02.2016


Ответы (1)


Я написал приведенный ниже пример, используя RxJS, чтобы мы могли запустить этот пример прямо здесь. Концепции идентичны RxJava.

A BufferedSubject:

class BufferedSubject extends Rx.Subject {
    constructor(predicate) {
        super();
        this.predicate = predicate;
        this.buffer = [];
        this.unbuffered = false;
    }

    onNext(e) {
        if (this.unbuffered) {
            super.onNext(e);
            return;
        }

        if (this.predicate(e)) {
            while (this.buffer.length) {
                super.onNext(this.buffer.shift());
            }

            super.onNext(e);
            this.unbuffered = true;
            return;
        }

        this.buffer.push(e);
    }
}

Вы можете использовать BufferedSubject, как обычный объект, в качестве посредника между некоторым наблюдаемым источником и желаемым буферизованным наблюдаемым - вы подписываетесь на объект, а субъект подписывается на наблюдаемый источник, при этом субъект поддерживает буфер элементов до тех пор, пока условие выполняется. По примеру:

const timer = Rx.Observable.interval(5000);
const subject = new BufferedSubject((item) => item > 5);
subject.subscribe(
    x => $('.list').append(`<li>${x} seconds</li>`)
);
timer.subscribe(subject);

В качестве исполняемого фрагмента (извините, скомпилированный из ES6):

'use strict';
function _classCallCheck(instance, Constructor) {
    if (!(instance instanceof Constructor)) {
        throw new TypeError('Cannot call a class as a function');
    }
}
function _possibleConstructorReturn(self, call) {
    if (!self) {
        throw new ReferenceError('this hasn\'t been initialised - super() hasn\'t been called');
    }
    return call && (typeof call === 'object' || typeof call === 'function') ? call : self;
}
function _inherits(subClass, superClass) {
    if (typeof superClass !== 'function' && superClass !== null) {
        throw new TypeError('Super expression must either be null or a function, not ' + typeof superClass);
    }
    subClass.prototype = Object.create(superClass && superClass.prototype, {
        constructor: {
            value: subClass,
            enumerable: false,
            writable: true,
            configurable: true
        }
    });
    if (superClass)
        Object.setPrototypeOf ? Object.setPrototypeOf(subClass, superClass) : subClass.__proto__ = superClass;
}
var BufferedSubject = function (_Rx$Subject) {
    _inherits(BufferedSubject, _Rx$Subject);
    function BufferedSubject(predicate) {
        _classCallCheck(this, BufferedSubject);
        var _this = _possibleConstructorReturn(this, _Rx$Subject.call(this));
        _this.predicate = predicate;
        _this.buffer = [];
        _this.unbuffered = false;
        return _this;
    }
    BufferedSubject.prototype.onNext = function onNext(e) {
        if (this.unbuffered) {
            _Rx$Subject.prototype.onNext.call(this, e);
            return;
        }
        if (this.predicate(e)) {
            while (this.buffer.length) {
                _Rx$Subject.prototype.onNext.call(this, this.buffer.shift());
            }
            _Rx$Subject.prototype.onNext.call(this, e);
            this.unbuffered = true;
            return;
        }
        this.buffer.push(e);
    };
    return BufferedSubject;
}(Rx.Subject);
var timer = Rx.Observable.interval(1000).skip(1).take(50).publish();
var subject = new BufferedSubject(function (item) {
    return item > 5;
});
subject.subscribe(function (x) {
    return $('.list').append('<li>' + x + ' seconds</li>');
}, function (e) {
    return $('.list').append('<li>Errror ' + e + '</li>');
}, function (_) {
    return $('.list').append('<li>Sequence complete</li>');
});
timer.subscribe(subject);
timer.subscribe(function (x) {
    return $('.timer').text(x + ' seconds elapsed');
});
timer.connect();
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/4.0.7/rx.all.min.js"></script>
<script src="https://ajax.googleapis.com/ajax/libs/jquery/2.1.3/jquery.min.js"></script>

<p>Output (<span class='timer'>-</span>):</p>

<ul class="list">
</ul>

(Также доступно на CodePen.)

Примечание: я бы сказал, что эта реализация некорректно поддерживает противодавление.

person Whymarrh    schedule 11.02.2016
comment
Спасибо за ваш ответ. В вашем решении каждый элемент после элемента, выполняющего предикат, также должен выполнять предикат. То, что я искал, было единственным элементом, выполняющим предикат, после чего ничего не должно быть буферизировано. Однако адаптировать ваш код будет не так уж сложно. - person Kirill Rakhman; 12.02.2016
comment
Хороший улов, я это исправил! - person Whymarrh; 12.02.2016