Netty服务端的启动源码分析

    xiaoxiao2025-12-30  10

    ServerBootstrap的构造:

    public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> { private static final InternalLogger logger = InternalLoggerFactory.getInstance(ServerBootstrap.class); private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap(); private final Map<AttributeKey<?>, Object> childAttrs = new LinkedHashMap(); private final ServerBootstrapConfig config = new ServerBootstrapConfig(this); private volatile EventLoopGroup childGroup; private volatile ChannelHandler childHandler; public ServerBootstrap() { } ...... }

    隐式地执行了父类的无参构造:

    public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable { volatile EventLoopGroup group; private volatile ChannelFactory<? extends C> channelFactory; private volatile SocketAddress localAddress; private final Map<ChannelOption<?>, Object> options = new LinkedHashMap(); private final Map<AttributeKey<?>, Object> attrs = new LinkedHashMap(); private volatile ChannelHandler handler; AbstractBootstrap() { } ...... }

    只是初始化了几个容器成员

    在ServerBootstrap创建后,需要调用group方法,绑定EventLoopGroup,有关EventLoopGroup的创建在我之前博客中写过:Netty中NioEventLoopGroup的创建源码分析

    ServerBootstrap的group方法:

    public ServerBootstrap group(EventLoopGroup group) { return this.group(group, group); } public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) { super.group(parentGroup); if (childGroup == null) { throw new NullPointerException("childGroup"); } else if (this.childGroup != null) { throw new IllegalStateException("childGroup set already"); } else { this.childGroup = childGroup; return this; } }

    首先调用父类的group方法绑定parentGroup:

    public B group(EventLoopGroup group) { if (group == null) { throw new NullPointerException("group"); } else if (this.group != null) { throw new IllegalStateException("group set already"); } else { this.group = group; return this.self(); } } private B self() { return this; }

    将传入的parentGroup绑定给AbstractBootstrap的group成员,将childGroup绑定给ServerBootstrap的childGroup成员。 group的绑定仅仅是交给了成员保存。

    再来看看ServerBootstrap的channel方法,,是在AbstractBootstrap中实现的:

    public B channel(Class<? extends C> channelClass) { if (channelClass == null) { throw new NullPointerException("channelClass"); } else { return this.channelFactory((io.netty.channel.ChannelFactory)(new ReflectiveChannelFactory(channelClass))); } }

    使用channelClass构建了一个ReflectiveChannelFactory对象:

    public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> { private final Class<? extends T> clazz; public ReflectiveChannelFactory(Class<? extends T> clazz) { if (clazz == null) { throw new NullPointerException("clazz"); } else { this.clazz = clazz; } } public T newChannel() { try { return (Channel)this.clazz.getConstructor().newInstance(); } catch (Throwable var2) { throw new ChannelException("Unable to create Channel from class " + this.clazz, var2); } } public String toString() { return StringUtil.simpleClassName(this.clazz) + ".class"; } }

    可以看到ReflectiveChannelFactory的作用就是通过反射机制,产生clazz的实例(这里以NioServerSocketChannel为例)。

    在创建完ReflectiveChannelFactory对象后, 调用channelFactory方法:

    public B channelFactory(io.netty.channel.ChannelFactory<? extends C> channelFactory) { return this.channelFactory((ChannelFactory)channelFactory); } public B channelFactory(ChannelFactory<? extends C> channelFactory) { if (channelFactory == null) { throw new NullPointerException("channelFactory"); } else if (this.channelFactory != null) { throw new IllegalStateException("channelFactory set already"); } else { this.channelFactory = channelFactory; return this.self(); } }

    将刚才创建的ReflectiveChannelFactory对象交给channelFactory成员,用于后续服务端NioServerSocketChannel的创建。

    再来看ServerBootstrap的childHandler方法:

    public ServerBootstrap childHandler(ChannelHandler childHandler) { if (childHandler == null) { throw new NullPointerException("childHandler"); } else { this.childHandler = childHandler; return this; } }

    还是交给了childHandler成员保存,可以看到上述这一系列的操作,都是为了填充ServerBootstrap,而ServerBootstrap真正的启动是在bind时: ServerBootstrap的bind方法,在AbstractBootstrap中实现:

    public ChannelFuture bind(int inetPort) { return this.bind(new InetSocketAddress(inetPort)); } public ChannelFuture bind(String inetHost, int inetPort) { return this.bind(SocketUtils.socketAddress(inetHost, inetPort)); } public ChannelFuture bind(InetAddress inetHost, int inetPort) { return this.bind(new InetSocketAddress(inetHost, inetPort)); } public ChannelFuture bind(SocketAddress localAddress) { this.validate(); if (localAddress == null) { throw new NullPointerException("localAddress"); } else { return this.doBind(localAddress); } }

    可以看到首先调用了ServerBootstrap的validate方法,:

    public ServerBootstrap validate() { super.validate(); if (this.childHandler == null) { throw new IllegalStateException("childHandler not set"); } else { if (this.childGroup == null) { logger.warn("childGroup is not set. Using parentGroup instead."); this.childGroup = this.config.group(); } return this; } }

    先调用了AbstractBootstrap的validate方法:

    public B validate() { if (this.group == null) { throw new IllegalStateException("group not set"); } else if (this.channelFactory == null) { throw new IllegalStateException("channel or channelFactory not set"); } else { return this.self(); } }

    这个方法就是用来检查是否绑定了group和channel以及childHandler,所以在执行bind方法前,无论如何都要执行group、channel和childHandler方法。

    实际的bind交给了doBind来完成:

    private ChannelFuture doBind(final SocketAddress localAddress) { final ChannelFuture regFuture = this.initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } else if (regFuture.isDone()) { ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { final AbstractBootstrap.PendingRegistrationPromise promise = new AbstractBootstrap.PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { promise.setFailure(cause); } else { promise.registered(); AbstractBootstrap.doBind0(regFuture, channel, localAddress, promise); } } }); return promise; } }

    首先调用initAndRegister,完成ServerSocketChannel的创建以及注册:

    final ChannelFuture initAndRegister() { Channel channel = null; try { channel = this.channelFactory.newChannel(); this.init(channel); } catch (Throwable var3) { if (channel != null) { channel.unsafe().closeForcibly(); return (new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE)).setFailure(var3); } return (new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE)).setFailure(var3); } ChannelFuture regFuture = this.config().group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; }

    首先调用channelFactory的newChannel通过反射机制构建Channel实例,也就是NioServerSocketChannel,

    NioServerSocketChannel的无参构造:

    public class NioServerSocketChannel extends AbstractNioMessageChannel implements ServerSocketChannel { private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider(); public NioServerSocketChannel() { this(newSocket(DEFAULT_SELECTOR_PROVIDER)); } ...... }

    SelectorProvider 是JDK的,关于SelectorProvider在我之前的博客中有介绍:【Java】NIO中Selector的创建源码分析 在Windows系统下默认产生WindowsSelectorProvider,即DEFAULT_SELECTOR_PROVIDER,再来看看newSocket方法:

    private static java.nio.channels.ServerSocketChannel newSocket(SelectorProvider provider) { try { return provider.openServerSocketChannel(); } catch (IOException var2) { throw new ChannelException("Failed to open a server socket.", var2); } }

    使用WindowsSelectorProvider创建了一个ServerSocketChannelImpl,其实看到这里就明白了,NioServerSocketChannel是为了封装JDK的ServerSocketChannel

    接着调用另一个重载的构造:

    public NioServerSocketChannel(java.nio.channels.ServerSocketChannel channel) { super((Channel)null, channel, 16); this.config = new NioServerSocketChannel.NioServerSocketChannelConfig(this, this.javaChannel().socket()); }

    首先调用父类的三参构造,其中16对应的是JDK中SelectionKey的ACCEPT状态:

    public static final int OP_ACCEPT = 1 << 4;

    其父类的构造处于一条继承链上:

    AbstractNioMessageChannel:

    protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent, ch, readInterestOp); }

    AbstractNioChannel:

    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent); this.ch = ch; this.readInterestOp = readInterestOp; try { ch.configureBlocking(false); } catch (IOException var7) { try { ch.close(); } catch (IOException var6) { if (logger.isWarnEnabled()) { logger.warn("Failed to close a partially initialized socket.", var6); } } throw new ChannelException("Failed to enter non-blocking mode.", var7); } }

    AbstractChannel:

    private final ChannelId id; private final Channel parent; private final Unsafe unsafe; private final DefaultChannelPipeline pipeline; protected AbstractChannel(Channel parent) { this.parent = parent; this.id = this.newId(); this.unsafe = this.newUnsafe(); this.pipeline = this.newChannelPipeline(); }

    在AbstractChannel中使用newUnsafe和newChannelPipeline分别创建了一个Unsafe和一个DefaultChannelPipeline对象, 在前面的博客介绍NioEventLoopGroup时候,在NioEventLoop的run方法中,每次轮询完调用processSelectedKeys方法时,都是通过这个unsafe根据SelectedKey来完成数据的读或写,unsafe是处理基础的数据读写 (unsafe在NioServerSocketChannel创建时,产生NioMessageUnsafe实例,在NioSocketChannel创建时产生NioSocketChannelUnsafe实例) 而pipeline的实现是一条双向责任链,负责处理unsafe提供的数据,进而进行用户的业务逻辑(Netty中的ChannelPipeline源码分析)

    在AbstractNioChannel中调用configureBlocking方法给JDK的ServerSocketChannel设置为非阻塞模式,且让readInterestOp成员赋值为16用于未来注册ACCEPT事件。

    在调用完继承链后回到NioServerSocketChannel构造,调用了javaChannel方法:

    protected java.nio.channels.ServerSocketChannel javaChannel() { return (java.nio.channels.ServerSocketChannel)super.javaChannel(); }

    其实这个javaChannel就是刚才出传入到AbstractNioChannel中的ch成员:

    protected SelectableChannel javaChannel() { return this.ch; }

    也就是刚才创建的JDK的ServerSocketChannelImpl,用其socket方法,得到一个ServerSocket对象,然后产生了一个NioServerSocketChannelConfig对象,用于封装相关信息。

    NioServerSocketChannel构建完毕,回到initAndRegister方法,使用刚创建的NioServerSocketChannel调用init方法,这个方法是在ServerBootstrap中实现的:

    void init(Channel channel) throws Exception { Map<ChannelOption<?>, Object> options = this.options0(); synchronized(options) { setChannelOptions(channel, options, logger); } Map<AttributeKey<?>, Object> attrs = this.attrs0(); synchronized(attrs) { Iterator var5 = attrs.entrySet().iterator(); while(true) { if (!var5.hasNext()) { break; } Entry<AttributeKey<?>, Object> e = (Entry)var5.next(); AttributeKey<Object> key = (AttributeKey)e.getKey(); channel.attr(key).set(e.getValue()); } } ChannelPipeline p = channel.pipeline(); final EventLoopGroup currentChildGroup = this.childGroup; final ChannelHandler currentChildHandler = this.childHandler; Map var9 = this.childOptions; final Entry[] currentChildOptions; synchronized(this.childOptions) { currentChildOptions = (Entry[])this.childOptions.entrySet().toArray(newOptionArray(0)); } var9 = this.childAttrs; final Entry[] currentChildAttrs; synchronized(this.childAttrs) { currentChildAttrs = (Entry[])this.childAttrs.entrySet().toArray(newAttrArray(0)); } p.addLast(new ChannelHandler[]{new ChannelInitializer<Channel>() { public void initChannel(final Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = ServerBootstrap.this.config.handler(); if (handler != null) { pipeline.addLast(new ChannelHandler[]{handler}); } ch.eventLoop().execute(new Runnable() { public void run() { pipeline.addLast(new ChannelHandler[]{new ServerBootstrap.ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)}); } }); } }}); }

    首先对attrs和options这两个成员进行了填充属性配置,这不是重点,然后获取刚才创建的NioServerSocketChannel的责任链pipeline,通过addLast将ChannelInitializer加入责任链,在ChannelInitializer中重写了initChannel方法,首先根据handler是否是null(这个handler是ServerBootstrap调用handler方法添加的,和childHandler方法不一样),若是handler不是null,将handler加入责任链,无论如何,都会异步将一个ServerBootstrapAcceptor对象加入责任链(后面会说为什么是异步)

    这个ChannelInitializer的initChannel方法的执行需要等到后面注册时才会被调用,在后面pipeline处理channelRegistered请求时,此initChannel方法才会被执行(Netty中的ChannelPipeline源码分析)

    ChannelInitializer的channelRegistered方法:

    public final void channelRegistered(ChannelHandlerContext ctx) throws Exception { if (initChannel(ctx)) { ctx.pipeline().fireChannelRegistered(); } else { ctx.fireChannelRegistered(); } }

    首先调用initChannel方法(和上面的initChannel不是一个):

    private boolean initChannel(ChannelHandlerContext ctx) throws Exception { if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { try { initChannel((C) ctx.channel()); } catch (Throwable cause) { exceptionCaught(ctx, cause); } finally { remove(ctx); } return true; } return false; }

    可以看到,这个ChannelInitializer只会在pipeline中初始化一次,仅用于Channel的注册,在完成注册后,会调用remove方法将其从pipeline中移除: remove方法:

    private void remove(ChannelHandlerContext ctx) { try { ChannelPipeline pipeline = ctx.pipeline(); if (pipeline.context(this) != null) { pipeline.remove(this); } } finally { initMap.remove(ctx); } }

    在移除前,就会回调用刚才覆盖的initChannel方法,异步向pipeline添加了ServerBootstrapAcceptor,用于后续的NioServerSocketChannel侦听到客户端连接后,完成在服务端的NioSocketChannel的注册。

    回到initAndRegister,在对NioServerSocketChannel初始化完毕,接下来就是注册逻辑

    ChannelFuture regFuture = this.config().group().register(channel);

    首先调用config().group(),这个就得到了一开始在ServerBootstrap的group方法传入的parentGroup,调用parentGroup的register方法,parentGroup是NioEventLoopGroup,这个方法是在子类MultithreadEventLoopGroup中实现的:

    public ChannelFuture register(Channel channel) { return this.next().register(channel); }

    首先调用next方法:

    public EventLoop next() { return (EventLoop)super.next(); }

    实际上调用父类MultithreadEventExecutorGroup的next方法:

    public EventExecutor next() { return this.chooser.next(); }

    关于chooser在我之前博客:Netty中NioEventLoopGroup的创建源码分析介绍过,在NioEventLoopGroup创建时,默认会根据cpu个数创建二倍个NioEventLoop,而chooser就负责通过取模,每次选择一个NioEventLoop使用 所以在MultithreadEventLoopGroup的register方法实际调用了NioEventLoop的register方法:

    NioEventLoop的register方法在子类SingleThreadEventLoop中实现:

    public ChannelFuture register(Channel channel) { return this.register((ChannelPromise)(new DefaultChannelPromise(channel, this))); } public ChannelFuture register(ChannelPromise promise) { ObjectUtil.checkNotNull(promise, "promise"); promise.channel().unsafe().register(this, promise); return promise; }

    先把channel包装成ChannelPromise,默认是DefaultChannelPromise ( Netty中的ChannelFuture和ChannelPromise),用于处理异步操作 调用重载方法,而在重载方法里,可以看到,实际上的register操作交给了channel的unsafe来实现: unsafe的register方法在AbstractUnsafe中实现:

    public final void register(EventLoop eventLoop, final ChannelPromise promise) { if (eventLoop == null) { throw new NullPointerException("eventLoop"); } else if (AbstractChannel.this.isRegistered()) { promise.setFailure(new IllegalStateException("registered to an event loop already")); } else if (!AbstractChannel.this.isCompatible(eventLoop)) { promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); } else { AbstractChannel.this.eventLoop = eventLoop; if (eventLoop.inEventLoop()) { this.register0(promise); } else { try { eventLoop.execute(new Runnable() { public void run() { AbstractUnsafe.this.register0(promise); } }); } catch (Throwable var4) { AbstractChannel.logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, var4); this.closeForcibly(); AbstractChannel.this.closeFuture.setClosed(); this.safeSetFailure(promise, var4); } } } }

    前面的判断做了一些检查就不细说了,直接看到else块 首先给当前Channel绑定了eventLoop,即通过刚才chooser选择的eventLoop,该Channel也就是NioServerSocketChannel 由于Unsafe的操作是在轮询线程中异步执行的,所里,这里需要判断inEventLoop是否处于轮询中 在之前介绍NioEventLoopGroup的时候说过,NioEventLoop在没有调用doStartThread方法时并没有启动轮询的,所以inEventLoop判断不成立

    那么就调用eventLoop的execute方法,实际上的注册方法可以看到调用了AbstractUnsafe的register0方法,而将这个方法封装为Runnable交给eventLoop作为一个task去异步执行 先来看eventLoop的execute方法实现,是在NioEventLoop的子类SingleThreadEventExecutor中实现的:

    public void execute(Runnable task) { if (task == null) { throw new NullPointerException("task"); } else { boolean inEventLoop = this.inEventLoop(); this.addTask(task); if (!inEventLoop) { this.startThread(); if (this.isShutdown() && this.removeTask(task)) { reject(); } } if (!this.addTaskWakesUp && this.wakesUpForTask(task)) { this.wakeup(inEventLoop); } } }

    这里首先将task,即刚才的注册事件放入阻塞任务队列中,然后调用startThread方法:

    private void startThread() { if (this.state == 1 && STATE_UPDATER.compareAndSet(this, 1, 2)) { try { this.doStartThread(); } catch (Throwable var2) { STATE_UPDATER.set(this, 1); PlatformDependent.throwException(var2); } } }

    NioEventLoop此时还没有轮询,所以状态是1,对应ST_NOT_STARTED,此时利用CAS操作,将状态修改为2,即ST_STARTED ,标志着NioEventLoop要启动轮询了,果然,接下来就调用了doStartThread开启轮询线程:

    private void doStartThread() { assert this.thread == null; this.executor.execute(new Runnable() { public void run() { SingleThreadEventExecutor.this.thread = Thread.currentThread(); if (SingleThreadEventExecutor.this.interrupted) { SingleThreadEventExecutor.this.thread.interrupt(); } boolean success = false; SingleThreadEventExecutor.this.updateLastExecutionTime(); boolean var112 = false; int oldState; label1907: { try { var112 = true; SingleThreadEventExecutor.this.run(); success = true; var112 = false; break label1907; } catch (Throwable var119) { SingleThreadEventExecutor.logger.warn("Unexpected exception from an event executor: ", var119); var112 = false; } finally { if (var112) { int oldStatex; do { oldStatex = SingleThreadEventExecutor.this.state; } while(oldStatex < 3 && !SingleThreadEventExecutor.STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this, oldStatex, 3)); if (success && SingleThreadEventExecutor.this.gracefulShutdownStartTime == 0L && SingleThreadEventExecutor.logger.isErrorEnabled()) { SingleThreadEventExecutor.logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " + SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called before run() implementation terminates."); } try { while(!SingleThreadEventExecutor.this.confirmShutdown()) { ; } } finally { try { SingleThreadEventExecutor.this.cleanup(); } finally { SingleThreadEventExecutor.STATE_UPDATER.set(SingleThreadEventExecutor.this, 5); SingleThreadEventExecutor.this.threadLock.release(); if (!SingleThreadEventExecutor.this.taskQueue.isEmpty() && SingleThreadEventExecutor.logger.isWarnEnabled()) { SingleThreadEventExecutor.logger.warn("An event executor terminated with non-empty task queue (" + SingleThreadEventExecutor.this.taskQueue.size() + ')'); } SingleThreadEventExecutor.this.terminationFuture.setSuccess((Object)null); } } } } do { oldState = SingleThreadEventExecutor.this.state; } while(oldState < 3 && !SingleThreadEventExecutor.STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this, oldState, 3)); if (success && SingleThreadEventExecutor.this.gracefulShutdownStartTime == 0L && SingleThreadEventExecutor.logger.isErrorEnabled()) { SingleThreadEventExecutor.logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " + SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called before run() implementation terminates."); } try { while(!SingleThreadEventExecutor.this.confirmShutdown()) { ; } return; } finally { try { SingleThreadEventExecutor.this.cleanup(); } finally { SingleThreadEventExecutor.STATE_UPDATER.set(SingleThreadEventExecutor.this, 5); SingleThreadEventExecutor.this.threadLock.release(); if (!SingleThreadEventExecutor.this.taskQueue.isEmpty() && SingleThreadEventExecutor.logger.isWarnEnabled()) { SingleThreadEventExecutor.logger.warn("An event executor terminated with non-empty task queue (" + SingleThreadEventExecutor.this.taskQueue.size() + ')'); } SingleThreadEventExecutor.this.terminationFuture.setSuccess((Object)null); } } } do { oldState = SingleThreadEventExecutor.this.state; } while(oldState < 3 && !SingleThreadEventExecutor.STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this, oldState, 3)); if (success && SingleThreadEventExecutor.this.gracefulShutdownStartTime == 0L && SingleThreadEventExecutor.logger.isErrorEnabled()) { SingleThreadEventExecutor.logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " + SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called before run() implementation terminates."); } try { while(!SingleThreadEventExecutor.this.confirmShutdown()) { ; } } finally { try { SingleThreadEventExecutor.this.cleanup(); } finally { SingleThreadEventExecutor.STATE_UPDATER.set(SingleThreadEventExecutor.this, 5); SingleThreadEventExecutor.this.threadLock.release(); if (!SingleThreadEventExecutor.this.taskQueue.isEmpty() && SingleThreadEventExecutor.logger.isWarnEnabled()) { SingleThreadEventExecutor.logger.warn("An event executor terminated with non-empty task queue (" + SingleThreadEventExecutor.this.taskQueue.size() + ')'); } SingleThreadEventExecutor.this.terminationFuture.setSuccess((Object)null); } } } }); }

    关于doStartThread方法,我在Netty中NioEventLoopGroup的创建源码分析 中已经说的很细了,这里就不再一步一步分析了

    因为此时还没真正意义上的启动轮询,所以thread等于null成立的,然后调用executor的execute方法,这里的executor是一个线程池,在之前说过的,所以里面的run方法是处于一个线程里面的,然后给thread成员赋值为当前线程,表明正式进入了轮询。 而SingleThreadEventExecutor.this.run()才是真正的轮询逻辑,这在之前也说过,这个run的实现是在父类NioEventLoop中:

    protected void run() { while(true) { while(true) { try { switch(this.selectStrategy.calculateStrategy(this.selectNowSupplier, this.hasTasks())) { case -2: continue; case -1: this.select(this.wakenUp.getAndSet(false)); if (this.wakenUp.get()) { this.selector.wakeup(); } default: this.cancelledKeys = 0; this.needsToSelectAgain = false; int ioRatio = this.ioRatio; if (ioRatio == 100) { try { this.processSelectedKeys(); } finally { this.runAllTasks(); } } else { long ioStartTime = System.nanoTime(); boolean var13 = false; try { var13 = true; this.processSelectedKeys(); var13 = false; } finally { if (var13) { long ioTime = System.nanoTime() - ioStartTime; this.runAllTasks(ioTime * (long)(100 - ioRatio) / (long)ioRatio); } } long ioTime = System.nanoTime() - ioStartTime; this.runAllTasks(ioTime * (long)(100 - ioRatio) / (long)ioRatio); } } } catch (Throwable var21) { handleLoopException(var21); } try { if (this.isShuttingDown()) { this.closeAll(); if (this.confirmShutdown()) { return; } } } catch (Throwable var18) { handleLoopException(var18); } } } }

    首先由于task已经有一个了,就是刚才的注册事件,所以选择策略calculateStrategy最终调用selectNow(也是之前说过的):

    private final IntSupplier selectNowSupplier = new IntSupplier() { public int get() throws Exception { return NioEventLoop.this.selectNow(); } }; int selectNow() throws IOException { int var1; try { var1 = this.selector.selectNow(); } finally { if (this.wakenUp.get()) { this.selector.wakeup(); } } return var1; }

    使用JDK原生Selector进行selectNow,由于此时没有任何Channel的注册,所以selectNow会立刻返回0,此时就进入default逻辑,由于没有任何注册,processSelectedKeys方法也做不了什么,所以在这一次的轮询实质上只进行了runAllTasks方法,此方法会执行阻塞队列中的task的run方法(还是在之前博客中介绍过),由于轮询是在线程池中的一个线程中运行的,所以task的执行是一个异步操作。(在执行完task,将task移除阻塞对立,线程继续轮询)

    这时就可以回到AbstractChannel的register方法中了,由上面可以知道task实际上异步执行了:

    AbstractUnsafe.this.register0(promise);

    register0方法:

    private void register0(ChannelPromise promise) { try { if (!promise.setUncancellable() || !this.ensureOpen(promise)) { return; } boolean firstRegistration = this.neverRegistered; AbstractChannel.this.doRegister(); this.neverRegistered = false; AbstractChannel.this.registered = true; AbstractChannel.this.pipeline.invokeHandlerAddedIfNeeded(); this.safeSetSuccess(promise); AbstractChannel.this.pipeline.fireChannelRegistered(); if (AbstractChannel.this.isActive()) { if (firstRegistration) { AbstractChannel.this.pipeline.fireChannelActive(); } else if (AbstractChannel.this.config().isAutoRead()) { this.beginRead(); } } } catch (Throwable var3) { this.closeForcibly(); AbstractChannel.this.closeFuture.setClosed(); this.safeSetFailure(promise, var3); } }

    可以看到实际上的注册逻辑又交给了AbstractChannel的doRegister,而这个方法在AbstractNioChannel中实现:

    protected void doRegister() throws Exception { boolean selected = false; while(true) { try { this.selectionKey = this.javaChannel().register(this.eventLoop().unwrappedSelector(), 0, this); return; } catch (CancelledKeyException var3) { if (selected) { throw var3; } this.eventLoop().selectNow(); selected = true; } } }

    javaChannel就是之前产生的JDK的ServerSocketChannel,unwrappedSelector在之前说过,就是未经修改的JDK原生Selector,这个Selector和eventLoop是一对一绑定的,可以看到调用JDK原生的注册方法,完成了对ServerSocketChannel的注册,但是注册的是一个0状态(缺省值),而传入的this,即AbstractNioChannel对象作为了一个附件,用于以后processSelectedKeys方法从SelectionKey中得到对应的Netty的Channel(还是之前博客说过) 关于缺省值,是由于AbstractNioChannel不仅用于NioServerSocketChannel的注册,还用于NioSocketChannel的注册,只有都使用缺省值注册才不会产生异常(【Java】NIO中Channel的注册源码分析),并且,在以后processSelectedKeys方法会对0状态判断,再使用unsafe进行相应的逻辑处理。

    在完成JDK的注册后,调用pipeline的invokeHandlerAddedIfNeeded方法,处理ChannelHandler的handlerAdded的回调(Netty中的ChannelPipeline源码分析),即调用用户添加的ChannelHandler的handlerAdded方法。 调用safeSetSuccess,标志异步操作完成:

    protected final void safeSetSuccess(ChannelPromise promise) { if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) { logger.warn("Failed to mark a promise as success because it is done already: {}", promise); } }

    关于异步操作我在之前的博客中说的很清楚了:Netty中的ChannelFuture和ChannelPromise

    接着调用pipeline的fireChannelRegistered方法,也就是在责任链上调用channelRegistered方法,这时,就会调用之在ServerBootstrap中向pipeline添加的ChannelInitializer的channelRegistered,进而回调initChannel方法,完成对ServerBootstrapAcceptor的添加。

    回到register0方法,在处理完pipeline的责任链后,根据当前AbstractChannel即NioServerSocketChannel的isActive:

    public boolean isActive() { return this.javaChannel().socket().isBound(); }

    获得NioServerSocketChannel绑定的JDK的ServerSocketChannel,进而获取ServerSocket,判断isBound:

    public boolean isBound() { // Before 1.3 ServerSockets were always bound during creation return bound || oldImpl; }

    这里实际上就是判断ServerSocket是否调用了bind方法,前面说过register0方法是一个异步操作,在多线程环境下不能保证执行顺序,若是此时已经完成了ServerSocket的bind,根据firstRegistration判断是否需要pipeline传递channelActive请求,首先会执行pipeline的head即HeadContext的channelActive方法:

    @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelActive(); readIfIsAutoRead(); }

    在HeadContext通过ChannelHandlerContext 传递完channelActive请求后,会调用readIfIsAutoRead方法:

    private void readIfIsAutoRead() { if (channel.config().isAutoRead()) { channel.read(); } }

    此时调用AbstractChannel的read方法:

    public Channel read() { pipeline.read(); return this; }

    最终在请求链由HeadContext执行read方法:

    public void read(ChannelHandlerContext ctx) { unsafe.beginRead(); }

    终于可以看到此时调用unsafe的beginRead方法:

    public final void beginRead() { assertEventLoop(); if (!isActive()) { return; } try { doBeginRead(); } catch (final Exception e) { invokeLater(new Runnable() { @Override public void run() { pipeline.fireExceptionCaught(e); } }); close(voidPromise()); } }

    最终执行了doBeginRead方法,由AbstractNioChannel实现:

    protected void doBeginRead() throws Exception { final SelectionKey selectionKey = this.selectionKey; if (!selectionKey.isValid()) { return; } readPending = true; final int interestOps = selectionKey.interestOps(); if ((interestOps & readInterestOp) == 0) { selectionKey.interestOps(interestOps | readInterestOp); } }

    这里,就完成了向Selector注册readInterestOp事件,从前面来看就是ACCEPT事件

    回到AbstractBootstrap的doBind方法,在initAndRegister逻辑结束后,由上面可以知道,实际上的register注册逻辑是一个异步操作,在register0中完成 根据ChannelFuture来判断异步操作是否完成,如果isDone,则表明异步操作先完成,即完成了safeSetSuccess方法, 然后调用newPromise方法:

    public ChannelPromise newPromise() { return pipeline.newPromise(); }

    给channel的pipeline绑定异步操作ChannelPromise 然后调用doBind0方法完成ServerSocket的绑定,若是register0这个异步操作还没完成,就需要给ChannelFuture产生一个异步操作的侦听ChannelFutureListener对象,等到register0方法调用safeSetSuccess时,在promise的trySuccess中会回调ChannelFutureListener的operationComplete方法,进而调用doBind0方法

    doBind0方法:

    private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }

    向轮询线程提交了一个任务,异步处理bind,可以看到,只有在regFuture异步操作成功结束后,调用channel的bind方法:

    public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) { return pipeline.bind(localAddress, promise); }

    实际上的bind又交给pipeline,去完成,pipeline中就会交给责任链去完成,最终会交给HeadContext完成:

    public void bind( ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception { unsafe.bind(localAddress, promise); }

    可以看到,绕了一大圈,交给了unsafe完成:

    public final void bind(final SocketAddress localAddress, final ChannelPromise promise) { assertEventLoop(); if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) && localAddress instanceof InetSocketAddress && !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() && !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) { logger.warn( "A non-root user can't receive a broadcast packet if the socket " + "is not bound to a wildcard address; binding to a non-wildcard " + "address (" + localAddress + ") anyway as requested."); } boolean wasActive = isActive(); try { doBind(localAddress); } catch (Throwable t) { safeSetFailure(promise, t); closeIfClosed(); return; } if (!wasActive && isActive()) { invokeLater(new Runnable() { @Override public void run() { pipeline.fireChannelActive(); } }); } safeSetSuccess(promise); }

    然而,真正的bind还是回调了doBind方法,最终是由NioServerSocketChannel来实现:

    @Override protected void doBind(SocketAddress localAddress) throws Exception { if (PlatformDependent.javaVersion() >= 7) { javaChannel().bind(localAddress, config.getBacklog()); } else { javaChannel().socket().bind(localAddress, config.getBacklog()); } }

    在这里终于完成了对JDK的ServerSocketChannel的bind操作

    在上面的

    if (!wasActive && isActive()) { invokeLater(new Runnable() { @Override public void run() { pipeline.fireChannelActive(); } }); }

    这个判断,就是确保在register0中isActive时,还没完成绑定,也就没有beginRead操作来向Selector注册ACCEPT事件,那么就在这里进行注册,进而让ServerSocket去侦听客户端的连接

    在服务端ACCEPT到客户端的连接后,在NioEventLoop轮询中,就会调用processSelectedKeys处理ACCEPT的事件就绪,然后交给unsafe的read去处理 Netty中NioEventLoopGroup的创建源码分析

    在服务端,由NioMessageUnsafe实现:

    public void read() { assert eventLoop().inEventLoop(); final ChannelConfig config = config(); final ChannelPipeline pipeline = pipeline(); final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); allocHandle.reset(config); boolean closed = false; Throwable exception = null; try { try { do { int localRead = doReadMessages(readBuf); if (localRead == 0) { break; } if (localRead < 0) { closed = true; break; } allocHandle.incMessagesRead(localRead); } while (allocHandle.continueReading()); } catch (Throwable t) { exception = t; } int size = readBuf.size(); for (int i = 0; i < size; i ++) { readPending = false; pipeline.fireChannelRead(readBuf.get(i)); } readBuf.clear(); allocHandle.readComplete(); pipeline.fireChannelReadComplete(); if (exception != null) { closed = closeOnReadError(exception); pipeline.fireExceptionCaught(exception); } if (closed) { inputShutdown = true; if (isOpen()) { close(voidPromise()); } } } finally { if (!readPending && !config.isAutoRead()) { removeReadOp(); } } } }

    核心在doReadMessages方法,由NioServerSocketChannel实现:

    protected int doReadMessages(List<Object> buf) throws Exception { SocketChannel ch = SocketUtils.accept(javaChannel()); try { if (ch != null) { buf.add(new NioSocketChannel(this, ch)); return 1; } } catch (Throwable t) { logger.warn("Failed to create a new channel from an accepted socket.", t); try { ch.close(); } catch (Throwable t2) { logger.warn("Failed to close a socket.", t2); } } return 0; }

    SocketUtils的accept方法其实就是用来调用JDK中ServerSocketChannel原生的accept方法,来得到一个JDK的SocketChannel对象,然后通过这个SocketChannel对象,将其包装成NioSocketChannel对象添加在buf这个List中

    由此可以看到doReadMessages用来侦听所有就绪的连接,包装成NioSocketChannel将其放在List中 然后遍历这个List,调用 NioServerSocketChannel的pipeline的fireChannelRead方法,传递channelRead请求,、 在前面向pipeline中添加了ServerBootstrapAcceptor这个ChannelHandler,此时,它也会响应这个请求,回调channelRead方法:

    public void channelRead(ChannelHandlerContext ctx, Object msg) { final Channel child = (Channel) msg; child.pipeline().addLast(childHandler); setChannelOptions(child, childOptions, logger); for (Entry<AttributeKey<?>, Object> e: childAttrs) { child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); } try { childGroup.register(child).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { forceClose(child, t); } }

    msg就是侦听到的NioSocketChannel对象,给该对象的pipeline添加childHandler,也就是我们在ServerBootstrap中通过childHandler方法添加的 然后通过register方法完成对NioSocketChannel的注册(和NioServerSocketChannel注册逻辑一样)

    至此Netty服务端的启动结束。

    最新回复(0)