Java высоконагруженный TCP-сервер NIO

В рамках своего исследования я пишу высоконагруженный эхо-сервер TCP/IP на Java. Я хочу обслуживать около 3-4к клиентов и видеть максимально возможные сообщения в секунду, которые я могу из него выжать. Размер сообщения совсем небольшой - до 100 байт. Эта работа не имеет практической цели - только исследование.

Согласно многочисленным презентациям, которые я видел (тесты HornetQ, доклады LMAX Disruptor и т. д.), реальные высоконагруженные системы, как правило, обслуживают миллионы транзакций в секунду (я полагаю, что Disruptor упоминал о 6 милах, а Hornet — 8,5). Например, в этом сообщении говорится, что можно Достигните до 40M MPS. Поэтому я воспринял это как приблизительную оценку того, на что должно быть способно современное оборудование.

Я написал простейший однопоточный NIO-сервер и запустил нагрузочный тест. Я был немного удивлен, что я могу получить только около 100 000 MPS на локальном хосте и 25 000 с реальной сетью. Цифры выглядят совсем маленькими. Я тестировал на Win7 x64, core i7. Глядя на загрузку ЦП - занято только одно ядро ​​(что ожидается в однопоточном приложении), а остальные бездействуют. Однако даже если я загружу все 8 ядер (включая виртуальные), у меня будет не более 800 тыс. MPS — даже близко не 40 миллионов :)

Мой вопрос: каков типичный шаблон для обслуживания большого количества сообщений клиентам? Должен ли я распределять сетевую нагрузку на несколько разных сокетов внутри одной JVM и использовать какой-то балансировщик нагрузки, такой как HAProxy, для распределения нагрузки на несколько ядер? Или мне следует обратить внимание на использование нескольких селекторов в моем коде NIO? Или, может быть, даже распределить нагрузку между несколькими JVM и использовать Chronicle для построения межпроцессного взаимодействия между ними? Будет ли тестирование на подходящей серверной ОС, такой как CentOS, иметь большое значение (может быть, Windows замедляет работу)?

Ниже приведен пример кода моего сервера. Он всегда отвечает «ок» на любые входящие данные. Я знаю, что в реальном мире мне нужно будет отслеживать размер сообщения и быть готовым к тому, что одно сообщение может быть разделено между несколькими чтениями, однако я бы хотел, чтобы сейчас все было очень просто.

public class EchoServer {

private static final int BUFFER_SIZE = 1024;
private final static int DEFAULT_PORT = 9090;

// The buffer into which we'll read data when it's available
private ByteBuffer readBuffer = ByteBuffer.allocate(BUFFER_SIZE);

private InetAddress hostAddress = null;

private int port;
private Selector selector;

private long loopTime;
private long numMessages = 0;

public EchoServer() throws IOException {
    this(DEFAULT_PORT);
}

public EchoServer(int port) throws IOException {
    this.port = port;
    selector = initSelector();
    loop();
}

private void loop() {
    while (true) {
        try{
            selector.select();
            Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
            while (selectedKeys.hasNext()) {
                SelectionKey key = selectedKeys.next();
                selectedKeys.remove();

                if (!key.isValid()) {
                    continue;
                }

                // Check what event is available and deal with it
                if (key.isAcceptable()) {
                    accept(key);
                } else if (key.isReadable()) {
                    read(key);
                } else if (key.isWritable()) {
                    write(key);
                }
            }

        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
    }
}

private void accept(SelectionKey key) throws IOException {
    ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();

    SocketChannel socketChannel = serverSocketChannel.accept();
    socketChannel.configureBlocking(false);
    socketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE, true);
    socketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
    socketChannel.register(selector, SelectionKey.OP_READ);

    System.out.println("Client is connected");
}

private void read(SelectionKey key) throws IOException {
    SocketChannel socketChannel = (SocketChannel) key.channel();

    // Clear out our read buffer so it's ready for new data
    readBuffer.clear();

    // Attempt to read off the channel
    int numRead;
    try {
        numRead = socketChannel.read(readBuffer);
    } catch (IOException e) {
        key.cancel();
        socketChannel.close();

        System.out.println("Forceful shutdown");
        return;
    }

    if (numRead == -1) {
        System.out.println("Graceful shutdown");
        key.channel().close();
        key.cancel();

        return;
    }

    socketChannel.register(selector, SelectionKey.OP_WRITE);

    numMessages++;
    if (numMessages%100000 == 0) {
        long elapsed = System.currentTimeMillis() - loopTime;
        loopTime = System.currentTimeMillis();
        System.out.println(elapsed);
    }
}

private void write(SelectionKey key) throws IOException {
    SocketChannel socketChannel = (SocketChannel) key.channel();
    ByteBuffer dummyResponse = ByteBuffer.wrap("ok".getBytes("UTF-8"));

    socketChannel.write(dummyResponse);
    if (dummyResponse.remaining() > 0) {
        System.err.print("Filled UP");
    }

    key.interestOps(SelectionKey.OP_READ);
}

private Selector initSelector() throws IOException {
    Selector socketSelector = SelectorProvider.provider().openSelector();

    ServerSocketChannel serverChannel = ServerSocketChannel.open();
    serverChannel.configureBlocking(false);

    InetSocketAddress isa = new InetSocketAddress(hostAddress, port);
    serverChannel.socket().bind(isa);
    serverChannel.register(socketSelector, SelectionKey.OP_ACCEPT);
    return socketSelector;
}

public static void main(String[] args) throws IOException {
    System.out.println("Starting echo server");
    new EchoServer();
}
}

person Juriy    schedule 09.07.2013    source источник
comment
40 миллионов транзакций в секунду на сервер?! Они должны отвечать одним байтом.   -  person Sotirios Delimanolis    schedule 09.07.2013
comment
Полагаю, что без бизнес-логики — только обмен сообщениями. Но да, это то, что я видел в том посте. Удивительные цифры.   -  person Juriy    schedule 09.07.2013
comment
Вам не нужно ждать OP_WRITE, прежде чем вы сможете писать. Вам нужно сделать это только после записи нулевой длины. Вам не нужно отменять ключ до или после закрытия канала.   -  person user207421    schedule 22.01.2014


Ответы (3)


what is a typical pattern for serving massive amounts of messages to clients?

Существует много возможных шаблонов: простой способ использовать все ядра без использования нескольких jvms:

  1. Пусть один поток принимает соединения и читает с помощью селектора.
  2. Как только у вас будет достаточно байтов для создания одного сообщения, передайте его другому ядру, используя конструкцию, подобную кольцевому буферу. Платформа Disruptor Java хорошо подходит для этого. Это хороший шаблон, если обработка, необходимая для определения того, что является полным сообщением, является легкой. Например, если у вас есть протокол с префиксом длины, вы можете подождать, пока не получите ожидаемое количество байтов, а затем отправить его в другой поток. Если синтаксический анализ протокола очень тяжелый, вы можете перегрузить этот единственный поток, не позволяя ему принимать соединения или читать байты сети.
  3. В ваших рабочих потоках, которые получают данные из кольцевого буфера, выполните фактическую обработку.
  4. Вы записываете ответы либо в свои рабочие потоки, либо через какой-либо другой поток агрегатора.

В этом суть. Здесь есть еще много возможностей, и ответ действительно зависит от типа приложения, которое вы пишете. Вот несколько примеров:

  1. Приложение без сохранения состояния, интенсивно использующее ЦП, например, приложение для обработки изображений. Объем работы ЦП/ГП, выполняемой для каждого запроса, вероятно, будет значительно выше, чем накладные расходы, генерируемые очень простым решением для межпоточной связи. В этом случае простым решением является группа рабочих потоков, вытягивающих работу из одной очереди. Обратите внимание, что это одна очередь, а не одна очередь на каждого работника. Преимущество в том, что это по своей сути балансирует нагрузку. Каждый рабочий завершает свою работу, а затем просто опрашивает очередь одного производителя и нескольких потребителей. Несмотря на то, что это является источником разногласий, работа по обработке изображений (секунды?) должна быть намного дороже, чем любая альтернатива синхронизации.
  2. Простое приложение ввода-вывода, например. сервер статистики, который просто увеличивает некоторые счетчики для запроса: здесь вы почти не выполняете тяжелую работу процессора. Большая часть работы — это просто чтение байтов и запись байтов. Многопоточное приложение может не дать вам здесь значительных преимуществ. На самом деле это может даже замедлить работу, если время, необходимое для постановки элементов в очередь, превышает время, необходимое для их обработки. Однопоточный сервер Java должен быть в состоянии легко насытить канал 1G.
  3. Приложения с отслеживанием состояния, которые требуют умеренных объемов обработки, например. типичное бизнес-приложение: здесь каждый клиент имеет некоторое состояние, которое определяет, как обрабатывается каждый запрос. Предполагая, что мы используем многопоточность, поскольку обработка нетривиальна, мы могли бы связать клиентов с определенными потоками. Это вариант архитектуры актора:

    i) Когда клиент впервые подключает хэш к рабочему. Возможно, вы захотите сделать это с некоторым идентификатором клиента, чтобы при отключении и повторном подключении он по-прежнему был назначен одному и тому же работнику/актеру.

    ii) Когда поток чтения читает полный запрос, поместите его в кольцевой буфер для правильного работника/актера. Поскольку один и тот же рабочий процесс всегда обрабатывает конкретного клиента, все состояние должно быть локальным для потока, что делает всю логику обработки простой и однопоточной.

    iii) Рабочий поток может записывать запросы. Всегда пытайтесь просто выполнить write(). Если все ваши данные не могут быть записаны только тогда, вы регистрируетесь в OP_WRITE. Рабочий поток должен выполнять вызовы select только в том случае, если на самом деле есть что-то незавершенное. Большинство записей должно просто преуспеть, что делает это ненужным. Хитрость здесь заключается в балансировании между вызовами select и опросом кольцевого буфера для получения дополнительных запросов. Вы также можете использовать один поток записи, единственной обязанностью которого является запись запросов. Каждый рабочий поток может помещать свои ответы в кольцевой буфер, соединяющий его с этим единственным потоком записи. Циклический цикл с одним потоком записи опрашивает каждый входящий кольцевой буфер и записывает данные клиентам. Снова предостережение о попытке записи перед выбором применяется, как и трюк с балансировкой между несколькими кольцевыми буферами и вызовами выбора.

Как вы указываете, есть много других вариантов:

Should I distribute networking load over several different sockets inside a single JVM and use some sort of load balancer like HAProxy to distribute load to multiple cores?

Вы можете сделать это, но ИМХО это не лучшее использование балансировщика нагрузки. Это действительно дает вам независимые JVM, которые могут выйти из строя сами по себе, но, вероятно, будут медленнее, чем написание одного приложения JVM, которое является многопоточным. Само приложение может быть проще написать, поскольку оно будет однопоточным.

Or I should look towards using multiple Selectors in my NIO code?

Ты тоже можешь это сделать. Посмотрите на архитектуру Ngnix, чтобы узнать, как это сделать.

Or maybe even distribute the load between multiple JVMs and use Chronicle to build an inter-process communication between them? Это тоже вариант. Преимущество Chronicle заключается в том, что файлы с отображением памяти более устойчивы к завершению процесса в середине. Вы по-прежнему получаете высокую производительность, поскольку все коммуникации осуществляются через разделяемую память.

Will testing on a proper serverside OS like CentOS make a big difference (maybe it is Windows that slows things down)?

Я не знаю об этом. Навряд ли. Если Java в полной мере использует собственные API-интерфейсы Windows, это не должно иметь большого значения. Я очень сомневаюсь в цифре 40 миллионов транзакций в секунду (без сетевого стека пользовательского пространства + UDP), но перечисленные мной архитектуры должны работать довольно хорошо.

Эти архитектуры, как правило, преуспевают, поскольку они представляют собой архитектуры с одной записью, которые используют структуры данных на основе ограниченного массива для межпотокового взаимодействия. Определите, является ли многопоточность ответом. Во многих случаях это не нужно и может привести к замедлению.

Еще одна область, на которую следует обратить внимание, — это схемы распределения памяти. В частности, стратегия выделения и повторного использования буферов может привести к значительным преимуществам. Правильная стратегия повторного использования буфера зависит от приложения. Посмотрите на такие схемы, как распределение памяти приятелей, распределение арен и т. д., чтобы увидеть, могут ли они принести вам пользу. JVM GC отлично справляется с большинством рабочих нагрузок, поэтому всегда измеряйте, прежде чем идти по этому пути.

Дизайн протокола также оказывает большое влияние на производительность. Я предпочитаю протоколы с префиксом длины, потому что они позволяют выделять буферы нужного размера, избегая списков буферов и/или слияния буферов. Протоколы с префиксом длины также упрощают принятие решения о передаче запроса — просто отметьте num bytes == expected. Фактический синтаксический анализ может быть выполнен рабочим потоком. Сериализация и десериализация выходит за рамки протоколов с префиксом длины. Здесь помогают такие шаблоны, как шаблоны легковеса над буферами вместо аллокаций. Посмотрите на SBE некоторые из этих принципов.

Как вы понимаете, здесь можно написать целый трактат. Это должно направить вас в правильном направлении. Предупреждение: всегда измеряйте и убедитесь, что вам нужно больше производительности, чем самый простой вариант. Легко попасть в бесконечную черную дыру улучшений производительности.

person Rajiv    schedule 08.01.2014

Ваша логика написания ошибочна. Вы должны попытаться записать сразу же, как только у вас появятся данные для записи. Если write() возвращает ноль, тогда пора зарегистрироваться для OP_WRITE, повторить попытку записи, когда канал станет доступным для записи, и отменить регистрацию для OP_WRITE, когда запись будет успешной. Вы добавляете огромное количество задержек здесь. Вы добавляете еще большую задержку, отменяя регистрацию для OP_READ, пока делаете все это.

person user207421    schedule 07.01.2014
comment
Спасибо @EJP. не могли бы вы привести мне несколько примеров? как правильно достичь максимальной производительности с помощью NIO? - person FaNaJ; 20.07.2015
comment
Я сделал лучше, чем привел несколько примеров. Я дал вам общий принцип. Избегание задержки — один из способов добиться максимума во всем. - person user207421; 16.05.2016
comment
EJP, можете ли вы предложить способ интуитивно понять, почему оставление канала в режиме OP_WRITE, когда нет срочной необходимости записи, является массовым скрытым? Я могу себе представить, что процессор должен быть готов к чтению или записи, но не ожидал, что это заметно повлияет на производительность. Почему так медленно проверяется готовность селектора к записи? - person Adam Hughes; 16.12.2016
comment
@AdamHughes Это очень скрыто, потому что вы бессмысленно откладываете запись до тех пор, пока не будет выполнено Selector, не будет обработано любое количество других готовых каналов и т. д. Просто нет причин заниматься этой неэффективностью. - person user207421; 18.05.2017
comment
Очень интересное замечание. Большое спасибо. Но почему отмена регистрации OP_READ вызывает большую задержку? - person St.Antario; 19.06.2017
comment
Потому что вы не обнаружите OP_READ, пока не перерегистрируете его. - person user207421; 19.06.2017
comment
@EJP Но мы можем побитово или READ_OP и WRITE_OP, чтобы мы могли обрабатывать готовые к чтению/записи. - person St.Antario; 19.06.2017
comment
@St.Antario Конечно, можешь. Почему «но»? Какое это имеет отношение к отмене регистрации OP_READ? Какую точку вы пытаетесь сделать? - person user207421; 20.06.2017

Вы достигнете нескольких сотен тысяч запросов в секунду на обычном оборудовании. По крайней мере, таков мой опыт создания подобных решений, и Tech Empower Web Frameworks Benchmark, кажется, тоже согласен.

Наилучший подход, как правило, зависит от того, есть ли у вас нагрузки, связанные с вводом-выводом или с привязкой к процессору.

Для нагрузок, связанных с вводом-выводом (большая задержка), вам необходимо выполнять асинхронный ввод-вывод со многими потоками. Для наилучшей производительности вы должны попытаться как можно больше аннулировать передачу обслуживания между потоками. Таким образом, наличие выделенного потока выбора и другого пула потоков для обработки медленнее, чем наличие пула потоков, в котором каждый поток выполняет либо выбор, либо обработку, так что в лучшем случае запрос обрабатывается одним потоком (если io сразу доступен). Этот тип настройки более сложен для кода, но быстр, и я не верю, что какой-либо асинхронный веб-фреймворк использует это в полной мере.

Для загрузки с привязкой к процессору один поток на запрос обычно является самым быстрым, поскольку вы избегаете переключения контекста.

person Alexander Torstling    schedule 08.01.2014