Ошибка компиляции RxJava в следующем случае

Я хотел бы попробовать очень простой код RxJava, например следующий

 int[] test = {1,2,3,4};
 Observable<Integer> findAverage = Observable.fromArray(test);
 averageInteger(findAverage).subscribe(System.out::println); 

Сначала я встретил ошибку компиляции (несовместимые границы) для Observeable.fromArray (тест),

Во-вторых, кажется, что severageInteger больше не может быть найден. Я использую версию RxJava 2.0.8.


person Adam Lee    schedule 22.04.2017    source источник
comment
Вам нужно как минимум Integer[] test = {1, 2, 3, 4};, потому что примитивные массивы не поддерживаются самой RxJava.   -  person akarnokd    schedule 22.04.2017


Ответы (1)


Это из-за неправильного типа findAverage:

Так должно быть:

Observable<int[]> observable = Observable.fromArray(array);

Полный пример со следующим пакетом Gradle: 'com.github.akarnokd:rxjava2-extensions:0.16.4'

List<Integer> integers = Arrays.asList(1, 2, 3, 4);

Observable<Integer> integerObservable = Observable.fromIterable(integers);

Observable<Double> doubleObservable = MathObservable.averageDouble(integerObservable);

doubleObservable.subscribe(System.out::println);

Реализовать как оператор:

@Test
public void averageDoubleOperator() throws Exception {
    List<Integer> integers = Arrays.asList(1, 2, 3, 4);

    Observable<Integer> integerObservable = Observable.fromIterable(integers);

    integerObservable.lift(new AvgOperator())
            .test()
            .assertResult(2.5);
}

@Test
public void averageDoubleOperator_empty() throws Exception {
    List<Integer> integers = Collections.emptyList();

    Observable<Integer> integerObservable = Observable.fromIterable(integers);

    integerObservable.lift(new AvgOperator())
            .test()
            .assertTerminated()
            .assertError(IllegalStateException.class);
}

class Avg {
    long count;
    int sum;

    Avg(long count, int sum) {
        this.count = count;
        this.sum = sum;
    }
}

class AvgOperator implements ObservableOperator<Double, Integer> {
    @Override
    public Observer<? super Integer> apply(Observer<? super Double> observer) throws Exception {
        return new Operation(observer);
    }

    class Operation implements Observer<Integer> {
        private final Observer<? super Double> observer;
        private Subscription s;
        private long sum;
        private int count;

        Operation(Observer<? super Double> observer) {
            this.observer = observer;
        }

        @Override
        public void onSubscribe(Disposable d) {
            observer.onSubscribe(d);
        }

        @Override
        public void onNext(Integer integer) {
            ++count;
            sum = sum + integer;
        }

        @Override
        public void onError(Throwable t) {
            this.observer.onError(t);
        }

        @Override
        public void onComplete() {
            if (count == 0) {
                this.onError(new IllegalStateException("Average is not defined for 0 values."));
                return;
            }
            this.observer.onNext(sum / (double) count);
            this.observer.onComplete();
        }
    }
}
person Hans Wurst    schedule 22.04.2017
comment
Похоже, MathObservable взят из rxjava-math, который не поддерживается rxjava2. - person Adam Lee; 23.04.2017
comment
Как указано, MathObservable взят из расширений rxjava2, потому что он не поддерживается в собственном rxjava2. Вы можете скопировать реализацию MathObservable или написать свою собственную, используя оператор сокращения. - person Hans Wurst; 23.04.2017