WebSocket отправляет и получает данные асинхронно (используя веб-клиент NuGet)

Я борюсь с шаблоном наблюдателя с пакетом NuGet websocket-client (https://github.com/Marfusios/websocket-client)

Соединение с сервером WebSocket стабильное и работает.

Каждый запрос имеет идентификатор запроса внутри полезной нагрузки. Клиент отправляет его на сервер, а сервер отвечает идентификатором и реальными данными.

На стороне клиента мне нужно назначить каждый ответ на соответствующий запрос.

Я думал, что могу сделать это так:

public Task<Data> GetDataAsync()
{
    var webSocket = new WebsocketClient(Uri);
    await webSocket.Start();

    var requestId = Guid.NewGuid();

    var tcs = new TaskCompletionSource<Data>();

    var disposable = webSocket
    .MessageReceived
    .Where(message => message.Text.Contains(requestId))
    .Subscribe(message=>
    {
        var data = ParseData(message.Text);
        tcs.SetResult(data);
    });

    return tcs.Task;
}

Но на самом деле он никогда не переходит в метод подписки. Я использую это неправильно?


person BlackMatrix    schedule 13.02.2021    source источник


Ответы (2)


Я думаю

public Task<Data> GetDataAsync(string request)
{
    var requestId = Guid.NewGuid().ToString();

    var responseTask = WebSocket
    .MessageReceived
    .Timeout(TimeSpan.FromSeconds(5))
    .FirstOrDefaultAsync(message => message.Text.Contains(requestId));

    WebSocket.Send(request);

    var responseMessage = await responseTask;

    return ParseMessage(responseMessage);
}

это путь. Я бы даже предпочел SingleOrDefaultAsync вместо FirstOrDefaultAsync, потому что будет только одно сообщение с этим идентификатором запроса. Но это не работает. Он всегда запускается в тайм-аут.

person BlackMatrix    schedule 16.02.2021
comment
Обратите внимание, что вы внесли изменения в способ запуска кода из вашего вопроса. В вопросе не было вызова WebSocket.Send(request);, а вы делаете Guid.NewGuid().ToString();, но только Guid.NewGuid();. Send, безусловно, меняет то, как я представил бы свой ответ. - person Enigmativity; 17.02.2021
comment
Так является ли правильной реализацией подписка на одно специальное сообщение (предикат по идентификатору запроса) и удаление его сразу после анализа информации? Сделать это асинхронно и не влиять на других слушателей (отправить и получить ответ)? - person BlackMatrix; 17.02.2021
comment
Часто нет такой вещи, как правильная реализация. Буду ли я использовать этот? Нет. Мне это не нравится. - person Enigmativity; 18.02.2021
comment
Любые рекомендации, чтобы сделать его лучше? У меня все еще есть проблемы с выбором этого специального сообщения с идентификатором запроса из сотен других сообщений, чтобы получить соответствующий ответ во время работы WebSocket. Лучше всего было бы не воздействовать на других подписчиков или лучше всего, если бы другим подписчикам даже не нужно было больше проверять сообщение, когда его уже выбрал один подписчик. - person BlackMatrix; 18.02.2021

Вы сделали свой код намного сложнее, чем мне нужно. Rx давайте подождем наблюдаемого, чтобы получить последнее произведенное значение. Вы можете написать свой код следующим образом:

public async Task<Data> GetDataAsync() =>
    await
        Observable
            .Using(
                () => new WebsocketClient(Uri),
                ws =>
                    from x in Observable.FromAsync(() => ws.Start())
                    let requestId = Guid.NewGuid()
                    from m in ws.MessageReceived
                    where m.Text.Contains(requestId)
                    select ParseData(m.Text))
            .Take(1)
            .Timeout(TimeSpan.FromSeconds(5.0));
person Enigmativity    schedule 15.02.2021
comment
Мне это не кажется интуитивным. Нужен ли мне блок Observable.Using, если мой WebSocket стабилен и работает и имеет его в качестве ссылки в качестве частного поля? - person BlackMatrix; 15.02.2021
comment
Я надеялся, что var requestId = Guid.NewGuid().ToString(); var responseMessageTask = Socket .MessageReceived .SingleOrDefaultAsync(message => message.Text.Contains(requestId)); Socket.Send(requestMessage); var responseMessage = await responseMessageTask; // Parse responseMessage мог бы сработать. Но это не так. - person BlackMatrix; 15.02.2021
comment
@BlackMatrix - ваш существующий код создавал Websocket, поэтому я включил Observable.Using. Это просто прямой перевод вашего кода. - person Enigmativity; 16.02.2021
comment
Я дал ответ с новой реализацией. - person BlackMatrix; 16.02.2021