Создание наблюдаемого Rx.NET в концентраторе SignalR

У меня есть концентратор SignalR, который прослушивает запросы клиентов и использует Rx.NET для наблюдения за таблицей базы данных, чтобы отправлять обновления запрашивающему их клиенту, как только они становятся доступными. Но похоже, что экземпляр Observer, созданный в хабе по запросу клиента, уничтожается (GC?) сразу после завершения вызова метода; поэтому я не получаю обновлений.

Это моя текущая реализация Hub:

public class BookHub : Hub {
    private readonly BookService _service = new BookService();

    public void RequestBookUpdate(string author) {
        BookObserver observer = new BookObserver(Context.connectionId);
        IDisposable unsubscriber = _service.RequestBookUpdate(author, observer);
    }
}

BookService возвращает запрос LINQ, преобразованный в Observable:

public IDisposable RequestBookUpdate(string author, BookObserver observer) {
    var query = from b in db.Book where b.Author.Contains(author) select b;
    IObservable<Book> observable = query.ToObservable();
    IDisposable unsubscriber = observable.Subscribe(observer);
    return unsubscriber;
}

BookObserver просто отправляет новые извлеченные элементы конкретному клиенту, запросившему обновления (идентифицированному connectionId):

// omissis

private static readonly IHubContext _context = GlobalHost.ConnectionManager.GetHubContext<BookHub>();
private readonly string _connectionId;

public BookObserver(string connectionId) {
    connectionId = _connectionId:
}

public void OnNext(Book value) {
    _context.Clients.Client(_connectionId).foundNewBook(value);
}

Меня не волнует уничтожение экземпляра BookService, но я хочу, чтобы экземпляр BookObserver оставался в живых, поэтому я могу вызывать unsubscriber.Dispose() только тогда, когда клиент отключается. Это возможно?


person frapontillo    schedule 15.04.2014    source источник
comment
Было бы полезно иметь немного больше контекста. С какой целью вы хотите, чтобы Observer оставался неиспользованным до тех пор, пока соединение не прервется?   -  person cwharris    schedule 15.04.2014
comment
Я хотел бы прослушивать тот же SQL-запрос и получать обновления, как только будут зафиксированы новые данные. Поскольку данные фиксируются в базе данных из отдельного процесса (который я не могу контролировать), единственными решениями для этого будут либо опрос (как базы данных, так и веб-службы), либо наблюдение за IQueryable с помощью Rx.NET и отправка обновления через WebSocket.   -  person frapontillo    schedule 15.04.2014
comment
Создание запроса к IQueryable не гарантирует, что вы получите все будущие элементы. Для этого вам нужен Query Provider, способный предоставить будущие элементы. db, похоже, не поддерживает эту функцию.   -  person cwharris    schedule 15.04.2014
comment
Кроме того, возможно ли удалить эти элементы из базы данных, и собираетесь ли вы обрабатывать этот случай?   -  person cwharris    schedule 15.04.2014
comment
Удаление элемента в настоящее время не является одним из наших требований.   -  person frapontillo    schedule 15.04.2014
comment
Какого типа db? (Какой тип базы данных вы используете)   -  person cwharris    schedule 15.04.2014
comment
db — это DbContext, в настоящее время использующий версию 4 Entity Framework. Экземпляр базы данных — Microsoft SQL Server 2012.   -  person frapontillo    schedule 15.04.2014
comment
Каково время жизни экземпляра db? Если экземпляр db удаляется, его потребители запросов также будут очищены (или завершатся ошибкой).   -  person cwharris    schedule 15.04.2014
comment
Срок жизни db привязан только к BookService, который создает его с помощью MyDatabaseEntities db = new MyDatabaseEntities();. Конечно, MyDatabasEntities автоматически создается Entity Framework и расширяет DbContext.   -  person frapontillo    schedule 15.04.2014
comment
Похоже, у вас сложилось впечатление, что .ToObservable() превратит один запрос к базе данных в нечто, которое будет повторно запрашивать базу данных для изменений. Это не так. Вам нужно создать запрос на основе Observable.Interval и сделать повторные вызовы базы данных самостоятельно, чтобы получить новые записи, а затем вернуть их. Ваша наблюдаемая подпись должна иметь вид IObservable<IEnumerable<Book>>, так как каждый вызов базы данных будет возвращать ноль или более книг.   -  person Enigmativity    schedule 16.04.2014
comment
Вы правы, я неверно истолковал этот пост. То, что я хочу, по-прежнему достижимо с помощью комбинации -framework-to-observe-changes-to-a-table?forum=rx" rel="nofollow noreferrer">OnChangeObservable и SqlCommand. Кто-нибудь когда-нибудь использовал его?   -  person frapontillo    schedule 18.04.2014


Ответы (1)


Observer автоматически удаляется при получении вызова OnComplete. На самом деле это очень хороший шаблон, поскольку он означает, что вам не нужно вручную удалять Subscription следующим образом:

Observable.Range(0, 100)
    .Subscribe(...);

or

Observable.Interval(TimeSpan.FromSeconds(1))
    .Take(10)
    .Subscribe();

Таким образом, чтобы убедиться, что ваш наблюдатель не будет удален, пока он не захочет быть удаленным, вы можете объединить еще один пустой, никогда не завершающийся, наблюдаемый для вашего источника.

IObservable<Book> observable = query.ToObservable()
    .Concat(Observable.Never<Book>());

Однако, в зависимости от того, что вы пытаетесь сделать, может быть лучше обработать это в другом месте, например, на клиенте.

person cwharris    schedule 15.04.2014
comment
Весь смысл в том, чтобы сделать что-то вроде непрерывного запроса (в данном случае наблюдаемого), который уведомляет наблюдателя, как только некоторые данные изменяются. Таким образом, наблюдатель может уведомить и обновить клиента через SignalR. Сила RX (и WebSockets) теряется, если наблюдатель автоматически удаляется, когда возвращаются только обычные запрашиваемые данные; вы бы не согласились? - person frapontillo; 15.04.2014
comment
Если Observable завершается, это означает, что в вашем запросе больше нет доступных элементов. Проблема не в Observable или Observer, а в производителе. В этом случае производителем является база данных, и база данных передает вам только существующие элементы, а не будущие элементы. - person cwharris; 15.04.2014
comment
Вы в этом уверены? Потому что выполнение того же метода из обычной службы Rest и добавление строки в базу данных приводит к вызову OnNext. - person frapontillo; 15.04.2014
comment
Можете ли вы привести сравнительный пример кода Rest в вопросе? - person cwharris; 15.04.2014
comment
Это точно такой же вызов того же BookService, но он сделан из ApiController. Конечно, это было сделано только для тестирования, так как новые результаты не могли быть переданы клиентам :-) - person frapontillo; 15.04.2014
comment
Я обновил исходный вопрос некоторым кодом из файла ApiController. - person frapontillo; 15.04.2014
comment
Для записи причина, по которой наблюдаемое завершается, заключается в том, что EntityFramework не поддерживает непрерывные запросы. Что происходит в вашем коде, так это то, что IQueryable преобразуется в IObservable, но базовый поиск данных не меняется. Он возвращает только существующие записи. Таким образом, Rx делает именно то, для чего он предназначен, и то, что он должен делать. Entity Framework просто не предоставляет ожидаемую функциональность. - person cwharris; 25.05.2018