Использование IObservable (Rx) в качестве замены INotifyCollectionChanged для MVVM?

Я изучал использование Rx в рамках MVVM. Идея состоит в том, чтобы использовать «живые» запросы LINQ к наборам данных в памяти для проецирования данных в модели представления для привязки.

Ранее это было возможно с использованием INotifyPropertyChanged/INotifyCollectionChanged и библиотеки с открытым исходным кодом под названием CLINQ. Потенциал Rx и IObservable заключается в переходе к гораздо более декларативной ViewModel с использованием классов Subject для распространения измененных событий из исходной модели в представление. Для последнего шага потребуется преобразование IObservable в обычные интерфейсы привязки данных.

Проблема в том, что Rx не поддерживает уведомление об удалении объекта из потока. Пример ниже.
В коде показан POCO, в котором для состояния поля используется класс BehaviorSubject. Далее код создает коллекцию этих сущностей и использует Concat для объединения потоков фильтров. Это означает, что о любых изменениях в POCO сообщается в один поток.

Фильтр для этого потока настроен на фильтрацию для рейтинга == 0. Подписка просто выводит результат в окно отладки, когда происходит даже.

Настройки Rating=0 для любого элемента вызовут событие. Но установка рейтинга обратно на 5 не приведет ни к каким событиям.

В случае CLINQ вывод запроса будет поддерживать INotifyCollectionChanged, так что элементы, добавленные и удаленные из результата запроса, вызовут правильное событие, указывающее, что результат запроса изменился (элемент добавлен или удален).

Единственный способ, которым я могу придумать адрес, - настроить два потока с противоположными (двойными) запросами. Элемент, добавленный в противоположный поток, подразумевает удаление из набора результатов. В противном случае я мог бы просто использовать FromEvent и не делать ни одну из моделей сущностей наблюдаемой, что делает Rx больше просто агрегатором событий. Любые указатели?

using System;
using System.ComponentModel;
using System.Linq;
using System.Collections.Generic;

namespace RxTest
{

    public class TestEntity : Subject<TestEntity>, INotifyPropertyChanged
    {
        public IObservable<string> FileObservable { get; set; }
        public IObservable<int> RatingObservable { get; set; }

        public string File
        {
            get { return FileObservable.First(); }
            set { (FileObservable as IObserver<string>).OnNext(value); }
        }

        public int Rating
        {
            get { return RatingObservable.First(); }
            set { (RatingObservable as IObserver<int>).OnNext(value); }
        }

        public event PropertyChangedEventHandler PropertyChanged;

        public TestEntity()
        {
            this.FileObservable = new BehaviorSubject<string>(string.Empty);
            this.RatingObservable = new BehaviorSubject<int>(0);
            this.FileObservable.Subscribe(f => { OnNotifyPropertyChanged("File"); });
            this.RatingObservable.Subscribe(f => { OnNotifyPropertyChanged("Rating"); });
        }

        private void OnNotifyPropertyChanged(string property)
        {
            if (PropertyChanged != null) PropertyChanged(this, new PropertyChangedEventArgs(property));
            // update the class Observable
            OnNext(this);
        }

    }

    public class TestModel
    {
        private List<TestEntity> collection { get; set; }
        private IDisposable sub;

        public TestModel()
        {
            this.collection = new List<TestEntity>() {
            new TestEntity() { File = "MySong.mp3", Rating = 5 },
            new TestEntity() { File = "Heart.mp3", Rating = 5 },
            new TestEntity() { File = "KarmaPolice.mp3", Rating = 5 }};

            var observableCollection = Observable.Concat<TestEntity>(this.collection.Cast<IObservable<TestEntity>>());
            var filteredCollection = from entity in observableCollection
                                     where entity.Rating==0
                                     select entity;
            this.sub = filteredCollection.Subscribe(entity =>
                {
                    System.Diagnostics.Debug.WriteLine("Added :" + entity.File);
                }
            );
            this.collection[0].Rating = 0;
            this.collection[0].Rating = 5;
        }
    };
}

person Joe Wood    schedule 18.01.2011    source источник
comment
Проблема в том, что Rx, похоже, не поддерживает уведомление об удалении объекта из потока — это потому, что IObservable не представляет постоянную коллекцию, а только асинхронный поток значений.   -  person Richard Szalay    schedule 18.01.2011


Ответы (5)


На самом деле я нашел библиотеку Reactive-UI полезной для этого (доступна в NuGet). Эта библиотека включает специальные темы IObservable для коллекций и средство для создания одной из этих «ReactiveCollections» поверх традиционной коллекции INCC. Благодаря этому у меня есть потоки для новых, удаленных элементов и изменяющихся элементов в коллекции. Затем я использую Zip, чтобы объединить потоки вместе и изменить целевую наблюдаемую коллекцию ViewModel. Это обеспечивает динамическую проекцию на основе запроса к исходной модели.

Следующий код решил проблему (этот код был бы еще проще, но есть некоторые проблемы с версией Reactive-UI для Silverlight, требующие обходных путей). Код запускает события изменения коллекции, просто настраивая значение «Рейтинг» в одном из элементов коллекции:

using System;
using System.ComponentModel;
using System.Linq;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Collections.Specialized;
using ReactiveUI;

namespace RxTest
{

    public class TestEntity :  ReactiveObject, INotifyPropertyChanged, INotifyPropertyChanging
    {
        public string _File;
        public int _Rating = 0;
        public string File
        {
            get { return _File; }
            set { this.RaiseAndSetIfChanged(x => x.File, value); }
        }

        public int Rating
        {
            get { return this._Rating; }
            set { this.RaiseAndSetIfChanged(x => x.Rating, value); }
        }

        public TestEntity()
        {
        }
    }

    public class TestModel
    {
        private IEnumerable<TestEntity> collection { get; set; }
        private IDisposable sub;

        public TestModel()
        {
            this.collection = new ObservableCollection<TestEntity>() {
            new TestEntity() { File = "MySong.mp3", Rating = 5 },
            new TestEntity() { File = "Heart.mp3", Rating = 5 },
            new TestEntity() { File = "KarmaPolice.mp3", Rating = 5 }};

            var filter = new Func<int, bool>( Rating => (Rating == 0));

            var target = new ObservableCollection<TestEntity>();
            target.CollectionChanged += new NotifyCollectionChangedEventHandler(target_CollectionChanged);
            var react = new ReactiveCollection<TestEntity>(this.collection);
            react.ChangeTrackingEnabled = true;

            // update the target projection collection if an item is added
            react.ItemsAdded.Subscribe( v => { if (filter.Invoke(v.Rating)) target.Add(v); } );
            // update the target projection collection if an item is removed (and it was in the target)
            react.ItemsRemoved.Subscribe(v => { if (filter.Invoke(v.Rating) && target.Contains(v)) target.Remove(v); });

            // track items changed in the collection.  Filter only if the property "Rating" changes
            var ratingChangingStream = react.ItemChanging.Where(i => i.PropertyName == "Rating").Select(i => new { Rating = i.Sender.Rating, Entity = i.Sender });
            var ratingChangedStream = react.ItemChanged.Where(i => i.PropertyName == "Rating").Select(i => new { Rating = i.Sender.Rating, Entity = i.Sender });
            // pair the two streams together for before and after the entity has changed.  Make changes to the target
            Observable.Zip(ratingChangingStream,ratingChangedStream, 
                (changingItem, changedItem) => new { ChangingRating=(int)changingItem.Rating, ChangedRating=(int)changedItem.Rating, Entity=changedItem.Entity})
                .Subscribe(v => { 
                    if (filter.Invoke(v.ChangingRating) && (!filter.Invoke(v.ChangedRating))) target.Remove(v.Entity);
                    if ((!filter.Invoke(v.ChangingRating)) && filter.Invoke(v.ChangedRating)) target.Add(v.Entity);
                });

            // should fire CollectionChanged Add in the target view model collection
            this.collection.ElementAt(0).Rating = 0;
            // should fire CollectionChanged Remove in the target view model collection
            this.collection.ElementAt(0).Rating = 5;
        }

        void target_CollectionChanged(object sender, NotifyCollectionChangedEventArgs e)
        {
            System.Diagnostics.Debug.WriteLine(e.Action);
        }
    }
}
person Joe Wood    schedule 20.01.2011
comment
Классное использование RxUI! Я заметил одну вещь: ReactiveCollection не всегда является производной коллекцией, это подкласс ObservableCollection, поэтому вы можете просто использовать его напрямую. - person Ana Betts; 20.01.2011
comment
Спасибо, Пол. Заметил пару ошибок, которые, я думаю, специфичны для Silverlight. Свойство .Value не заполняется из ReactiveObject для ItemChanging/Changed (для него установлено значение NULL). У меня также были проблемы с тем, чтобы ReactiveCollection отслеживал изменения в обычных объектах INPC — использование ReactiveObject исправило это. - person Joe Wood; 20.01.2011
comment
Это по причинам производительности - ItemChanging.Value() даст вам поток значений - person Ana Betts; 25.01.2011

Что не так с использованием ObservableCollection<T>? Rx — это очень простой фреймворк, которым можно злоупотреблять; Я считаю, что если вы обнаружите, что боретесь с основной предпосылкой асинхронного потока, вам, вероятно, не следует использовать Rx для этой конкретной проблемы.

person Richard Szalay    schedule 18.01.2011
comment
Rx идеально подходит для распространения изменений из модели в ViewModel через View. Функции Rx, такие как маршалирование потоков, объединение и т. д., делают его идеальным. - person Joe Wood; 18.01.2011
comment
Основываясь на опыте (я использовал Rx в производственном приложении WPF), я бы рекомендовал рассматривать свойства (INotifyPropertyChanged) ViewModel как пользовательский интерфейс, поскольку их не следует изменять из фонового потока. - person Richard Szalay; 18.01.2011
comment
Функции в Rx, такие как маршалирование потоков, объединение, субъекты и т. д., делают его идеальным. Просто использование Rx для самих событий ограничивает это использование и означает поддержку двух парадигм в вашем коде. Я думаю, что основная проблема здесь в том, что IObservable не подходит для коллекций, а только для событий из коллекции. Я все еще думаю, что общее решение возможно, если поток событий из коллекции «заархивирован» с потоком concat из содержимого коллекции. - person Joe Wood; 18.01.2011
comment
Ричард – В большинстве случаев я соглашусь. Но, к сожалению, это не масштабируется при больших или частых изменениях модели. Диспетчер переполнен, и пользовательский интерфейс перестает отвечать на запросы. В этом привлекательность Rx и некоторых функций потока событий. - person Joe Wood; 18.01.2011

Все реализации INPC, которые я когда-либо видел, лучше всего назвать ярлыками или хаками. Тем не менее, я не могу винить разработчиков, поскольку механизм INPC, который разработчики .NET выбрали для поддержки, ужасен. С учетом вышесказанного я недавно обнаружил, на мой взгляд, лучшую реализацию INPC и лучшее дополнение к любой существующей среде MVVM. В дополнение к предоставлению десятков чрезвычайно полезных функций и расширений, он также поддерживает самый элегантный шаблон INPC, который я когда-либо видел. Он чем-то напоминает фреймворк ReactiveUI, но не задумывался как комплексная платформа MVVM. Чтобы создать ViewModel, поддерживающую INPC, не требуется базового класса или интерфейсов, yes по-прежнему может поддерживать полное уведомление об изменении и двустороннюю привязку, и, что самое приятное, все ваши свойства могут быть автоматическими!

Он НЕ использует такие утилиты, как PostSharp или NotifyPropertyWeaver, а построен на базе Reactive Extensions. Имя этой новой платформы — ReactiveProperty. Я предлагаю посетить сайт проекта (на codeplex) и скачать пакет NuGet. Кроме того, просматривает исходный код, потому что это действительно удовольствие.

Я никоим образом не связан с разработчиком, и проект еще достаточно новый. Я просто в восторге от функций, которые он предлагает.

person Dylan Vester    schedule 08.01.2012
comment
ReactiveProperty выглядит круто, но ни один из включенных примеров не использует коллекцию ViewModels или представление master-details, поэтому неясно, как эта библиотека применима к этому вопросу (и полезна ли она в реальном мире, где нам часто нужен пользовательский интерфейс). для редактирования набора объектов). - person Qwertie; 08.02.2012
comment
Библиотека несколько новая, поэтому отсутствие документации понятно. По общему признанию, когда я разместил свой ответ здесь, я просто собирал стек Silverlight/Xaml. После долгих исследований других методов я вернулся в библиотеку и все еще согласен со своим первоначальным постом. Который большую часть времени критиковал реализацию INPC, но все еще в рамках этого обсуждения. Глядя на исходный код, существует тип под названием ReactiveCollection, который должен напрямую связать и укрепить мои мысли относительно OP. - person Dylan Vester; 18.02.2012

На мой взгляд, это неподходящее использование Rx. Rx Observable — это поток «событий», на который вы можете подписаться. Вы можете реагировать на эти события в своей модели представления, например, добавляя их в ObservableCollection, который привязан к вашему представлению. Однако Observable нельзя использовать для представления фиксированного набора элементов, из которых вы добавляете/удаляете элементы.

person ColinE    schedule 18.01.2011
comment
Нет, но смысл ObservableCollection в том, что он предоставляет несколько субъектов, которые представляют операции, которые вы можете выполнять с коллекцией. Это очень элегантное решение. - person DanH; 24.06.2011

Проблема в том, что вы просматриваете уведомления из списка TestEntity, а не из самих TestEntity. Таким образом, вы видите добавления, но не изменения в любом TestEntity. Чтобы увидеть этот комментарий:

        if (PropertyChanged != null) PropertyChanged(this, new PropertyChangedEventArgs(property));

и вы увидите, что программа работает так же! Ваши уведомления в ваших TestEntity ни к чему не привязаны. Как утверждают другие, использование ObservableCollection добавит вам эту проводку.

person jyoung    schedule 18.01.2011
comment
К вашему сведению, вы всегда должны назначать событие локальной переменной, прежде чем ее вызывать. В противном случае вы можете столкнуться с состоянием гонки, которое может вызвать исключение NullReferenceException. - person Richard Szalay; 18.01.2011
comment
Согласен, просто пытаюсь сделать код простым (хотя поддержка INPC на самом деле не требуется). - person Joe Wood; 18.01.2011