Сводка по разрешению:
В большинстве примеров RSocket, которые в настоящее время существуют, акцептор на стороне сервера просто создается как новый объект (например, new MqttMessageService () ниже) даже в руководствах, связанных с SpringBoot. Это нормально, если вы создаете пример содержимого прямо в классе акцептора, но может привести к путанице, связанной с инъекцией зависимостей ниже, когда акцептор зависит от других bean-компонентов в контейнере.
Исходный вопрос:
Я получаю исключение NullPointerException при попытке потоковой передачи записей базы данных с использованием репозитория Spring Data Reactive Mongodb через сервер Java Rsocket.
Проблема в том, что во время отладки все компоненты работают отдельно: я могу получить запрошенные данные через тот же репозиторий Mongodb, а также могу передавать произвольно сгенерированные данные между тем же сервером и клиентом с помощью Rsocket.
Так что мне либо не хватает чего-то действительно простого, либо может возникнуть проблема с совместным использованием Reactive Mongodb и Rsocket.
Вот исходная конфигурация Rsocket на стороне сервера:
@Configuration
public class RsocketConfig {
@PostConstruct
public void startServer() {
RSocketFactory.receive()
.acceptor((setup, sendingSocket) -> Mono.just(new MqttMessageService()))
.transport(TcpServerTransport.create(8802))
.start()
.block()
.onClose()
}
}
А вот рабочая конфигурация Rsocket на стороне сервера с правильным DI:
@Configuration
public class RsocketConfig {
@Autowired
MqttMessageService messageService;
@PostConstruct
public void startServer() {
RSocketFactory.receive()
.acceptor((setup, sendingSocket) -> Mono.just(messageService))
.transport(TcpServerTransport.create(8802))
.start()
.block()
.onClose()
}
}
Вот реализация AbstractRSocket на стороне сервера, где при возврате service.findAll () выдается исключение NullPointerException.
@Service
public class MqttMessageService extends AbstractRSocket {
@Autowired
private MqttMessageEntityService service;
@Override
public Flux<Payload> requestStream(Payload payload) {
return service.findAll()
.map(mqttMessageEntity -> DefaultPayload.create(mqttMessageEntity.toString()));
}
}
Вот реактивный репозиторий и связанный с ним сервис. Служба возвращает null при внедрении в реализацию AbstractRSocket сервера, но отлично работает при внедрении в другие классы:
@Service
public class MqttMessageEntityService {
@Autowired
private MqttMessageEntityRepository repository;
public Flux<MqttMessageEntity> findAll() {
return repository.findAll();
}
}
public interface MqttMessageEntityRepository extends ReactiveMongoRepository<MqttMessageEntity, String> {
}
А вот код на стороне клиента, который отлично работает с содержимым теста:
@Configuration
public class RsocketConfig {
@PostConstruct
public void testRsocket() {
RSocket rSocketClient = RSocketFactory
.connect()
.transport(TcpClientTransport.create(8802))
.start()
.block();
rSocketClient
.requestStream(DefaultPayload.create(""))
.blockLast();
}
}
Я могу быть немного выше моего уровня знаний здесь, и ресурсы по этой теме очень ограничены, поэтому я ценю любые подсказки к решению :)