Клиент RSocket Js не получает элементы, созданные с сервера Spring Boot RSocket

Пример проекта доступен на Github: https://github.com/codependent/rsocket-rating-service < / а>

Отображение сообщений сервера Spring Boot RSocket ожидает запрос requestResponse, возвращающий простой POJO:

    @MessageMapping("request-rating")
    fun getRatingWebSocket(ratingRequest: RatingRequest): Mono<Rating> {
        return Mono.just(Rating(ratingRequest.songId, (0..10).random())).log()
                .doOnNext {
                    logger.info("Next {}", it)
                }
                .doOnCancel {
                    logger.info("Cancel")
                }
                .doOnSuccess {
                    logger.info("Success {}", it)
                }
                .doOnError { throwable ->
                    logger.error("Error {}", throwable)
                }
                .doOnTerminate { logger.info("Terminate") }
    }

На стороне клиента у меня есть следующий JS-код для подключения к серверу RSocket и запроса значения:

const {
    RSocketClient,
    JsonSerializer,
    IdentitySerializer,
} = require('rsocket-core');
const RSocketWebSocketClient = require('rsocket-websocket-client').default;
const route = 'request-rating';
let client = undefined;
let rSocket = undefined;

function main() {
    if (client !== undefined) {
        client.close();
    }
    client = new RSocketClient({
        serializers: {
            data: JsonSerializer,
            metadata: IdentitySerializer
        },
        setup: {
            // ms btw sending keepalive to server
            keepAlive: 60000,
            // ms timeout if no keepalive response
            lifetime: 180000,
            // format of `data`
            dataMimeType: 'application/json',
            // format of `metadata`
            metadataMimeType: 'message/x.rsocket.routing.v0',
        },
        transport: new RSocketWebSocketClient({
            url: 'ws://localhost:8080/rating-ws'
        }),
    });

    // Open the connection
    client.connect().subscribe({
        onComplete: socket => {
            // socket provides the rsocket interactions fire/forget, request/response,
            // request/stream, etc as well as methods to close the socket.
            rSocket = socket;
        },
        onError: error => {
            console.log("Connection has been refused due to ", error);
        },
        onSubscribe: cancel => {
            /* call cancel() to abort */
        }
    });
    document.getElementById('sendButton').addEventListener('click', requestRating);
}

function requestRating() {
    rSocket.requestResponse({
        data: {
            'songId': document.getElementById("songId").value
        },
        metadata: String.fromCharCode(route.length) + route
    }).subscribe({
        onComplete: () => {
            console.log('Complete')
        },
        onError: error => {
            console.log("Connection has been closed due to " + error);
        },
        onNext: payload => {
            console.log(payload.data);
        },
        onSubscribe: subscription => {
            //subscription.request(1)
            console.log("Subscribed")
        }
    });
}

document.addEventListener('DOMContentLoaded', main);

index.html

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Rating Service</title>
    <link href="/webjars/bootstrap/css/bootstrap.min.css" rel="stylesheet">
    <script src="/webjars/jquery/jquery.min.js"></script>
    <script src="/webjars/stomp-websocket/stomp.min.js"></script>
    <script src="bundle.js"></script>
</head>
<body>
<div>Request your rating</div>
<div>
    <label> ClientId:
        <input id="songId" type="text" name="songId"/>
    </label>
</div>
<div><input id="sendButton" type="button" name="send" value="Send"/></div>
</body>
</html>

После доступа к http://localhost:8080/index.html введите текст и нажмите кнопку отправки.

Запрос попадает на сервер, который регистрирует правильную генерацию значения onNext:

[ctor-http-nio-6] reactor.Mono.Just.1                      : | onSubscribe([Synchronous Fuseable] Operators.ScalarSubscription)
[ctor-http-nio-6] reactor.Mono.Just.1                      : | request(1)
[ctor-http-nio-6] reactor.Mono.Just.1                      : | onNext(Rating(songId=asdfas, value=0))
[ctor-http-nio-6] c.c.r.r.c.RatingServiceRestController    : Next Rating(songId=asdfas, value=0)
[ctor-http-nio-6] c.c.r.r.c.RatingServiceRestController    : Success Rating(songId=asdfas, value=0)
[ctor-http-nio-6] c.c.r.r.c.RatingServiceRestController    : Terminate
[ctor-http-nio-6] reactor.Mono.Just.1                      : | onComplete()

Однако на стороне клиента

        onNext: payload => {
            console.log(payload.data);
        },

никогда не вызывается, в журналах браузера отображаются только:

Subscribed
Complete

Почему он не получает сгенерированное значение с сервера?


person codependent    schedule 31.05.2020    source источник


Ответы (1)


В RSocket JS, в отличие от Reactive Streams, сигнал onComplete получает данные в режиме взаимодействия requestResponse.

Похоже, это не соответствует спецификации протокола (https://rsocket.io/about/protocol ):

Both (C)omplete and (N)ext set meaning PAYLOAD contains data and signals stream completion.
    For example: An Observable stream receiving onNext(payload) followed by onComplete().
Just (C)omplete set meaning PAYLOAD contains no data and only signals stream completion.
    For example: An Observable stream receiving onComplete().
Just (N)ext set meaning PAYLOAD contains data stream is NOT completed.
    For example: An Observable stream receiving onNext(payload).

Чтобы обойти это, мне пришлось изменить функцию обратного вызова на это:

        onComplete: completeData => {
            console.log('Complete '+ completeData.data)
        },
person codependent    schedule 31.05.2020