Я работаю над простой службой чата, запускаемой Spring Boot 2.1.1 с WebFlux, Reactor 3.2.3, Mongo 3.8.2 и Netty 4.1.31.
Каждая чат-комната имеет 2 коллекции - архив сообщений и ограниченную коллекцию с текущими событиями (например, событие нового сообщения, индикаторы ввода пользователем и т. Д.). Ограниченная коллекция состоит из 100 элементов, и я использую метод tail () ReactiveMongoTemplate для получения последних событий.
Служба предоставляет 2 типа конечных точек для получения последних событий: SSE и для опроса. Я провел несколько стресс-тестов с 2000 одновременными пользователями, которые, помимо прослушивания чата, рассылали спам о множестве событий.
Наблюдения:
- опрос каждые 2 секунды вызывает небольшую нагрузку на службу (~ 40% использования ЦП во время теста) и почти не вызывает нагрузки на MongoDB (~ 4%)
- прослушивание через SSE приводит к максимальному использованию MongoDB (~ 90%), а также вызывает нагрузку на службу (которая пытается использовать остальные доступные ресурсы), но Mongo испытывает особые трудности, и в целом служба перестает отвечать на запросы.
Наблюдение кажется очевидным, потому что, когда я подключился через SSE во время теста, он почти мгновенно обновил меня, когда появилось новое событие - в основном SSE был в сотни раз более отзывчивым, чем опрос каждые 2 секунды.
Вопрос:
Учитывая, что клиент в конечном итоге является подписчиком (или, по крайней мере, я думаю, что это связано с ограниченными знаниями), могу ли я как-то ограничить скорость публикации сообщений с помощью ReactiveMongoTemplate? Или как-то снизить спрос на новые события без необходимости делать это на стороне клиента?
Я испытывал удачу с буферизацией Flux и кешированием, но это вызвало еще больше стресса ...
Код:
// ChatRepository.java
private static final Query chatEventsQuery = new Query();
public Flux<ChatEvent> getChatEventsStream(String chatId) {
return reactiveMongoTemplate.tail(
chatEventsQuery,
ChatEvent.class,
chatId
);
}
,
// ChatHandler.java
public Mono<ServerResponse> getChatStream(ServerRequest request) {
String chatId = request.pathVariable(CHAT_ID_PATH_VARIABLE);
String username = getUsername(request);
Flux<ServerSentEvent> chatEventsStream = chatRepository
.getChatEventsStream(chatId)
.map(addUserSpecificPropsToChatEvent(username))
.map(event -> ServerSentEvent.<ChatEvent>builder()
.event(event.getType().getEventName())
.data(event)
.build());
log.debug("\nExposing chat stream\nchat: {}\nuser: {}", chatId, username);
return ServerResponse.ok().body(
chatEventsStream,
ServerSentEvent.class
);
}
,
// ChatRouter.java
RouterFunction<ServerResponse> routes(ChatHandler handler) {
return route(GET("/api/chat/{chatId}/stream"), handler::getChatStream);
}