步骤说明
1.1 创建 ServerBootstrap 实例,它是 netty 的启动辅助类,提供了一系列的方法用于设置服务 端启动相关的参数。底层通过门面模式对各种能力进行抽象和封装,尽量不需要用户跟过 多的底层 API 打交道,降低用户的开发难度
1.2 NioEventLoopGroup 是 netty Reactor 线程池,bossGroup 监听和 accept 客户端连接,workGroup 则处理 IO ,编解码
1.3 绑定服务端 NioServerSocketChannel
1.4 设置一些参数
1.5 初始化 pipeline 并绑定 handler ,pipeline 是一个负责处理网络事件的职责链,负责管理和执行 ChannelHandler ,设置系统提供的 IdleStateHandler 和自定义 IOHandler
1.6 serverBootstrap.bind(8802) 这里才是启动服务端绑定端口
1.7 future.channel().closeFuture().sync(); 等待服务端关闭
1.8 优雅关闭
NioEventLoopGroup 不仅仅是 I/O 线程,除了负责 I/O 的读写,还负责系统 Task 和定时任务
public NioEventLoopGroup(int nThreads) { this(nThreads, null); } public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) { this(nThreads, threadFactory, SelectorProvider.provider()); } public NioEventLoopGroup( int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) { super(nThreads, threadFactory, selectorProvider); } protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) { super(nThreads == 0? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args); }继续,以下是精简代码
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) { ... if (threadFactory == null) { threadFactory = newDefaultThreadFactory(); } children = new SingleThreadEventExecutor[nThreads]; if (isPowerOfTwo(children.length)) { chooser = new PowerOfTwoEventExecutorChooser(); } else { chooser = new GenericEventExecutorChooser(); } for (int i = 0; i < nThreads; i ++) { ... children[i] = newChild(threadFactory, args); ... }MultithreadEventExecutorGroup 实现了线程的创建和线程的选择,我们看看 newChild 方法( NioEventLoopGroup 类的方法),newChild 实例化线程
@Override protected EventExecutor newChild( ThreadFactory threadFactory, Object... args) throws Exception { return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0]); }创建了一个 NioEventLoop
NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) { super(parent, threadFactory, false); if (selectorProvider == null) { throw new NullPointerException("selectorProvider"); } provider = selectorProvider; selector = openSelector(); }跟着 super
protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) { super(parent, threadFactory, addTaskWakesUp); }代码有精简,继续
protected SingleThreadEventExecutor( EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) { thread = threadFactory.newThread(new Runnable() { @Override public void run() { SingleThreadEventExecutor.this.run(); } } });在这里实例化了一个线程,并在 run 中调用 SingleThreadEventExecutor 的 run 方法,这个线程在哪里启动的呢,我们继续往下看 总结: NioEventLoopGroup 实际就是 Reactor 线程池,负责调度和执行客户端的接入、网络读写事件的处理、用户自定义任务和定时任务的执行。
ServerBootstrap 是服务端的启动辅助类,父类是 AbstractBootstrap ,与之相对应的客户端启动辅助类是 Bootstrap
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> { volatile EventLoopGroup group; private volatile ChannelFactory<? extends C> channelFactory; private volatile SocketAddress localAddress; private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>(); private final Map<AttributeKey<?>, Object> attrs = new LinkedHashMap<AttributeKey<?>, Object>(); private volatile ChannelHandler handler; }将 bossGroup 传给父类,workGroup 赋值给 serverBootstrap 的 childGroup
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) { super.group(parentGroup); if (childGroup == null) { throw new NullPointerException("childGroup"); } if (this.childGroup != null) { throw new IllegalStateException("childGroup set already"); } this.childGroup = childGroup; return this; }继续跟 new BootstrapChannelFactory
private static final class BootstrapChannelFactory<T extends Channel> implements ChannelFactory<T> { private final Class<? extends T> clazz; BootstrapChannelFactory(Class<? extends T> clazz) { this.clazz = clazz; } @Override public T newChannel() { try { return clazz.newInstance(); } catch (Throwable t) { throw new ChannelException("Unable to create Channel from class " + clazz, t); } } }BootstrapChannelFactory 是一个继承了 ChannelFactory 的内部类,从名称上就能看出,这是一个 channel 工厂类,重写了父类的 newChannel 方法,通过反射创建 NioServerSocketChannel 实例,后面会告诉你是在哪里调用到的
这里的 option 方法是父类 AbstractBootstrap 的方法,options 是一个有序的非线程安全的双向链表,加锁添加
childOption 是子类 serverBootstrap 的方法 childOption 和 option 的区别: option : 主要是设置 ServerChannel 的一些选项 childOption : 主要设置 ServerChannel 的子 channel 的选项,即 option 针对的是 boss 线程而 childOption 针对的是 work 线程池
handler 和 childHandler 的区别 Handler 是属于服务端 NioServerSocketChannel ,只会创建一次 childHandler 是属于每一个新建的 NioSocketChannel ,每当有一个连接上来,都会调用
channelFactory 是 serverBootstrap.channel() 时创建的,在这里调用反射创建 NioServerSocketChannel 实例
(3.2.1) 再看 init(channel) 方法( ServerBootstrap 类) @Override void init(Channel channel) throws Exception { final Map<ChannelOption<?>, Object> options = options(); synchronized (options) { channel.config().setOptions(options); } final Map<AttributeKey<?>, Object> attrs = attrs(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) { @SuppressWarnings("unchecked") AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } ChannelPipeline p = channel.pipeline(); final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry<ChannelOption<?>, Object>[] currentChildOptions; final Entry<AttributeKey<?>, Object>[] currentChildAttrs; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); } p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = handler(); if (handler != null) { pipeline.addLast(handler); } pipeline.addLast(new ServerBootstrapAcceptor( currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); }options() 是 serverBootstrap.option() 赋值的 AbstractBootstrap 类的 options 双向链表成员变量,在这里将 options 和 attrs 注入 channel 中 P.addLast() 为 NioServerSocketChannel 加入新的 handler (处理器),这里 pipeline 类似于 Servlet 的过滤器,管理所有 handler
(3.2.2) 再看 group().register() 方法 这里的 group 是 bossGroup(NioEventLoopGroup----▷MultithreadEventLoopGroup) ,多次跳转到 SingleThreadEventLoop 类的 register() 方法 @Override public ChannelFuture register(final Channel channel, final ChannelPromise promise) { if (channel == null) { throw new NullPointerException("channel"); } if (promise == null) { throw new NullPointerException("promise"); } channel.unsafe().register(this, promise); return promise; } (3.2.3) 清除一些不重要的代码,下面才是真正的注册 @Override public final void register(EventLoop eventLoop, final ChannelPromise promise) { AbstractChannel.this.eventLoop = eventLoop; if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new OneTimeTask() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { } } }eventLoop.inEventLoop() 用来判断启动线程与当前线程是否相同,相同表示已经启动,不同则有两种可能:未启动或者线程不同。
(3.2.4) 这里线程还未启动,走 eventLoop.execute() ,这个 execute() 方法是 SingleThreadEventExecutor 类的 @Override public void execute(Runnable task) { if (task == null) { throw new NullPointerException("task"); } boolean inEventLoop = inEventLoop(); if (inEventLoop) { addTask(task); } else { startThread(); addTask(task); if (isShutdown() && removeTask(task)) { reject(); } } if (!addTaskWakesUp && wakesUpForTask(task)) { wakeup(inEventLoop); } } (3.2.5) 启动线程 private void startThread() { if (STATE_UPDATER.get(this) == ST_NOT_STARTED) { if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) { thread.start(); } } }我们在最开始2.1里面 SingleThreadEventExecutor 构造方法内的 thread 就是在这里启动的,我们再回到2.1的
protected SingleThreadEventExecutor( EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) { thread = threadFactory.newThread(new Runnable() { @Override public void run() { SingleThreadEventExecutor.this.run(); } } }); (3.2.6) 打开 SingleThreadEventExecutor.this.run() ; @Override protected void run() { for (;;) { boolean oldWakenUp = wakenUp.getAndSet(false); try { if (hasTasks()) { selectNow(); } else { select(oldWakenUp); if (wakenUp.get()) { selector.wakeup(); } } cancelledKeys = 0; needsToSelectAgain = false; final int ioRatio = this.ioRatio; if (ioRatio == 100) { processSelectedKeys(); runAllTasks(); } else { final long ioStartTime = System.nanoTime(); processSelectedKeys(); final long ioTime = System.nanoTime() - ioStartTime; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { break; } } } catch (Throwable t) { try { Thread.sleep(1000); } catch (InterruptedException e) { } } } }在这里异步执行,轮询 select 客户端的 accept ,并且 runAllTasks 所有的任务
(3.3) 我们再看**(3.1)**里面的 ChannelFuture regFuture = group().register(channel); 跳转到 SingleThreadEventLoop 的 register 方法 @Override public ChannelFuture register(Channel channel) { ... channel.unsafe().register(this, promise); return promise; }以下是精简后的代码(位于 AbstractChannel 类的 AbstractUnsafe 内部类)
@Override public final void register(EventLoop eventLoop, final ChannelPromise promise) { ... eventLoop.execute(new OneTimeTask() { @Override public void run() { register0(promise); } }); ... } private void register0(ChannelPromise promise) { ... doRegister(); ... }继续(位于 AbstractNioChannel 类)
@Override protected void doRegister() throws Exception { boolean selected = false; for (;;) { ... selectionKey = javaChannel().register(eventLoop().selector, 0, this); ... } }将 NioServerSocketChannel 注册到 NioEventLoop 的 Selector 上。 在这里应该注册 OP_ACCEPT(16) 到多路复用器上 注册0的原因: (1)注册方法是多态的,它既可以被 NioServerSocketChannel 用来监听客户端的连接接入,也可以注册 SocketChannel 用来监听网络读或写操作 (2)通过 SelectionKey 的 interestOps(int ops) 方法可以方便地修改监听操作位
(4) 再看 doBind0( ) 方法 private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }将方法丢到 reactor 线程池任务队列中执行,会先判断注册是否成功,成功则继续执行bind方法
(5) 执行 bind( ) 方法 @Override public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) { return pipeline.bind(localAddress, promise); } @Override public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) { return tail.bind(localAddress, promise); } @Override public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) { ... final AbstractChannelHandlerContext next = findContextOutbound(); EventExecutor executor = next.executor(); ... next.invokeBind(localAddress, promise); ... }由于 bind 事件是出站事件,寻找出站的 handler ,执行 invokeBind( ) 方法
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) { try { ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise); } catch (Throwable t) { notifyOutboundHandlerException(t, promise); } } @Override public void bind( ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { unsafe.bind(localAddress, promise); } @Override public final void bind(final SocketAddress localAddress, final ChannelPromise promise) { ... doBind(localAddress); ... } @Override protected void doBind(SocketAddress localAddress) throws Exception { javaChannel().socket().bind(localAddress, config.getBacklog()); }经过多层 bind 深入,最后在这里可以看到,还是会调用Java底层的nio进行 socket bind 自此,服务端启动流程解析完毕,我们总结一下 ① 通过 ServerBootstrap 辅助启动类,配置了 reactor 线程池,服务端 Channel ,一些配置参数,客户端连接后的 handler ② 将 ServerBootstrap 的值初始化,并注册 OP_ACCEPT 到多路复用器 ③ 启动 reactor 线程池,不断循环监听连接,处理任务 ④ 绑定端口
作者:陶章好 链接:https://mp.csdn.net/postedit/90489321
