Модель потоков Netty
- Иллюстрированная модель многопоточности Netty
Когда дело доходит до модели потоков Netty, мы должны повторить модель потоков Reactor master-slave. Модель многопоточности Netty в основном основана на реализации модели Reactor master-slave. Модель многопоточности Netty будет расширена из следующих двух диаграмм:
Модель потока Reactor master-slave
public void bind(int port) throws Exception { EventLoopGroup bossGroup = new NioEven | tLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); boot st rap.group(bossGroup., workerGroup) .channel(NioServerSocketChannel.class) .option(Channeloption.SOLB4C / CLOG., 1024) .childHandler(new ChildChannelHandler()); System.out.printIn("The server started successfully..."); ChannelFuture f = bootstrap.bind(port).sync(); f.channel().closeFuture().sync(); } finally { //Thread pool resource release bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }
Пример кода сервера Netty.
Для объяснения многопоточной модели Reactor master-slave, пожалуйста, сосредоточьтесь на предварительной главе о модели многопоточности Netty и на реализации исходного кода модели Reactor master-slave. Из примера кода сервера Netty видно, что модель потоков Netty должна быть неотделима от EventLoopGroup. Да, указанная выше bossGroup эквивалентна Main Reactor в режиме Reactor, а рабочая группа эквивалентна SubReactor.
Иерархия классов модели потоковой передачи Netty:
Диаграмма наследования классов модели потоков Netty:
Как видно из рисунка выше, в потоковой модели Netty есть четыре основных интерфейса: EventLoopGroup, EventLoop, EventExecuteGroup и EventExecute. Среди них EventExecute расширяется от интерфейса java.util.concurrent.ScheduledExecutorService, который аналогичен ответственности пула потоков (выполнение), в то время как EventLoop сначала наследуется от EventExecute и в основном расширяет метод регистрации, который является методом регистрации канала. Канал на селектор.
NioEventLoop — это реализация, основанная на Nio. Одно яркое пятно в этом классе — избежать ошибки JDK nio, пустого опроса метода выбора Selector. Основная идея заключается в том, что если есть много последовательных раз (по умолчанию 512), он вернется без тайм-аута и будет готов. Если количество ключей равно 0, считается, что произошел пустой опрос. Если возникает пустой опрос, будет создан новый селектор, соответствующий канал и события будут зарегистрированы в новом селекторе, а старый селектор будет закрыт. Исходный код реализован следующим образом:
private void select(boolean oldWakenUp) throws IOException { Selector selector = this.selector; try { int selectCnt = 0; long currentTimeNanos = System.nanoTime(); long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); for (;;) { long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L; if (timeoutMillis <= 0) { if (selectCnt == 0) { selector.selectNow(); selectCnt = 1; } break; } int selectedKeys = selector.select(timeoutMillis); selectCnt ++; if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) { // - Selected something, // - waken up by user, or // - the task queue has a pending task. // - a scheduled task is ready for processing break; } if (Thread.interrupted()) { // Thread was interrupted so reset selected keys and break so we not run into a busy loop. // As this is most likely a bug in the handler of the user or it's client library we will // also log it. // // See https://github.com/netty/netty/issues/2426 if (logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely because " + "Thread.currentThread().interrupt() was called. Use " + "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop."); } selectCnt = 1; break; } long time = System.nanoTime(); if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) { // timeoutMillis elapsed without anything selected. selectCnt = 1; } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { // The selector returned prematurely many times in a row. // Rebuild the selector to work around the problem. logger.warn( "Selector.select() returned prematurely {} times in a row; rebuilding selector.", selectCnt); rebuildSelector(); selector = this.selector; // Select again to populate selectedKeys. selector.selectNow(); selectCnt = 1; break; } currentTimeNanos = time; } if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) { if (logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely {} times in a row.", selectCnt - 1); } } } catch (CancelledKeyException e) { if (logger.isDebugEnabled()) { logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector - JDK bug?", e); } // Harmless exception - log anyway } } /** * Replaces the current {@link Selector} of this event loop with newly created {@link Selector}s to work * around the infamous epoll 100% CPU bug. */ public void rebuildSelector() { if (!inEventLoop()) { execute(new Runnable() { @Override public void run() { rebuildSelector(); } }); return; } final Selector oldSelector = selector; final Selector newSelector; if (oldSelector == null) { return; } try { newSelector = openSelector(); } catch (Exception e) { logger.warn("Failed to create a new Selector.", e); return; } // Register all channels to the new Selector. int nChannels = 0; for (;;) { try { for (SelectionKey key: oldSelector.keys()) { Object a = key.attachment(); try { if (!key.isValid() || key.channel().keyFor(newSelector) != null) { continue; } int interestOps = key.interestOps(); key.cancel(); SelectionKey newKey = key.channel().register(newSelector, interestOps, a); if (a instanceof AbstractNioChannel) { // Update SelectionKey ((AbstractNioChannel) a).selectionKey = newKey; } nChannels ++; } catch (Exception e) { logger.warn("Failed to re-register a Channel to the new Selector.", e); if (a instanceof AbstractNioChannel) { AbstractNioChannel ch = (AbstractNioChannel) a; ch.unsafe().close(ch.unsafe().voidPromise()); } else { @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; invokeChannelUnregistered(task, key, e); } } } } catch (ConcurrentModificationException e) { // Probably due to concurrent modification of the key set. continue; } break; } selector = newSelector; try { // time to close the old selector as everything else is registered to the new one oldSelector.close(); } catch (Throwable t) { if (logger.isWarnEnabled()) { logger.warn("Failed to close the old Selector.", t); } } logger.info("Migrated " + nChannels + " channel(s) to the new Selector."); }