Модель потоков Netty

  1. Иллюстрированная модель многопоточности Netty

Когда дело доходит до модели потоков Netty, мы должны повторить модель потоков Reactor master-slave. Модель многопоточности Netty в основном основана на реализации модели Reactor master-slave. Модель многопоточности Netty будет расширена из следующих двух диаграмм:

Модель потока Reactor master-slave

 public void bind(int port) throws Exception {
        EventLoopGroup bossGroup = new NioEven | tLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            boot st rap.group(bossGroup., workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(Channeloption.SOLB4C / CLOG., 1024)
                .childHandler(new ChildChannelHandler());
            
            System.out.printIn("The server started successfully...");
            ChannelFuture f = bootstrap.bind(port).sync();
            f.channel().closeFuture().sync();
        } finally {
            //Thread pool resource release
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

Пример кода сервера Netty.

Для объяснения многопоточной модели Reactor master-slave, пожалуйста, сосредоточьтесь на предварительной главе о модели многопоточности Netty и на реализации исходного кода модели Reactor master-slave. Из примера кода сервера Netty видно, что модель потоков Netty должна быть неотделима от EventLoopGroup. Да, указанная выше bossGroup эквивалентна Main Reactor в режиме Reactor, а рабочая группа эквивалентна SubReactor.

Иерархия классов модели потоковой передачи Netty:

Диаграмма наследования классов модели потоков Netty:

Как видно из рисунка выше, в потоковой модели Netty есть четыре основных интерфейса: EventLoopGroup, EventLoop, EventExecuteGroup и EventExecute. Среди них EventExecute расширяется от интерфейса java.util.concurrent.ScheduledExecutorService, который аналогичен ответственности пула потоков (выполнение), в то время как EventLoop сначала наследуется от EventExecute и в основном расширяет метод регистрации, который является методом регистрации канала. Канал на селектор.

NioEventLoop — это реализация, основанная на Nio. Одно яркое пятно в этом классе — избежать ошибки JDK nio, пустого опроса метода выбора Selector. Основная идея заключается в том, что если есть много последовательных раз (по умолчанию 512), он вернется без тайм-аута и будет готов. Если количество ключей равно 0, считается, что произошел пустой опрос. Если возникает пустой опрос, будет создан новый селектор, соответствующий канал и события будут зарегистрированы в новом селекторе, а старый селектор будет закрыт. Исходный код реализован следующим образом:

private void select(boolean oldWakenUp) throws IOException {
        Selector selector = this.selector;
        try {
            int selectCnt = 0;
            long currentTimeNanos = System.nanoTime();
            long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
            for (;;) {
                long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
                if (timeoutMillis <= 0) {
                    if (selectCnt == 0) {
                        selector.selectNow();
                        selectCnt = 1;
                    }
                    break;
                }
 
                int selectedKeys = selector.select(timeoutMillis);
                selectCnt ++;
 
                if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                    // - Selected something,
                    // - waken up by user, or
                    // - the task queue has a pending task.
                    // - a scheduled task is ready for processing
                    break;
                }
                if (Thread.interrupted()) {
                    // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
                    // As this is most likely a bug in the handler of the user or it's client library we will
                    // also log it.
                    //
                    // See https://github.com/netty/netty/issues/2426
                    if (logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely because " +
                                "Thread.currentThread().interrupt() was called. Use " +
                                "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
                    }
                    selectCnt = 1;
                    break;
                }
 
                long time = System.nanoTime();
                if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                    // timeoutMillis elapsed without anything selected.
                    selectCnt = 1;
                } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                    // The selector returned prematurely many times in a row.
                    // Rebuild the selector to work around the problem.
                    logger.warn(
                            "Selector.select() returned prematurely {} times in a row; rebuilding selector.",
                            selectCnt);
 
                    rebuildSelector();
                    selector = this.selector;
 
                    // Select again to populate selectedKeys.
                    selector.selectNow();
                    selectCnt = 1;
                    break;
                }
 
                currentTimeNanos = time;
            }
 
            if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely {} times in a row.", selectCnt - 1);
                }
            }
        } catch (CancelledKeyException e) {
            if (logger.isDebugEnabled()) {
                logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector - JDK bug?", e);
            }
            // Harmless exception - log anyway
        }
    }
 
 
/**
     * Replaces the current {@link Selector} of this event loop with newly created {@link Selector}s to work
     * around the infamous epoll 100% CPU bug.
     */
    public void rebuildSelector() {
        if (!inEventLoop()) {
            execute(new Runnable() {
                @Override
                public void run() {
                    rebuildSelector();
                }
            });
            return;
        }
 
        final Selector oldSelector = selector;
        final Selector newSelector;
 
        if (oldSelector == null) {
            return;
        }
 
        try {
            newSelector = openSelector();
        } catch (Exception e) {
            logger.warn("Failed to create a new Selector.", e);
            return;
        }
 
        // Register all channels to the new Selector.
        int nChannels = 0;
        for (;;) {
            try {
                for (SelectionKey key: oldSelector.keys()) {
                    Object a = key.attachment();
                    try {
                        if (!key.isValid() || key.channel().keyFor(newSelector) != null) {
                            continue;
                        }
 
                        int interestOps = key.interestOps();
                        key.cancel();
                        SelectionKey newKey = key.channel().register(newSelector, interestOps, a);
                        if (a instanceof AbstractNioChannel) {
                            // Update SelectionKey
                            ((AbstractNioChannel) a).selectionKey = newKey;
                        }
                        nChannels ++;
                    } catch (Exception e) {
                        logger.warn("Failed to re-register a Channel to the new Selector.", e);
                        if (a instanceof AbstractNioChannel) {
                            AbstractNioChannel ch = (AbstractNioChannel) a;
                            ch.unsafe().close(ch.unsafe().voidPromise());
                        } else {
                            @SuppressWarnings("unchecked")
                            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                            invokeChannelUnregistered(task, key, e);
                        }
                    }
                }
            } catch (ConcurrentModificationException e) {
                // Probably due to concurrent modification of the key set.
                continue;
            }
 
            break;
        }
 
        selector = newSelector;
 
        try {
            // time to close the old selector as everything else is registered to the new one
            oldSelector.close();
        } catch (Throwable t) {
            if (logger.isWarnEnabled()) {
                logger.warn("Failed to close the old Selector.", t);
            }
        }
 
        logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
    }