NioEventLoopGroup的无参构造:
public NioEventLoopGroup() { this(0); }调用了单参的构造:
public NioEventLoopGroup(int nThreads) { this(nThreads, (Executor)null); }继续看到双参构造:
public NioEventLoopGroup(int nThreads, Executor executor) { this(nThreads, executor, SelectorProvider.provider()); }在这里是使用JDK中NIO的原生API:SelectorProvider的provider,产生了一个SelectorProvider对象调用,继续调用三参构造。 关于SelectorProvider在我前面的博客中有介绍过:【Java】NIO中Selector的创建源码分析,在Windows下默认创建了WindowsSelectorProvider对象。
继续看三参构造:
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory, SelectorProvider selectorProvider) { this(nThreads, threadFactory, selectorProvider, DefaultSelectStrategyFactory.INSTANCE); }在这里创建了一个单例的DefaultSelectStrategyFactory 对象:
public final class DefaultSelectStrategyFactory implements SelectStrategyFactory { public static final SelectStrategyFactory INSTANCE = new DefaultSelectStrategyFactory(); private DefaultSelectStrategyFactory() { } public SelectStrategy newSelectStrategy() { return DefaultSelectStrategy.INSTANCE; } }DefaultSelectStrategyFactory实现的是SelectStrategyFactory 接口:
public interface SelectStrategyFactory { SelectStrategy newSelectStrategy(); }该接口提供一个用来产生Select策略的方法,SelectStrategy接口如下:
public interface SelectStrategy { int SELECT = -1; int CONTINUE = -2; int calculateStrategy(IntSupplier var1, boolean var2) throws Exception; }根据IntSupplier 和一个boolean值为Select策略提供了一个计算策略的方法。 在Netty中只提供了DefaultSelectStrategy这一种默认实现:
final class DefaultSelectStrategy implements SelectStrategy { static final SelectStrategy INSTANCE = new DefaultSelectStrategy(); private DefaultSelectStrategy() { } public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception { return hasTasks ? selectSupplier.get() : -1; } }其中IntSupplier :
public interface IntSupplier { int get() throws Exception; }结合上面的来看,最终的选择策略主要是根据IntSupplier的get值来得到的。
再回到构造:
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory, SelectorProvider selectorProvider, SelectStrategyFactory selectStrategyFactory) { super(nThreads, threadFactory, new Object[]{selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject()}); }这里产生了一个拒绝策略:
public static RejectedExecutionHandler reject() { return REJECT; } private static final RejectedExecutionHandler REJECT = new RejectedExecutionHandler() { public void rejected(Runnable task, SingleThreadEventExecutor executor) { throw new RejectedExecutionException(); } }; public interface RejectedExecutionHandler { void rejected(Runnable var1, SingleThreadEventExecutor var2); }将selectorProvider、selectStrategyFactory以及这个拒绝策略封装在一个Object数组里,再调用了父类MultithreadEventLoopGroup的构造:
protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) { super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args); }在这里对nThreads的大小进行了调整:
private static final int DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));SystemPropertyUtil.getInt是根据key值"io.netty.eventLoopThreads",获取系统配置值,在没用设置时使用NettyRuntime.availableProcessors() * 2的值 NettyRuntime的availableProcessors实现如下:
synchronized int availableProcessors() { if (this.availableProcessors == 0) { int availableProcessors = SystemPropertyUtil.getInt("io.netty.availableProcessors", Runtime.getRuntime().availableProcessors()); this.setAvailableProcessors(availableProcessors); } return this.availableProcessors; }还是一样,根据key值"io.netty.availableProcessors",获取系统配置值,在没用设置时使用Runtime.getRuntime().availableProcessors(),是用来获取处理器的个数。
这样保证了在默认情况下nThreads的大小是总是cpu个数的2倍。
继续回到构造,MultithreadEventLoopGroup继续调用父类MultithreadEventExecutorGroup的构造:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) { this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args); }在这里又初始化了一个单例的DefaultEventExecutorChooserFactory对象:
public static final DefaultEventExecutorChooserFactory INSTANCE = new DefaultEventExecutorChooserFactory();DefaultEventExecutorChooserFactory 实现的是EventExecutorChooserFactory接口:
public interface EventExecutorChooserFactory { EventExecutorChooserFactory.EventExecutorChooser newChooser(EventExecutor[] var1); public interface EventExecutorChooser { EventExecutor next(); } }DefaultEventExecutorChooserFactory 的具体实现:
public EventExecutorChooser newChooser(EventExecutor[] executors) { return (EventExecutorChooser)(isPowerOfTwo(executors.length) ? new DefaultEventExecutorChooserFactory.PowerOfTwoEventExecutorChooser(executors) : new DefaultEventExecutorChooserFactory.GenericEventExecutorChooser(executors)); } private static boolean isPowerOfTwo(int val) { return (val & -val) == val; }isPowerOfTwo是用来检查executors的大小是否是二的整数次方,若是二的整数次方,产生PowerOfTwoEventExecutorChooser,反之产生GenericEventExecutorChooser:
private static final class GenericEventExecutorChooser implements EventExecutorChooser { private final AtomicInteger idx = new AtomicInteger(); private final EventExecutor[] executors; GenericEventExecutorChooser(EventExecutor[] executors) { this.executors = executors; } public EventExecutor next() { return this.executors[Math.abs(this.idx.getAndIncrement() % this.executors.length)]; } } private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser { private final AtomicInteger idx = new AtomicInteger(); private final EventExecutor[] executors; PowerOfTwoEventExecutorChooser(EventExecutor[] executors) { this.executors = executors; } public EventExecutor next() { return this.executors[this.idx.getAndIncrement() & this.executors.length - 1]; } }这两种其实都是用了取模运算,只不过因为二的整数次方的特殊性而使用位运算。
回到构造,MultithreadEventExecutorGroup继续调用本省的构造:
private final EventExecutor[] children; private final Set<EventExecutor> readonlyChildren; private final AtomicInteger terminatedChildren; private final Promise<?> terminationFuture; private final EventExecutorChooser chooser; protected MultithreadEventExecutorGroup(int nThreads, Executor executor, EventExecutorChooserFactory chooserFactory, Object... args) { this.terminatedChildren = new AtomicInteger(); this.terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE); if (nThreads <= 0) { throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads)); } else { if (executor == null) { executor = new ThreadPerTaskExecutor(this.newDefaultThreadFactory()); } this.children = new EventExecutor[nThreads]; int j; for(int i = 0; i < nThreads; ++i) { boolean success = false; boolean var18 = false; try { var18 = true; this.children[i] = this.newChild((Executor)executor, args); success = true; var18 = false; } catch (Exception var19) { throw new IllegalStateException("failed to create a child event loop", var19); } finally { if (var18) { if (!success) { int j; for(j = 0; j < i; ++j) { this.children[j].shutdownGracefully(); } for(j = 0; j < i; ++j) { EventExecutor e = this.children[j]; try { while(!e.isTerminated()) { e.awaitTermination(2147483647L, TimeUnit.SECONDS); } } catch (InterruptedException var20) { Thread.currentThread().interrupt(); break; } } } } } if (!success) { for(j = 0; j < i; ++j) { this.children[j].shutdownGracefully(); } for(j = 0; j < i; ++j) { EventExecutor e = this.children[j]; try { while(!e.isTerminated()) { e.awaitTermination(2147483647L, TimeUnit.SECONDS); } } catch (InterruptedException var22) { Thread.currentThread().interrupt(); break; } } } } this.chooser = chooserFactory.newChooser(this.children); FutureListener<Object> terminationListener = new FutureListener<Object>() { public void operationComplete(Future<Object> future) throws Exception { if (MultithreadEventExecutorGroup.this.terminatedChildren.incrementAndGet() == MultithreadEventExecutorGroup.this.children.length) { MultithreadEventExecutorGroup.this.terminationFuture.setSuccess((Object)null); } } }; EventExecutor[] var24 = this.children; j = var24.length; for(int var26 = 0; var26 < j; ++var26) { EventExecutor e = var24[var26]; e.terminationFuture().addListener(terminationListener); } Set<EventExecutor> childrenSet = new LinkedHashSet(this.children.length); Collections.addAll(childrenSet, this.children); this.readonlyChildren = Collections.unmodifiableSet(childrenSet); } }首先是对terminatedChildren的初始化,没什么好说的,对terminationFuture的初始化使用DefaultPromise,用来异步处理终止事件。executor初始化产生一个线程池。
接下来就是对children的操作,根据nThreads的大小,产生一个EventExecutor数组,然后遍历这个数组,调用newChild给每一个元素初始化。
newChild是在NioEventLoopGroup中实现的:
protected EventLoop newChild(Executor executor, Object... args) throws Exception { return new NioEventLoop(this, executor, (SelectorProvider)args[0], ((SelectStrategyFactory)args[1]).newSelectStrategy(), (RejectedExecutionHandler)args[2]); }在这里直接使用executor,和之前放在args数组中的SelectorProvider、SelectStrategyFactory(newSelectStrategy方法产生DefaultSelectStrategy)和RejectedExecutionHandler产生了一个NioEventLoop对象:
private Selector selector; private Selector unwrappedSelector; private SelectedSelectionKeySet selectedKeys; private final SelectorProvider provider; private final AtomicBoolean wakenUp = new AtomicBoolean(); private final SelectStrategy selectStrategy; NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) { super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler); if (selectorProvider == null) { throw new NullPointerException("selectorProvider"); } else if (strategy == null) { throw new NullPointerException("selectStrategy"); } else { this.provider = selectorProvider; NioEventLoop.SelectorTuple selectorTuple = this.openSelector(); this.selector = selectorTuple.selector; this.unwrappedSelector = selectorTuple.unwrappedSelector; this.selectStrategy = strategy; } }NioEventLoop首先在继承链上调用父类的构造,都是一些成员的赋值操作,简单看一看:
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedExecutionHandler) { super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler); this.tailTasks = this.newTaskQueue(maxPendingTasks); } protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) { super(parent); this.threadLock = new Semaphore(0); this.shutdownHooks = new LinkedHashSet(); this.state = 1; this.terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE); this.addTaskWakesUp = addTaskWakesUp; this.maxPendingTasks = Math.max(16, maxPendingTasks); this.executor = (Executor)ObjectUtil.checkNotNull(executor, "executor"); this.taskQueue = this.newTaskQueue(this.maxPendingTasks); this.rejectedExecutionHandler = (RejectedExecutionHandler)ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler"); } protected AbstractScheduledEventExecutor(EventExecutorGroup parent) { super(parent); } protected AbstractEventExecutor(EventExecutorGroup parent) { this.selfCollection = Collections.singleton(this); this.parent = parent; }在经过这继承链上的一系列调用后,给provider成员赋值selectorProvider,就是之前创建好的WindowsSelectorProvider,然后使用openSelector方法,最终创建JDK原生的Selector:
private NioEventLoop.SelectorTuple openSelector() { final AbstractSelector unwrappedSelector; try { unwrappedSelector = this.provider.openSelector(); } catch (IOException var7) { throw new ChannelException("failed to open a new selector", var7); } if (DISABLE_KEYSET_OPTIMIZATION) { return new NioEventLoop.SelectorTuple(unwrappedSelector); } else { final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet(); Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() { public Object run() { try { return Class.forName("sun.nio.ch.SelectorImpl", false, PlatformDependent.getSystemClassLoader()); } catch (Throwable var2) { return var2; } } }); if (maybeSelectorImplClass instanceof Class && ((Class)maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) { final Class<?> selectorImplClass = (Class)maybeSelectorImplClass; Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() { public Object run() { try { Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys"); Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys"); Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true); if (cause != null) { return cause; } else { cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true); if (cause != null) { return cause; } else { selectedKeysField.set(unwrappedSelector, selectedKeySet); publicSelectedKeysField.set(unwrappedSelector, selectedKeySet); return null; } } } catch (NoSuchFieldException var4) { return var4; } catch (IllegalAccessException var5) { return var5; } } }); if (maybeException instanceof Exception) { this.selectedKeys = null; Exception e = (Exception)maybeException; logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e); return new NioEventLoop.SelectorTuple(unwrappedSelector); } else { this.selectedKeys = selectedKeySet; logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector); return new NioEventLoop.SelectorTuple(unwrappedSelector, new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet)); } } else { if (maybeSelectorImplClass instanceof Throwable) { Throwable t = (Throwable)maybeSelectorImplClass; logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t); } return new NioEventLoop.SelectorTuple(unwrappedSelector); } } }可以看到在一开始就使用provider的openSelector方法,即WindowsSelectorProvider的openSelector方法,创建了WindowsSelectorImpl对象(【Java】NIO中Selector的创建源码分析)
然后根据DISABLE_KEYSET_OPTIMIZATION判断:
private static final boolean DISABLE_KEYSET_OPTIMIZATION = SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);可以看到这个系统配置在没有设置默认是false,如果设置了则直接创建一个SelectorTuple对象返回:
private static final class SelectorTuple { final Selector unwrappedSelector; final Selector selector; SelectorTuple(Selector unwrappedSelector) { this.unwrappedSelector = unwrappedSelector; this.selector = unwrappedSelector; } SelectorTuple(Selector unwrappedSelector, Selector selector) { this.unwrappedSelector = unwrappedSelector; this.selector = selector; } }可以看到仅仅是将unwrappedSelector和selector封装了,unwrappedSelector对应的是JDK原生Selector没有经过更改的,而selector对应的是经过更改修饰操作的。
在没有系统配置下,就对Selector进行更改修饰操作: 首先创建SelectedSelectionKeySet对象,这个SelectedSelectionKeySet继承自AbstractSet:
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> { SelectionKey[] keys = new SelectionKey[1024]; int size; SelectedSelectionKeySet() { } ...... }后面是通过反射机制,使得WindowsSelectorImpl的selectedKeys和publicSelectedKeys成员直接赋值为SelectedSelectionKeySet对象。
WindowsSelectorImpl的这两个成员是在SelectorImpl中定义的:
protected Set<SelectionKey> selectedKeys = new HashSet(); private Set<SelectionKey> publicSelectedKeys;从这里就可以明白,在JDK原生的Selector中,selectedKeys和publicSelectedKeys这两个Set的初始化大小都为0,而在这里仅仅就是使其初始化大小变为1024。 后面就是对一些异常的处理,没什么好说的。
openSelector结束后,就可以分别对包装过的Selector和未包装过的Selector,即selector和unwrappedSelector成员赋值,再由selectStrategy保存刚才产生的选择策略,用于Selector的轮询。
回到MultithreadEventExecutorGroup的构造,在调用newChild方法时即NioEventLoop创建的过程中可能出现异常情况,就需要遍历children数组,将之前创建好的NioEventLoop使用shutdownGracefully优雅地关闭掉: shutdownGracefully在AbstractEventExecutor中实现:
public Future<?> shutdownGracefully() { return this.shutdownGracefully(2L, 15L, TimeUnit.SECONDS); }这里设置了超时时间,继续调用SingleThreadEventExecutor的shutdownGracefully方法:
public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) { if (quietPeriod < 0L) { throw new IllegalArgumentException("quietPeriod: " + quietPeriod + " (expected >= 0)"); } else if (timeout < quietPeriod) { throw new IllegalArgumentException("timeout: " + timeout + " (expected >= quietPeriod (" + quietPeriod + "))"); } else if (unit == null) { throw new NullPointerException("unit"); } else if (this.isShuttingDown()) { return this.terminationFuture(); } else { boolean inEventLoop = this.inEventLoop(); while(!this.isShuttingDown()) { boolean wakeup = true; int oldState = this.state; int newState; if (inEventLoop) { newState = 3; } else { switch(oldState) { case 1: case 2: newState = 3; break; default: newState = oldState; wakeup = false; } } if (STATE_UPDATER.compareAndSet(this, oldState, newState)) { this.gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod); this.gracefulShutdownTimeout = unit.toNanos(timeout); if (oldState == 1) { try { this.doStartThread(); } catch (Throwable var10) { STATE_UPDATER.set(this, 5); this.terminationFuture.tryFailure(var10); if (!(var10 instanceof Exception)) { PlatformDependent.throwException(var10); } return this.terminationFuture; } } if (wakeup) { this.wakeup(inEventLoop); } return this.terminationFuture(); } } return this.terminationFuture(); } }前三个判断没什么好说的,isShuttingDown判断:
public boolean isShuttingDown() { return this.state >= 3; }在之前NioEventLoop创建的时候,调用了一系列的继承链,其中state是在SingleThreadEventExecutor的构造方法中实现的,初始值是1,state有如下几种状态:
private static final int ST_NOT_STARTED = 1; private static final int ST_STARTED = 2; private static final int ST_SHUTTING_DOWN = 3; private static final int ST_SHUTDOWN = 4; private static final int ST_TERMINATED = 5;可见在NioEventLoop初始化后处于尚未启动状态,并没有Channel的注册,也就不需要轮询。
isShuttingDown就必然是false,就进入了else块: 首先得到inEventLoop的返回值,该方法在AbstractEventExecutor中实现:
public boolean inEventLoop() { return this.inEventLoop(Thread.currentThread()); }他传入了一个当前线程,接着调用inEventLoop的重载,这个方法是在SingleThreadEventExecutor中实现:
public boolean inEventLoop(Thread thread) { return thread == this.thread; }通过观察之前的SingleThreadEventExecutor构造方法,发现并没有对thread成员初始化,此时的this.thread就是null,那么返回值就是false,即inEventLoop为false。
在while循环中又对isShuttingDown进行了判断,shutdownGracefully当让不仅仅使用在创建NioEventLoop对象失败时才调用的,无论是在EventLoopGroup的关闭,还是Bootstrap的关闭,都会关闭绑定的NioEventLoop,所以在多线程环境中,有可能会被其他线程关闭。
在while循环中,结合上面可知满足进入switch块,在switch块中令newState为3; 然后调用STATE_UPDATER的compareAndSet方法,STATE_UPDATER是用来原子化更新state成员的:
private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(SingleThreadEventExecutor.class, "state");所以这里就是使用CAS操作,原子化更新state成员为3,也就是使当前状态由ST_NOT_STARTED 变为了ST_SHUTTING_DOWN 状态。
gracefulShutdownQuietPeriod和gracefulShutdownTimeout分别保存quietPeriod和timeout的纳秒级颗粒度。
前面可知oldState使1,调用doStartThread方法:
private void doStartThread() { assert this.thread == null; this.executor.execute(new Runnable() { public void run() { SingleThreadEventExecutor.this.thread = Thread.currentThread(); if (SingleThreadEventExecutor.this.interrupted) { SingleThreadEventExecutor.this.thread.interrupt(); } boolean success = false; SingleThreadEventExecutor.this.updateLastExecutionTime(); boolean var112 = false; int oldState; label1685: { try { var112 = true; SingleThreadEventExecutor.this.run(); success = true; var112 = false; break label1685; } catch (Throwable var119) { SingleThreadEventExecutor.logger.warn("Unexpected exception from an event executor: ", var119); var112 = false; } finally { if (var112) { int oldStatex; do { oldStatex = SingleThreadEventExecutor.this.state; } while(oldStatex < 3 && !SingleThreadEventExecutor.STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this, oldStatex, 3)); if (success && SingleThreadEventExecutor.this.gracefulShutdownStartTime == 0L) { SingleThreadEventExecutor.logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " + SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called before run() implementation terminates."); } try { while(!SingleThreadEventExecutor.this.confirmShutdown()) { ; } } finally { try { SingleThreadEventExecutor.this.cleanup(); } finally { SingleThreadEventExecutor.STATE_UPDATER.set(SingleThreadEventExecutor.this, 5); SingleThreadEventExecutor.this.threadLock.release(); if (!SingleThreadEventExecutor.this.taskQueue.isEmpty()) { SingleThreadEventExecutor.logger.warn("An event executor terminated with non-empty task queue (" + SingleThreadEventExecutor.this.taskQueue.size() + ')'); } SingleThreadEventExecutor.this.terminationFuture.setSuccess((Object)null); } } } } do { oldState = SingleThreadEventExecutor.this.state; } while(oldState < 3 && !SingleThreadEventExecutor.STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this, oldState, 3)); if (success && SingleThreadEventExecutor.this.gracefulShutdownStartTime == 0L) { SingleThreadEventExecutor.logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " + SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called before run() implementation terminates."); } try { while(!SingleThreadEventExecutor.this.confirmShutdown()) { ; } return; } finally { try { SingleThreadEventExecutor.this.cleanup(); } finally { SingleThreadEventExecutor.STATE_UPDATER.set(SingleThreadEventExecutor.this, 5); SingleThreadEventExecutor.this.threadLock.release(); if (!SingleThreadEventExecutor.this.taskQueue.isEmpty()) { SingleThreadEventExecutor.logger.warn("An event executor terminated with non-empty task queue (" + SingleThreadEventExecutor.this.taskQueue.size() + ')'); } SingleThreadEventExecutor.this.terminationFuture.setSuccess((Object)null); } } } do { oldState = SingleThreadEventExecutor.this.state; } while(oldState < 3 && !SingleThreadEventExecutor.STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this, oldState, 3)); if (success && SingleThreadEventExecutor.this.gracefulShutdownStartTime == 0L) { SingleThreadEventExecutor.logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " + SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called before run() implementation terminates."); } try { while(!SingleThreadEventExecutor.this.confirmShutdown()) { ; } } finally { try { SingleThreadEventExecutor.this.cleanup(); } finally { SingleThreadEventExecutor.STATE_UPDATER.set(SingleThreadEventExecutor.this, 5); SingleThreadEventExecutor.this.threadLock.release(); if (!SingleThreadEventExecutor.this.taskQueue.isEmpty()) { SingleThreadEventExecutor.logger.warn("An event executor terminated with non-empty task queue (" + SingleThreadEventExecutor.this.taskQueue.size() + ')'); } SingleThreadEventExecutor.this.terminationFuture.setSuccess((Object)null); } } } }); }刚才说过this.thread并没有初始化,所以等于null成立,断言可以继续。
然后直接使executor运行了一个线程,这个executor其实就是在刚才的MultithreadEventExecutorGroup中产生的ThreadPerTaskExecutor对象。
在线程中,首先将SingleThreadEventExecutor的thread成员初始化为当前线程。
在这里可能就有疑问了,为什么会在关闭时会调用名为doStartThread的方法,这个方法不因该在启动时调用吗? 其实doStartThread在启动时是会被调用的,当在启动时被调用的话,每一个NioEventLoop都会被绑定一个线程用来处理真正的Selector操作,根据之前的说法就可以知道,每个EventLoopGroup在创建后都会被绑定cpu个数的二倍个NioEventLoop,而每个NioEventLoop都会绑定一个Selector对象,上面又说了在启动时SingleThreadEventExecutor绑定了一个线程,即NioEventLoop绑定了一个线程来处理其绑定的Selector的轮询。 至于关闭时还会启动Selector的轮询,就是为了解决注册了的Channel没有被处理的情况。
回到doStartThread方法,其实这个doStartThread方法的核心是SingleThreadEventExecutor.this.run(),这个方法就是正真的Selector的轮询操作,在NioEventLoop中实现:
protected void run() { while(true) { while(true) { try { switch(this.selectStrategy.calculateStrategy(this.selectNowSupplier, this.hasTasks())) { case -2: continue; case -1: this.select(this.wakenUp.getAndSet(false)); if (this.wakenUp.get()) { this.selector.wakeup(); } default: this.cancelledKeys = 0; this.needsToSelectAgain = false; int ioRatio = this.ioRatio; if (ioRatio == 100) { try { this.processSelectedKeys(); } finally { this.runAllTasks(); } } else { long ioStartTime = System.nanoTime(); boolean var13 = false; try { var13 = true; this.processSelectedKeys(); var13 = false; } finally { if (var13) { long ioTime = System.nanoTime() - ioStartTime; this.runAllTasks(ioTime * (long)(100 - ioRatio) / (long)ioRatio); } } long ioTime = System.nanoTime() - ioStartTime; this.runAllTasks(ioTime * (long)(100 - ioRatio) / (long)ioRatio); } } } catch (Throwable var21) { handleLoopException(var21); } try { if (this.isShuttingDown()) { this.closeAll(); if (this.confirmShutdown()) { return; } } } catch (Throwable var18) { handleLoopException(var18); } } } }进入switch块,首先调用之前准备好的选择策略,其中this.selectNowSupplier在NioEventLoop创建的时候就被创建了:
private final IntSupplier selectNowSupplier = new IntSupplier() { public int get() throws Exception { return NioEventLoop.this.selectNow(); } };实际上调用了selectNow方法:
int selectNow() throws IOException { int var1; try { var1 = this.selector.selectNow(); } finally { if (this.wakenUp.get()) { this.selector.wakeup(); } } return var1; }这里就直接调用了JDK原生的selectNow方法。 之前说过的选择策略:
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception { return hasTasks ? selectSupplier.get() : -1; }其中hasTasks是根据hasTasks方法来判断,而hasTasks方法就是判断任务队列是否为空,那么在一开始初始化,必然是空的,所以这里calculateStrategy的返回值就是-1;
那么case为-1条件成立,执行this.select(this.wakenUp.getAndSet(false)),其中wakenUp是一个原子化的Boolean,用来表示是需要唤醒Selector的轮询阻塞,初始化是为true,这里通过CAS操作设置为false代表不需要唤醒,后面在select执行完后,又判断wakenUp是否需要唤醒,说明在select中对Selector的阻塞进行了检查,若是需要唤醒,就通过Selector的原生API完成唤醒【Java】NIO中Selector的select方法源码分析
来看看这里的select实现:
private void select(boolean oldWakenUp) throws IOException { Selector selector = this.selector; try { int selectCnt = 0; long currentTimeNanos = System.nanoTime(); long selectDeadLineNanos = currentTimeNanos + this.delayNanos(currentTimeNanos); while(true) { long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L; if (timeoutMillis <= 0L) { if (selectCnt == 0) { selector.selectNow(); selectCnt = 1; } break; } if (this.hasTasks() && this.wakenUp.compareAndSet(false, true)) { selector.selectNow(); selectCnt = 1; break; } int selectedKeys = selector.select(timeoutMillis); ++selectCnt; if (selectedKeys != 0 || oldWakenUp || this.wakenUp.get() || this.hasTasks() || this.hasScheduledTasks()) { break; } if (Thread.interrupted()) { if (logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely because Thread.currentThread().interrupt() was called. Use NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop."); } selectCnt = 1; break; } long time = System.nanoTime(); if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) { selectCnt = 1; } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.", selectCnt, selector); this.rebuildSelector(); selector = this.selector; selector.selectNow(); selectCnt = 1; break; } currentTimeNanos = time; } if (selectCnt > 3 && logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.", selectCnt - 1, selector); } } catch (CancelledKeyException var13) { if (logger.isDebugEnabled()) { logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?", selector, var13); } } }这个方法虽然看着很长,但核心就是判断这个存放任务的阻塞队列是否还有任务,若是有,就直接调用Selector的selectNow方法获取就绪的文件描述符,若是没有就绪的文件描述符该方法也会立即返回;若是阻塞队列中没有任务,就调用Selector的select(timeout)方法,尝试在超时时间内取获取就绪的文件描述符。
因为现在是在执行NioEventLoopGroup的创建,并没有Channel的注册,也就没有轮询到任何文件描述符就绪。 在轮询结束后,回到run方法,进入default块: 其中ioRatio是执行IO操作和执行任务队列的任务用时比率,默认是50。若是ioRatio设置为100,就必须等到tasks阻塞队列中的所有任务执行完毕才再次进行轮询;若是小于100,那么就根据(100 - ioRatio) / ioRatio的比值乘以ioTime计算出的超时时间让所有任务尝试在超时时间内执行完毕,若是到达超时时间还没执行完毕,就在下一轮的轮询中执行。
processSelectedKeys方法就是获取Selector轮询的SelectedKeys结果:
private void processSelectedKeys() { if (this.selectedKeys != null) { this.processSelectedKeysOptimized(); } else { this.processSelectedKeysPlain(this.selector.selectedKeys()); } }selectedKeys 在openSelector时被初始化过了,若是在openSelector中出现异常selectedKeys才会为null。
processSelectedKeysOptimized方法:
private void processSelectedKeysOptimized() { for(int i = 0; i < this.selectedKeys.size; ++i) { SelectionKey k = this.selectedKeys.keys[i]; this.selectedKeys.keys[i] = null; Object a = k.attachment(); if (a instanceof AbstractNioChannel) { this.processSelectedKey(k, (AbstractNioChannel)a); } else { NioTask<SelectableChannel> task = (NioTask)a; processSelectedKey(k, task); } if (this.needsToSelectAgain) { this.selectedKeys.reset(i + 1); this.selectAgain(); i = -1; } } }这里就通过遍历在openSelector中注入进Selector的SelectedKeys,得到SelectionKey 对象。 在这里可以看到Netty很巧妙地通过SelectionKey的attachment附件,将JDK中的Channel和Netty中的Channel联系了起来。 根据得到的附件Channel的类型,执行不同的processSelectedKey方法,去处理IO操作。
processSelectedKey(SelectionKey k, AbstractNioChannel ch)方法:
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { NioUnsafe unsafe = ch.unsafe(); if (!k.isValid()) { NioEventLoop eventLoop; try { eventLoop = ch.eventLoop(); } catch (Throwable var6) { return; } if (eventLoop == this && eventLoop != null) { unsafe.close(unsafe.voidPromise()); } } else { try { int readyOps = k.readyOps(); if ((readyOps & 8) != 0) { int ops = k.interestOps(); ops &= -9; k.interestOps(ops); unsafe.finishConnect(); } if ((readyOps & 4) != 0) { ch.unsafe().forceFlush(); } if ((readyOps & 17) != 0 || readyOps == 0) { unsafe.read(); } } catch (CancelledKeyException var7) { unsafe.close(unsafe.voidPromise()); } } }这里的主要核心就是根据SelectedKey的readyOps值来判断,处理不同的就绪事件,有如下几种事件:
public static final int OP_READ = 1 << 0; public static final int OP_WRITE = 1 << 2; public static final int OP_CONNECT = 1 << 3; public static final int OP_ACCEPT = 1 << 4;结合来看上面的判断就对应:连接就绪、写就绪、侦听或者读就绪(或者缺省状态0,该状态是未来注册时的默认状态,后续博客会介绍),交由Netty的AbstractNioChannel的NioUnsafe去处理不同事件的byte数据,NioUnsafe会将数据再交由ChannelPipeline双向链表去处理。 关于ChannelPipeline会在后续的博客中详细介绍。
processSelectedKey(SelectionKey k, NioTask task)这个方法的实现细节需要由使用者实现NioTask接口,就不说了。
回到processSelectedKeys方法,在this.selectedKeys等于null的情况下:
private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) { if (!selectedKeys.isEmpty()) { Iterator i = selectedKeys.iterator(); while(true) { SelectionKey k = (SelectionKey)i.next(); Object a = k.attachment(); i.remove(); if (a instanceof AbstractNioChannel) { this.processSelectedKey(k, (AbstractNioChannel)a); } else { NioTask<SelectableChannel> task = (NioTask)a; processSelectedKey(k, task); } if (!i.hasNext()) { break; } if (this.needsToSelectAgain) { this.selectAgain(); selectedKeys = this.selector.selectedKeys(); if (selectedKeys.isEmpty()) { break; } i = selectedKeys.iterator(); } } } }这是在openSelector中注入进Selector的SelectedKeys失败的情况下,直接遍历Selector本身的SelectedKeys,和processSelectedKeysOptimized没有差别。
继续回到run方法,在调用完processSelectedKeys方法后,就需要调用runAllTasks处理任务队列中的任务: runAllTasks()方法:
protected boolean runAllTasks() { assert this.inEventLoop(); boolean ranAtLeastOne = false; boolean fetchedAll; do { fetchedAll = this.fetchFromScheduledTaskQueue(); if (this.runAllTasksFrom(this.taskQueue)) { ranAtLeastOne = true; } } while(!fetchedAll); if (ranAtLeastOne) { this.lastExecutionTime = ScheduledFutureTask.nanoTime(); } this.afterRunningAllTasks(); return ranAtLeastOne; }首先调用fetchFromScheduledTaskQueue方法:
private boolean fetchFromScheduledTaskQueue() { long nanoTime = AbstractScheduledEventExecutor.nanoTime(); for(Runnable scheduledTask = this.pollScheduledTask(nanoTime); scheduledTask != null; scheduledTask = this.pollScheduledTask(nanoTime)) { if (!this.taskQueue.offer(scheduledTask)) { this.scheduledTaskQueue().add((ScheduledFutureTask)scheduledTask); return false; } } return true; }这里就是通过pollScheduledTask不断地从延时任务队列获取到期的任务,将到期的任务添加到taskQueue任务队列中,为上面的runAllTasksFrom执行做准备;若是添加失败,再把它放进延时任务队列。
pollScheduledTask方法:
protected final Runnable pollScheduledTask(long nanoTime) { assert this.inEventLoop(); Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue; ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : (ScheduledFutureTask)scheduledTaskQueue.peek(); if (scheduledTask == null) { return null; } else if (scheduledTask.deadlineNanos() <= nanoTime) { scheduledTaskQueue.remove(); return scheduledTask; } else { return null; } }从延时任务队列中获取队首的任务scheduledTask,若是scheduledTask的deadlineNanos小于等于nanoTime,说明该任务到期。
回到runAllTasks,将到期了的延时任务放在了任务队列,由runAllTasksFrom执行这些任务:
protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) { Runnable task = pollTaskFrom(taskQueue); if (task == null) { return false; } else { do { safeExecute(task); task = pollTaskFrom(taskQueue); } while(task != null); return true; } }不断地从任务队列队首获取任务,然后执行,直到没有任务。
pollTaskFrom是获取队首任务:
protected static Runnable pollTaskFrom(Queue<Runnable> taskQueue) { Runnable task; do { task = (Runnable)taskQueue.poll(); } while(task == WAKEUP_TASK); return task; }其中WAKEUP_TASK,是用来巧妙地控制循环:
private static final Runnable WAKEUP_TASK = new Runnable() { public void run() { } };safeExecute是执行任务:
protected static void safeExecute(Runnable task) { try { task.run(); } catch (Throwable var2) { logger.warn("A task raised an exception. Task: {}", task, var2); } }实际上就是执行Runnable 的run方法。
继续回到runAllTasks方法,当所有到期任务执行完毕后,根据ranAtLeastOne判断是否需要修改最后一次执行时间lastExecutionTime,最后调用afterRunningAllTasks方法,该方法是在SingleThreadEventLoop中实现的:
protected void afterRunningAllTasks() { this.runAllTasksFrom(this.tailTasks); }这里就仅仅执行了tailTasks队列中的任务。runAllTasks到这里执行完毕。
再来看看runAllTasks(timeoutNanos)方法:
protected boolean runAllTasks(long timeoutNanos) { this.fetchFromScheduledTaskQueue(); Runnable task = this.pollTask(); if (task == null) { this.afterRunningAllTasks(); return false; } else { long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos; long runTasks = 0L; long lastExecutionTime; while(true) { safeExecute(task); ++runTasks; if ((runTasks & 63L) == 0L) { lastExecutionTime = ScheduledFutureTask.nanoTime(); if (lastExecutionTime >= deadline) { break; } } task = this.pollTask(); if (task == null) { lastExecutionTime = ScheduledFutureTask.nanoTime(); break; } } this.afterRunningAllTasks(); this.lastExecutionTime = lastExecutionTime; return true; } }这个方法前面的runAllTasks方法类似,先通过fetchFromScheduledTaskQueue将所有到期了的延时任务放在taskQueue中,然后不断从taskQueue队首获取任务,但是,若是执行到了到超过了63个任务,判断是否达到了超时时间deadline,若是达到结束循环,留着下次执行,反之继续循环执行任务。
回到run方法,在轮询完毕,并且执行完任务后,通过isShuttingDown判断当前状态,在之前的CAS操作中,state已经变为了3,所以isShuttingDown成立,就需要调用closeAll方法:
private void closeAll() { this.selectAgain(); Set<SelectionKey> keys = this.selector.keys(); Collection<AbstractNioChannel> channels = new ArrayList(keys.size()); Iterator var3 = keys.iterator(); while(var3.hasNext()) { SelectionKey k = (SelectionKey)var3.next(); Object a = k.attachment(); if (a instanceof AbstractNioChannel) { channels.add((AbstractNioChannel)a); } else { k.cancel(); NioTask<SelectableChannel> task = (NioTask)a; invokeChannelUnregistered(task, k, (Throwable)null); } } var3 = channels.iterator(); while(var3.hasNext()) { AbstractNioChannel ch = (AbstractNioChannel)var3.next(); ch.unsafe().close(ch.unsafe().voidPromise()); } }在这里首先调用selectAgain进行一次轮询:
private void selectAgain() { this.needsToSelectAgain = false; try { this.selector.selectNow(); } catch (Throwable var2) { logger.warn("Failed to update SelectionKeys.", var2); } }通过这次的轮询,将当前仍有事件就绪的JDK的SelectionKey中绑定的Netty的Channel添加到channels集合中,遍历这个集合,通过unsafe的close方法关闭Netty的Channel。
之后调用confirmShutdown方法:
protected boolean confirmShutdown() { if (!this.isShuttingDown()) { return false; } else if (!this.inEventLoop()) { throw new IllegalStateException("must be invoked from an event loop"); } else { this.cancelScheduledTasks(); if (this.gracefulShutdownStartTime == 0L) { this.gracefulShutdownStartTime = ScheduledFutureTask.nanoTime(); } if (!this.runAllTasks() && !this.runShutdownHooks()) { long nanoTime = ScheduledFutureTask.nanoTime(); if (!this.isShutdown() && nanoTime - this.gracefulShutdownStartTime <= this.gracefulShutdownTimeout) { if (nanoTime - this.lastExecutionTime <= this.gracefulShutdownQuietPeriod) { this.wakeup(true); try { Thread.sleep(100L); } catch (InterruptedException var4) { ; } return false; } else { return true; } } else { return true; } } else if (this.isShutdown()) { return true; } else if (this.gracefulShutdownQuietPeriod == 0L) { return true; } else { this.wakeup(true); return false; } } }首先调用cancelScheduledTasks,取消所有的延时任务:
protected void cancelScheduledTasks() { assert this.inEventLoop(); PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue; if (!isNullOrEmpty(scheduledTaskQueue)) { ScheduledFutureTask<?>[] scheduledTasks = (ScheduledFutureTask[])scheduledTaskQueue.toArray(new ScheduledFutureTask[scheduledTaskQueue.size()]); ScheduledFutureTask[] var3 = scheduledTasks; int var4 = scheduledTasks.length; for(int var5 = 0; var5 < var4; ++var5) { ScheduledFutureTask<?> task = var3[var5]; task.cancelWithoutRemove(false); } scheduledTaskQueue.clearIgnoringIndexes(); } }遍历scheduledTasks这个延时任务对立中所有的任务,通过cancelWithoutRemove将该任务取消。
至此轮询的整个生命周期完成。
回到SingleThreadEventExecutor的doStartThread方法,在run方法执行完毕后,说明Selector轮询结束,调用SingleThreadEventExecutor.this.cleanup()方法关闭Selector:
protected void cleanup() { try { this.selector.close(); } catch (IOException var2) { logger.warn("Failed to close a selector.", var2); } }这次终于可以回到MultithreadEventExecutorGroup的构造,在children创建完毕后,用chooserFactory根据children的大小创建chooser,前面说过。
然后产生terminationListener异步中断监听对象,给每个NioEventLoop设置中断监听,然后对children进行了备份处理,通过readonlyChildren保存。
至此NioEventLoopGroup的创建全部结束。