Spring Boot RSocket отправляет сообщение в сопоставлении сообщений

Глядя на код учебника на benwilcock / spring-rsocket-demo, я пытаюсь написать сервер, который реплицирует сообщения на второй сервер, прежде чем отвечать клиенту.

Чтобы попытаться отладить свои проблемы, я просто пытаюсь тривиально обмениваться пинг-понгом между серверами. Только когда второй сервер ответит на сообщение pong, первый сервер должен ответить клиенту:

@MessageMapping("request-response")
Mono<Message> requestResponse(final Message request) {
    // register a mono that will be completed when replication to another server has happened
    String uuid = UUID.randomUUID().toString();
    Mono<Message> deferred = Mono.create(sink -> replicationNexus.registerRequest(uuid, sink));

    // FIXME attempt to send a nested request-response message that will complete the outer message later
    this.requesterMono.flatMap(requester -> requester.route("pong")
            .data(uuid)
            .retrieveMono(String.class))
            .subscribeOn(Schedulers.elastic())
            .subscribe( uuid2 -> replicationNexus.complete(uuid2, new Message(SERVER, RESPONSE)));

    // return the deferred work that will be completed by the pong response
    return deferred;
}

Эта логика пытается использовать этот ответ для создания соединения со вторым сервером, который будет подключаться повторно:

    this.requesterMono = builder.rsocketConnector(connector -> connector
            .reconnect(Retry.fixedDelay(Integer.MAX_VALUE, Duration.ofSeconds(1))))
            .connectTcp("localhost", otherPort).cache();

Чтобы завершить картину, вот тривиальная логика пинг-понга:

@MessageMapping("pong")
public Mono<String> pong(String m) {
    return Mono.just(m);
}

и вот логика, которая хранит состояние ответа внешнего клиента, который завершается, когда другой сервер отвечает:

public class ReplicationNexus<T> {
final Map<String, MonoSink<T>> requests = new ConcurrentHashMap<>();

public void registerRequest(String v, MonoSink<T> sink) {
    requests.put(v, sink);
}

public boolean complete(String uuid, T message) {
    Optional<MonoSink<T>> sink = Optional.of(requests.get(uuid));
    if( sink.isPresent() ){
        sink.get().success(message);
    }
    return sink.isPresent();
}
}

Отлаживая второй сервер, он никогда не запускает метод pong. Кажется, что первый сервер на самом деле не отправляет внутреннее сообщение запроса.

Как правильно запустить внутренний обмен запрос-ответ, который завершает внешний обмен сообщениями с автоматической логикой переподключения?


person simbo1905    schedule 02.12.2020    source источник
comment
Ваш вопрос может быть достаточно сложным, чтобы, задав его непосредственно в community.reactive.foundation, можно было получить ответ раньше.   -  person Yuri Schimke    schedule 02.12.2020


Ответы (1)


Не уверен, что мне не хватает сложности вашего вопроса, но если средний сервер просто активируется как прокси, я бы начал с простейшего случая цепочки вызовов. Я чувствую, что упускаю какой-то нюанс в вопросе, так что давайте разберемся с ним дальше.

  @MessageMapping("runCommand")
  suspend fun runCommandX(
    request: CommandRequest,
  ): Mono<String> {
    val uuid = UUID.randomUUID().toString()

    return requesterMono
      .flatMap { requester: RSocketRequester ->
        requester.route("pong")
          .data("TEST")
          .retrieveMono(String::class.java)
      }
      .doOnSubscribe {
        // register request with uuid
      }
      .doOnSuccess {
        // register completion
      }
      .doOnError {
        // register failure
      }
  }

Как правило, если вы можете избежать вызова, подпишитесь самостоятельно в типичном коде spring / reactive / rsocket. Вы хотите, чтобы фреймворк делал это за вас.

person Yuri Schimke    schedule 02.12.2020
comment
спасибо Юрий, я попробую это в следующий раз. Могу я спросить, что значит !! как я не знаком с котлином? - person simbo1905; 03.12.2020
comment
принудительно проверяет ненулевое значение - бросает NPE или продолжает. в противном случае вам понадобится? .b? .c? .d - person Yuri Schimke; 03.12.2020
comment
извините за задержку с возвращением. если я попробую ваш подход здесь github.com/simbo1905/spring-rsocket-demo/blob/, если точка останова установлена ​​в flatMap и doOnSubscribe, то они не вызываются. похоже, что через моно плоскую карту лениво и не вызывается? - person simbo1905; 11.12.2020
comment
Ага, да, я не горела желанием. Честно говоря, я почти всегда считаю Mono, у которого есть побочные эффекты до подписки, вероятно, ошибкой. Это нарушает семантику Rx ИМХО. - person Yuri Schimke; 11.12.2020
comment
хорошо, у меня есть решение проблемы, связанное с загрузкой в ​​поток приложения, чтобы выполнить работу репликации. Я опубликую его здесь как указатель для других, а затем продолжу работу на некоторых специализированных форумах, как вы предлагаете. Спасибо! - person simbo1905; 12.12.2020