Как написать клиент RSocket на JavaScript

Я пытаюсь реализовать сервер RSocket на Java и клиент на JavaScript, но не могу вызвать ни один из методов в своем бэкэнде.

Java-сервер

public final class RawServer {
    public static void main(String[] args) {
        RSocketFactory.receive()
                .acceptor((setup, sendingSocket) -> Mono.just(new DefaultSimpleService()))
                .transport(WebsocketServerTransport.create("localhost", 8801))
                .start()
                .block()
                .onClose()
                .block();
    }

    private static final class DefaultSimpleService extends AbstractRSocket {
        private ObjectMapper jsonMapper = new ObjectMapper();

        @Override
        public Flux<Payload> requestStream(Payload payload) {
            return Mono.just(payload.getDataUtf8())
                    .map(json -> {
                        try {
                            return jsonMapper.readValue(json, Message.class);
                        } catch (IOException e) {
                            e.printStackTrace();
                            return null;
                        }
                    })
                    .doOnNext(msg -> System.out.println("got message " + msg.message))
                    .flatMapMany(msg -> Flux.range(0, 5)
                            .map(count -> msg.message + " #" + count))
                    .map(message -> DefaultPayload.create(message));
        }
    }
}

public class Message {

    public final String message;

    @JsonCreator
    public Message(@JsonProperty("message") String message) {
        this.message = message;
    }
}

Клиент JavaScript

    import { RSocketClient, JsonSerializers } from "rsocket-core";
    import RSocketWebSocketClient from "rsocket-websocket-client";

    const transport = new RSocketWebSocketClient({
        url: "ws://localhost:8801"
      });

      const client = new RSocketClient({
        // send/receive JSON objects instead of strings/buffers
        serializers: JsonSerializers,
        setup: {
          // ms btw sending keepalive to server
          keepAlive: 60000,
          // ms timeout if no keepalive response
          lifetime: 180000,
          // format of `data`
          dataMimeType: "application/json",
          // format of `metadata`
          metadataMimeType: "application/json"
        },
        transport
      });
      client.connect().subscribe({
        onComplete: socket => {
          socket.requestStream({
            data: { message: "hello from javascript!" },
            metadata: null
          });
        },
        onError: error => {
          console.log("got error");
          console.error(error);
        },
        onSubscribe: cancel => {
          /* call cancel() to abort */
          console.log("subscribe!");
          console.log(cancel);
          // cancel.cancel();
        }
      });

Похоже, что соединение WebSocket установлено, но на сервер не отправляется сообщение. Как я могу это сделать?

Я также реализовал клиентскую часть на Java, и она отлично работает. Я нашел пример JavaScript: https://github.com/rsocket/rsocket-js/blob/master/docs/01-client-configuration.md, но я не могу заставить его работать.


person kojot    schedule 18.09.2019    source источник


Ответы (1)


Дополнительные примеры с RSocket можно найти в моем личном блоге http://kojotdev.com/2019/09/rsocket-examples-java-javascript-spring-webflux/

Хорошо, я понял это. Во-первых, нам нужно исправить наш сервер, чтобы он возвращал правильный объект JSON.

@Override
public Flux<Payload> requestStream(Payload payload) {
    log.info("got requestStream in Server");
    log.info(payload.getDataUtf8());
    return Mono.just(payload.getDataUtf8())
            .map(payloadString -> MessageMapper.jsonToMessage(payloadString))
            .flatMapMany(msg -> Flux.range(0, 5)
                    .map(count -> msg.message + " | requestStream from Server #" + count)
                    .map(responseText -> new Message(responseText))
                    .map(responseMessage -> MessageMapper.messageToJson(responseMessage)))
            .map(message -> DefaultPayload.create(message));
}

Затем в нашем клиенте JavaScript нам нужно изменить метод socket.requestStream на это:

socket
  .requestStream({
    data: { message: "request - stream from javascript!" },
    metadata: ""
  })
  .subscribe({
    onComplete: () => console.log("requestStream done"),
    onError: error => {
      console.log("got error with requestStream");
      console.error(error);
    },
    onNext: value => {
      // console.log("got next value in requestStream..");
      console.log(value.data);
    },
    // Nothing happens until `request(n)` is called
    onSubscribe: sub => {
      console.log("subscribe request Stream!");
      sub.request(7);
    }
  });

Все остальное остается как в предыдущем примере. Полезные ссылки:

person kojot    schedule 20.09.2019
comment
Почему, если я выполню этот код дважды, он даст мне понять, что клиент уже подключен? - person Phate; 03.05.2020
comment
Потому что, вероятно, вы вызываете client.connect() дважды. В событии onComplete для client.connect() сохраните socket в глобальной переменной. Затем просто вызовите такие методы, как socket.requestResponse() и т. д. - person kojot; 04.05.2020