Сервер не получает данные в случае RSocket Fire and Forget

У меня следующий сервер RSocket

@Log4j2
public class NativeRsocketServerFnF {
  public static void main(String[] args) {
    RSocketFactory.receive()
        .frameDecoder(ZERO_COPY)
        .errorConsumer(log::error)
        .acceptor((setup, clientHandleRsocket) -> {
          return Mono.just(
              new AbstractRSocket() {
                @Override
                public Mono<Void> fireAndForget(Payload payload) {
                  CharSequence message = payload.data().readCharSequence(payload.data().readableBytes(), forName("UTF-8"));
                  payload.release();
                  log.info("> from client: {}", message);
                  return Mono.empty();
                }
              }
          );
        })
        .transport(TcpServerTransport.create(8000))
        .start()
        .block()
        .onClose()
        .block();
  }
}

и следующий клиент RSocket

@Log4j2
public class NativeRsocketClientFnF {
  public static void main(String[] args) {
    RSocketFactory.connect()
        .frameDecoder(ZERO_COPY)
        .errorConsumer(log::error)
        .transport(TcpClientTransport.create(8000))
        .start()
        .flatMap(rSocket -> rSocket.fireAndForget(DefaultPayload.create("ping")))
        .block();
  }
}

Как видите, я пытаюсь отправить "ping" как данные полезной нагрузки от клиента на сервер.

Когда я запускаю сервер и запускаю клиент в первый раз, я вижу > from client: ping

Если я перезапущу клиент снова, я не увижу никаких сообщений на сервере. Точка останова даже не попадает на сервер

Насколько я понимаю, Fire and Forget просто отправляет данные и не беспокоится о том, чтобы ждать и видеть, успешно ли сервер обрабатывает данные, но в моем случае сам сервер не получает данные при последующих запусках клиента (так же хорошо, как новые клиенты)

Что-то мне не хватает?

Я использую версию 1.0.0-RC5 из rsocket-core & rsocket-transport-netty

ОС: Ubuntu 16.04


person Ashok Koyi    schedule 21.10.2019    source источник


Ответы (1)


Вы правильно понимаете паттерн «огонь и забыть».

Проблема в том, что io.rsocket.RSocketRequester#fireAndForget успешно завершился до того, как данные были отправлены в сеть. И когда вы его заблокируете и выйдете из клиентского приложения, соединение закрывается.

См. io.rsocket.RSocketRequester#handleFireAndForget

    return emptyUnicastMono()
        .doOnSubscribe(
            new OnceConsumer<Subscription>() {
              @Override
              public void acceptOnce(@Nonnull Subscription subscription) {
                ByteBuf requestFrame =
                    RequestFireAndForgetFrameFlyweight.encode(
                        allocator,
                        streamId,
                        false,
                        payload.hasMetadata() ? payload.sliceMetadata().retain() : null,
                        payload.sliceData().retain());
                payload.release();

                sendProcessor.onNext(requestFrame);
              }
            });

sendProcessor - процессор промежуточного потока. Он используется фактическим подключением, и отправка кадра запроса в сеть может быть отложена.

Вы можете добавить .then(Mono.delay(Duration.ofMillis(100))) перед своим .block() вызовом на стороне клиента в целях тестирования. Я думаю, что большинство реальных приложений не закрываются сразу после вызова fireAndForget.

person Alexander Pankin    schedule 23.10.2019
comment
Итак, если мне нужна уверенность в том, что данные отправлены, я должен использовать запрос-ответ. Это верно? - person Ashok Koyi; 03.11.2019
comment
Сейчас да. Но вы можете создать проблему для проекта rsocket-java на GitHub. Может быть, команда rsocket знает, как исправить это и забыть. - person Alexander Pankin; 03.11.2019