spring webflux: чисто функциональный способ подключения адаптера websocket к серверу response-netty

Я не могу понять, как подключить WebSocketHandlerAdapter к сетевому серверу реактора.

Требования: я хочу запустить netty-сервер реактора и присоединить конечные точки http (REST) ​​и конечные точки websocket к одному и тому же серверу. Я просмотрел документацию и несколько примеров демонстрационного приложения, упомянутых в документации. Они показывают, как прикрепить HttpHandlerAdapter к HttpServer с помощью функции newHandler (). Но когда дело доходит до веб-сокетов, они снова переключаются на использование примеров загрузки Spring и аннотаций. Я не могу найти, как подключить веб-сокеты с помощью функциональных конечных точек.

Пожалуйста, укажите мне правильное направление, как это реализовать. 1. как мне подключить адаптер websocket к серверу netty? 2. Что использовать: HttpServer или TcpServer?

Примечание: 1. Я не использую пружинный ботинок. 2. Я не использую аннотации. 3. Пытаться достичь этого только с помощью функциональных конечных точек webflux.

Образец кода:

public HandlerMapping webSocketMapping() 
{
  Map<String, WebSocketHandler> map = new HashMap<>();
  map.put("/echo", new EchoTestingWebSocketHandler());
  SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
  mapping.setUrlMap(map);
  mapping.setOrder(-1);
  return mapping;
}
public WebSocketHandlerAdapter wsAdapter() 
{
  HandshakeWebSocketService wsService = new HandshakeWebSocketService(new ReactorNettyRequestUpgradeStrategy());
  return new WebSocketHandlerAdapter(wsService);
}

  protected void startServer(String host, int port) 
  {
    HttpServer server = HttpServer.create(host, port);
    server.newHandler(wsAdapter()).block();    //how do I attach the websocket adapter to the netty server
  }

person chirpi    schedule 29.12.2017    source источник


Ответы (2)


К сожалению, нет простого способа сделать это, не запустив весь SpringBootApplication. В противном случае вам нужно будет самостоятельно написать всю иерархию обработчиков Spring WebFlux. Рассмотрите возможность составления функциональной маршрутизации с помощью SpringBootApplication:

    @SpringBootApplication
    public class WebSocketApplication {

        public static void main(String[] args) {
            SpringApplication.run(WebSocketApplication.class, args);
        }


        @Bean
        public RouterFunction<ServerResponse> routing() {
            return route(
                    POST("/api/orders"),
                    r -> ok().build()
            );
        }

        @Bean
        public HandlerMapping wsHandlerMapping() {
            HashMap<String, WebSocketHandler> map = new HashMap<>();

            map.put("/ws", new WebSocketHandler() {
                @Override
                public Mono<Void> handle(WebSocketSession session) {
                    return session.send(
                            session.receive()
                                  .map(WebSocketMessage::getPayloadAsText)
                                  .map(tMessage -> "Response From Server: " + tMessage)
                                  .map(session::textMessage)
                    );
                }
            });

            SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
            mapping.setUrlMap(map);
            mapping.setOrder(-1);
            return mapping;
        }

        @Bean
        HandlerAdapter wsHandlerAdapter() {
            return new WebSocketHandlerAdapter();
        }
    }

В случае, если SpringBoot infra не так

вместо этого попробуйте рассмотреть прямое взаимодействие с ReactorNetty. Reactor Netty Обеспечивает довольно хорошую абстракцию вокруг родной Netty, и вы можете взаимодействовать с ней таким же функциональным образом:

ReactorHttpHandlerAdapter handler =
                    new ReactorHttpHandlerAdapter(yourHttpHandlers);

            HttpServer.create()
                      .startRouterAndAwait(routes -> {
                                  routes.ws("/pathToWs", (in, out) -> out.send(in.receive()))
                                        .file("/static/**", ...)
                                        .get("**", handler)
                                        .post("**", handler)
                                        .put("**", handler)
                                        .delete("**", handler);
                              }
                      );
person Oleh Dokuka    schedule 04.01.2018
comment
Спасибо Олег. Я пытаюсь интегрировать webflux в существующее приложение Spring, поэтому использование SpringBoot или аннотации не вариант. Я должен либо использовать контекст приложения конфигурации xml, либо использовать функциональную маршрутизацию. Я попробую ваше второе предложение. Не могли бы вы проработать эту строку: (in, out) - ›out.send (in.receive ())). Я предполагаю, что вы имеете в виду реактор websocketinbound и webcoketoutboud. Должен ли я реализовать какой-либо интерфейс реактора в моем текущем обработчике веб-сокетов? - person chirpi; 06.01.2018

Я так с этим справляюсь. и использовать родной реактор-netty

routes.get(rootPath, (req, resp)->{
        // doFilter check the error
        return this.doFilter(request, response, new RequestAttribute())
                .flatMap(requestAttribute -> {
                    WebSocketServerHandle handleObject = injector.getInstance(GameWsHandle.class);
                    return response
                        .header("content-type", "text/plain")
                        .sendWebsocket((in, out) ->
                            this.websocketPublisher3(in, out, handleObject, requestAttribute)
                        );
                });
    })
private Publisher<Void> websocketPublisher3(WebsocketInbound in, WebsocketOutbound out, WebSocketServerHandle handleObject, RequestAttribute requestAttribute) {
        return out
                .withConnection(conn -> {
                    // on connect
                    handleObject.onConnect(conn.channel());
                    conn.channel().attr(AttributeKey.valueOf("request-attribute")).set(requestAttribute);
                    conn.onDispose().subscribe(null, null, () -> {
                            conn.channel().close();
                            handleObject.disconnect(conn.channel());
                            // System.out.println("context.onClose() completed");
                        }
                    );
                    // get message
                    in.aggregateFrames()
                            .receiveFrames()
                            .map(frame -> {
                                if (frame instanceof TextWebSocketFrame) {
                                    handleObject.onTextMessage((TextWebSocketFrame) frame, conn.channel());
                                } else if (frame instanceof BinaryWebSocketFrame) {
                                    handleObject.onBinaryMessage((BinaryWebSocketFrame) frame, conn.channel());
                                } else if (frame instanceof PingWebSocketFrame) {
                                    handleObject.onPingMessage((PingWebSocketFrame) frame, conn.channel());
                                } else if (frame instanceof PongWebSocketFrame) {
                                    handleObject.onPongMessage((PongWebSocketFrame) frame, conn.channel());
                                } else if (frame instanceof CloseWebSocketFrame) {
                                    conn.channel().close();
                                    handleObject.disconnect(conn.channel());
                                }
                                return "";
                            })
                            .blockLast();
                });
    }
person Henry    schedule 13.03.2019