Глядя на код учебника на benwilcock / spring-rsocket-demo, я пытаюсь написать сервер, который реплицирует сообщения на второй сервер, прежде чем отвечать клиенту.
Чтобы попытаться отладить свои проблемы, я просто пытаюсь тривиально обмениваться пинг-понгом между серверами. Только когда второй сервер ответит на сообщение pong, первый сервер должен ответить клиенту:
@MessageMapping("request-response")
Mono<Message> requestResponse(final Message request) {
// register a mono that will be completed when replication to another server has happened
String uuid = UUID.randomUUID().toString();
Mono<Message> deferred = Mono.create(sink -> replicationNexus.registerRequest(uuid, sink));
// FIXME attempt to send a nested request-response message that will complete the outer message later
this.requesterMono.flatMap(requester -> requester.route("pong")
.data(uuid)
.retrieveMono(String.class))
.subscribeOn(Schedulers.elastic())
.subscribe( uuid2 -> replicationNexus.complete(uuid2, new Message(SERVER, RESPONSE)));
// return the deferred work that will be completed by the pong response
return deferred;
}
Эта логика пытается использовать этот ответ для создания соединения со вторым сервером, который будет подключаться повторно:
this.requesterMono = builder.rsocketConnector(connector -> connector
.reconnect(Retry.fixedDelay(Integer.MAX_VALUE, Duration.ofSeconds(1))))
.connectTcp("localhost", otherPort).cache();
Чтобы завершить картину, вот тривиальная логика пинг-понга:
@MessageMapping("pong")
public Mono<String> pong(String m) {
return Mono.just(m);
}
и вот логика, которая хранит состояние ответа внешнего клиента, который завершается, когда другой сервер отвечает:
public class ReplicationNexus<T> {
final Map<String, MonoSink<T>> requests = new ConcurrentHashMap<>();
public void registerRequest(String v, MonoSink<T> sink) {
requests.put(v, sink);
}
public boolean complete(String uuid, T message) {
Optional<MonoSink<T>> sink = Optional.of(requests.get(uuid));
if( sink.isPresent() ){
sink.get().success(message);
}
return sink.isPresent();
}
}
Отлаживая второй сервер, он никогда не запускает метод pong. Кажется, что первый сервер на самом деле не отправляет внутреннее сообщение запроса.
Как правильно запустить внутренний обмен запрос-ответ, который завершает внешний обмен сообщениями с автоматической логикой переподключения?