RSocket работает с сгенерированными данными, но не с Spring Reactive MongoDB

Сводка по разрешению:

В большинстве примеров RSocket, которые в настоящее время существуют, акцептор на стороне сервера просто создается как новый объект (например, new MqttMessageService () ниже) даже в руководствах, связанных с SpringBoot. Это нормально, если вы создаете пример содержимого прямо в классе акцептора, но может привести к путанице, связанной с инъекцией зависимостей ниже, когда акцептор зависит от других bean-компонентов в контейнере.

Исходный вопрос:

Я получаю исключение NullPointerException при попытке потоковой передачи записей базы данных с использованием репозитория Spring Data Reactive Mongodb через сервер Java Rsocket.

Проблема в том, что во время отладки все компоненты работают отдельно: я могу получить запрошенные данные через тот же репозиторий Mongodb, а также могу передавать произвольно сгенерированные данные между тем же сервером и клиентом с помощью Rsocket.

Так что мне либо не хватает чего-то действительно простого, либо может возникнуть проблема с совместным использованием Reactive Mongodb и Rsocket.

Вот исходная конфигурация Rsocket на стороне сервера:

@Configuration
public class RsocketConfig {

    @PostConstruct
    public void startServer() {
        RSocketFactory.receive()
                .acceptor((setup, sendingSocket) -> Mono.just(new MqttMessageService()))
                .transport(TcpServerTransport.create(8802))
                .start()
                .block()
                .onClose()
    }
}

А вот рабочая конфигурация Rsocket на стороне сервера с правильным DI:

@Configuration
public class RsocketConfig {

    @Autowired
    MqttMessageService messageService;

    @PostConstruct
    public void startServer() {
        RSocketFactory.receive()
                .acceptor((setup, sendingSocket) -> Mono.just(messageService))
                .transport(TcpServerTransport.create(8802))
                .start()
                .block()
                .onClose()
    }
}

Вот реализация AbstractRSocket на стороне сервера, где при возврате service.findAll () выдается исключение NullPointerException.

@Service
public class MqttMessageService extends AbstractRSocket {



    @Autowired 
    private MqttMessageEntityService service;

    @Override
    public Flux<Payload> requestStream(Payload payload) {
        return service.findAll()
            .map(mqttMessageEntity -> DefaultPayload.create(mqttMessageEntity.toString()));

    }
}

Вот реактивный репозиторий и связанный с ним сервис. Служба возвращает null при внедрении в реализацию AbstractRSocket сервера, но отлично работает при внедрении в другие классы:

@Service
public class MqttMessageEntityService {

    @Autowired
    private MqttMessageEntityRepository repository;

    public Flux<MqttMessageEntity> findAll() {
        return repository.findAll();
    }

}

public interface MqttMessageEntityRepository extends ReactiveMongoRepository<MqttMessageEntity, String> {

}

А вот код на стороне клиента, который отлично работает с содержимым теста:

@Configuration
public class RsocketConfig {

    @PostConstruct
    public void testRsocket() {

        RSocket rSocketClient = RSocketFactory
                .connect()
                .transport(TcpClientTransport.create(8802))
                .start()
                .block();

        rSocketClient
                .requestStream(DefaultPayload.create(""))
                .blockLast();
    }        
}

Я могу быть немного выше моего уровня знаний здесь, и ресурсы по этой теме очень ограничены, поэтому я ценю любые подсказки к решению :)


person Péter Veres    schedule 01.01.2019    source источник


Ответы (1)


Касательно

@PostConstruct
public void startServer() {
    RSocketFactory.receive()
            .acceptor((setup, sendingSocket) -> Mono.just(new MqttMessageService()))
            .transport(TcpServerTransport.create(8802))
            .start()
            .block()
            .onClose();
}

Вы используете для поддержания сервера в рабочем состоянии? Если да, добавьте еще один блок после onClose ().

Значение messageEntityService равно нулю? Потому что, похоже, это единственное, что может вызвать ошибку, если переменные topicStart и module не совпадают. Особенно, если работает другой код - я не вижу ничего, что могло бы вызвать проблемы со стороны RSocket.

person Robert Roeser    schedule 29.01.2019
comment
Ах да, вы правы! MessageEntityService не возвращает нулевое значение, сама служба имеет нулевое значение, что неожиданно, поскольку оно отлично работает при введении в другой класс. Ты хоть представляешь, как это могло происходить? Я обновляю свой вопрос, добавляя более подробную информацию о реактивном репозитории mongo и messageEntityService, поскольку он все еще может быть актуален в этом вопросе. - person Péter Veres; 03.02.2019
comment
Я думаю, этот поток может помочь вам понять, почему Spring вводит нуль в вашу переменную @Autowired: stackoverflow.com/questions/19896870/ - person Robert Roeser; 06.02.2019
comment
Хаха спасибо! Первое, что я проверил, это то, что служба управляется контейнером, но пропустил ту часть, где я должен был ввести ее в качестве акцептора на стороне сервера, вместо того, чтобы создавать новый экземпляр вне контейнера Spring. Я был немного сбит с толку, так как все примеры, которые я видел, даже те, которые связаны с SpringBoot, использовали этот способ добавления акцептора, но я только что понял, что в этих случаях на самом деле не имеет значения, поддерживается ли он Spring, поскольку все они использовали данные, сгенерированные внутри акцептора, и не зависели от других служб (например, db в моем случае) - person Péter Veres; 07.02.2019