В рамках своего исследования я пишу высоконагруженный эхо-сервер TCP/IP на Java. Я хочу обслуживать около 3-4к клиентов и видеть максимально возможные сообщения в секунду, которые я могу из него выжать. Размер сообщения совсем небольшой - до 100 байт. Эта работа не имеет практической цели - только исследование.
Согласно многочисленным презентациям, которые я видел (тесты HornetQ, доклады LMAX Disruptor и т. д.), реальные высоконагруженные системы, как правило, обслуживают миллионы транзакций в секунду (я полагаю, что Disruptor упоминал о 6 милах, а Hornet — 8,5). Например, в этом сообщении говорится, что можно Достигните до 40M MPS. Поэтому я воспринял это как приблизительную оценку того, на что должно быть способно современное оборудование.
Я написал простейший однопоточный NIO-сервер и запустил нагрузочный тест. Я был немного удивлен, что я могу получить только около 100 000 MPS на локальном хосте и 25 000 с реальной сетью. Цифры выглядят совсем маленькими. Я тестировал на Win7 x64, core i7. Глядя на загрузку ЦП - занято только одно ядро (что ожидается в однопоточном приложении), а остальные бездействуют. Однако даже если я загружу все 8 ядер (включая виртуальные), у меня будет не более 800 тыс. MPS — даже близко не 40 миллионов :)
Мой вопрос: каков типичный шаблон для обслуживания большого количества сообщений клиентам? Должен ли я распределять сетевую нагрузку на несколько разных сокетов внутри одной JVM и использовать какой-то балансировщик нагрузки, такой как HAProxy, для распределения нагрузки на несколько ядер? Или мне следует обратить внимание на использование нескольких селекторов в моем коде NIO? Или, может быть, даже распределить нагрузку между несколькими JVM и использовать Chronicle для построения межпроцессного взаимодействия между ними? Будет ли тестирование на подходящей серверной ОС, такой как CentOS, иметь большое значение (может быть, Windows замедляет работу)?
Ниже приведен пример кода моего сервера. Он всегда отвечает «ок» на любые входящие данные. Я знаю, что в реальном мире мне нужно будет отслеживать размер сообщения и быть готовым к тому, что одно сообщение может быть разделено между несколькими чтениями, однако я бы хотел, чтобы сейчас все было очень просто.
public class EchoServer {
private static final int BUFFER_SIZE = 1024;
private final static int DEFAULT_PORT = 9090;
// The buffer into which we'll read data when it's available
private ByteBuffer readBuffer = ByteBuffer.allocate(BUFFER_SIZE);
private InetAddress hostAddress = null;
private int port;
private Selector selector;
private long loopTime;
private long numMessages = 0;
public EchoServer() throws IOException {
this(DEFAULT_PORT);
}
public EchoServer(int port) throws IOException {
this.port = port;
selector = initSelector();
loop();
}
private void loop() {
while (true) {
try{
selector.select();
Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
while (selectedKeys.hasNext()) {
SelectionKey key = selectedKeys.next();
selectedKeys.remove();
if (!key.isValid()) {
continue;
}
// Check what event is available and deal with it
if (key.isAcceptable()) {
accept(key);
} else if (key.isReadable()) {
read(key);
} else if (key.isWritable()) {
write(key);
}
}
} catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
}
}
private void accept(SelectionKey key) throws IOException {
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
socketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
socketChannel.register(selector, SelectionKey.OP_READ);
System.out.println("Client is connected");
}
private void read(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
// Clear out our read buffer so it's ready for new data
readBuffer.clear();
// Attempt to read off the channel
int numRead;
try {
numRead = socketChannel.read(readBuffer);
} catch (IOException e) {
key.cancel();
socketChannel.close();
System.out.println("Forceful shutdown");
return;
}
if (numRead == -1) {
System.out.println("Graceful shutdown");
key.channel().close();
key.cancel();
return;
}
socketChannel.register(selector, SelectionKey.OP_WRITE);
numMessages++;
if (numMessages%100000 == 0) {
long elapsed = System.currentTimeMillis() - loopTime;
loopTime = System.currentTimeMillis();
System.out.println(elapsed);
}
}
private void write(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer dummyResponse = ByteBuffer.wrap("ok".getBytes("UTF-8"));
socketChannel.write(dummyResponse);
if (dummyResponse.remaining() > 0) {
System.err.print("Filled UP");
}
key.interestOps(SelectionKey.OP_READ);
}
private Selector initSelector() throws IOException {
Selector socketSelector = SelectorProvider.provider().openSelector();
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
InetSocketAddress isa = new InetSocketAddress(hostAddress, port);
serverChannel.socket().bind(isa);
serverChannel.register(socketSelector, SelectionKey.OP_ACCEPT);
return socketSelector;
}
public static void main(String[] args) throws IOException {
System.out.println("Starting echo server");
new EchoServer();
}
}