Netty Study Notes (5): ChannelHandler Execution Order

    xiaoxiao, 2022-07-13

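    In the previous article we introduced the execution order of ChannelHandlers: an I/O event is passed on to the next ChannelHandler of the same type (inbound or outbound) through the event propagation methods of ChannelHandlerContext, such as ChannelHandlerContext.fireChannelRead(Object) and ChannelHandlerContext.write(Object).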

    The Netty source code contains a diagram in its comments that describes the order in which the ChannelHandlers in a ChannelPipeline typically process I/O events.

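    The example below verifies this order. It simulates communication between a server and a client. The server registers two ChannelInboundHandlers and two ChannelOutboundHandlers. Once the client has connected, it sends one message to the server; the server passes it through its ChannelHandlers and sends one message back to the client.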

    ChannelHandler code

    FirstInServerHandler

    package com.netty.handler;

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.util.CharsetUtil;
    import io.netty.util.ReferenceCountUtil;

    public class FirstInServerHandler extends ChannelInboundHandlerAdapter {

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf in = (ByteBuf) msg;
            System.out.println("Server console: " + in.toString(CharsetUtil.UTF_8));
            // The received buffer is not forwarded, so release it here.
            ReferenceCountUtil.release(in);
            // Pass a new message on to the next inbound handler.
            ctx.fireChannelRead(Unpooled.copiedBuffer("FirstInServerHandler -> ", CharsetUtil.UTF_8));
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            super.channelReadComplete(ctx);
        }
    }

    SecondInServerHandler

    package com.netty.handler;

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.util.CharsetUtil;
    import io.netty.util.ReferenceCountUtil;

    public class SecondInServerHandler extends ChannelInboundHandlerAdapter {

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf in = (ByteBuf) msg;
            System.out.println("Server console: " + in.toString(CharsetUtil.UTF_8));
            // Append this handler's name to a copy of the incoming message.
            ByteBuf out = Unpooled.copiedBuffer(in,
                    Unpooled.copiedBuffer("SecondInServerHandler -> ", CharsetUtil.UTF_8));
            // The incoming buffer has been copied and is not forwarded, so release it.
            ReferenceCountUtil.release(in);
            // Switch from inbound to outbound: the write starts at this handler's context.
            ctx.write(out);
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.flush();
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }

    FirstOutServerHandler

    package com.netty.handler;

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelOutboundHandlerAdapter;
    import io.netty.channel.ChannelPromise;
    import io.netty.util.CharsetUtil;
    import io.netty.util.ReferenceCountUtil;

    public class FirstOutServerHandler extends ChannelOutboundHandlerAdapter {

        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            ByteBuf in = (ByteBuf) msg;
            System.out.println("Server console: " + in.toString(CharsetUtil.UTF_8));
            ByteBuf out = Unpooled.copiedBuffer(in,
                    Unpooled.copiedBuffer("FirstOutServerHandler", CharsetUtil.UTF_8));
            // The incoming buffer has been copied and is not forwarded, so release it.
            ReferenceCountUtil.release(in);
            // Reuse the caller's promise so it is completed, then close the channel once the write is done.
            ctx.writeAndFlush(out, promise).addListener(ChannelFutureListener.CLOSE);
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }

    SecondOutServerHandler

    package com.netty.handler;

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelOutboundHandlerAdapter;
    import io.netty.channel.ChannelPromise;
    import io.netty.util.CharsetUtil;
    import io.netty.util.ReferenceCountUtil;

    public class SecondOutServerHandler extends ChannelOutboundHandlerAdapter {

        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            ByteBuf in = (ByteBuf) msg;
            System.out.println("Server console: " + in.toString(CharsetUtil.UTF_8));
            ByteBuf out = Unpooled.copiedBuffer(in,
                    Unpooled.copiedBuffer("SecondOutServerHandler -> ", CharsetUtil.UTF_8));
            // The incoming buffer has been copied and is not forwarded, so release it.
            ReferenceCountUtil.release(in);
            // Reuse the caller's promise so it is completed when the write finishes.
            ctx.writeAndFlush(out, promise);
        }
    }

    ClientInHandler

    package com.netty.handler;

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.util.CharsetUtil;

    public class ClientInHandler extends SimpleChannelInboundHandler<ByteBuf> {

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
            // Print the reply from the server.
            System.out.println("Client Console: " + msg.toString(CharsetUtil.UTF_8));
        }

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            // Send one message as soon as the connection is established.
            ctx.writeAndFlush(Unpooled.copiedBuffer("ClientInHandler -> ", CharsetUtil.UTF_8));
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }

    ClientOutHandler

    package com.netty.handler;

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelOutboundHandlerAdapter;
    import io.netty.channel.ChannelPromise;
    import io.netty.util.CharsetUtil;
    import io.netty.util.ReferenceCountUtil;

    public class ClientOutHandler extends ChannelOutboundHandlerAdapter {

        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            ByteBuf in = (ByteBuf) msg;
            ByteBuf out = Unpooled.copiedBuffer(in,
                    Unpooled.copiedBuffer("ClientOutHandler", CharsetUtil.UTF_8));
            // The incoming buffer has been copied and is not forwarded, so release it.
            ReferenceCountUtil.release(in);
            // Reuse the caller's promise so it is completed when the write finishes.
            ctx.writeAndFlush(out, promise);
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }

    Server code

    HelloServerDemo

    package com.netty;

    import com.netty.handler.FirstInServerHandler;
    import com.netty.handler.FirstOutServerHandler;
    import com.netty.handler.SecondInServerHandler;
    import com.netty.handler.SecondOutServerHandler;
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;

    public class HelloServerDemo {

        public static void main(String[] args) {
            HelloServerDemo server = new HelloServerDemo();
            server.start(20000);
        }

        public void start(int port) {
            NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
            NioEventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(bossGroup, workerGroup);
                bootstrap.channel(NioServerSocketChannel.class);
                bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        // Registration order: two outbound handlers, then two inbound handlers.
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast(new FirstOutServerHandler());
                        pipeline.addLast(new SecondOutServerHandler());
                        pipeline.addLast(new FirstInServerHandler());
                        pipeline.addLast(new SecondInServerHandler());
                    }
                });
                // Bind the port, then block until the server channel is closed.
                ChannelFuture future = bootstrap.bind(port).sync();
                future.channel().closeFuture().sync();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }

    Client code

    HelloClientDemo

    package com.netty;

    import com.netty.handler.ClientInHandler;
    import com.netty.handler.ClientOutHandler;
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;

    public class HelloClientDemo {

        public static void main(String[] args) {
            HelloClientDemo client = new HelloClientDemo();
            client.start("localhost", 20000);
        }

        public void start(String ip, int port) {
            NioEventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.group(workerGroup);
                bootstrap.channel(NioSocketChannel.class);
                bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast(new ClientOutHandler());
                        pipeline.addLast(new ClientInHandler());
                    }
                });
                // Connect, then block until the channel is closed.
                ChannelFuture future = bootstrap.connect(ip, port).sync();
                future.channel().closeFuture().sync();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                workerGroup.shutdownGracefully();
            }
        }
    }

    Running the code

    Start the server first, then the client. The consoles print the following:

    HelloServerDemo console:
    Server console: ClientInHandler -> ClientOutHandler
    Server console: FirstInServerHandler ->
    Server console: FirstInServerHandler -> SecondInServerHandler ->
    Server console: FirstInServerHandler -> SecondInServerHandler -> SecondOutServerHandler ->

    HelloClientDemo console:
    Client Console: FirstInServerHandler -> SecondInServerHandler -> SecondOutServerHandler -> FirstOutServerHandler

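    The result shows that the execution order follows the order in which the ChannelHandlers are registered: inbound handlers run in registration order, while outbound handlers run in reverse registration order. This matches the order described in the Netty source code.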
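
    The rule can be restated without Netty. The sketch below is a toy model, not Netty code: the Handler class and the inboundOrder, outboundOrder and examplePipeline methods are hypothetical names introduced for illustration. It assumes the outbound event starts behind the last registered handler, which is the case in the example because SecondInServerHandler is registered last.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model only: these types are hypothetical and are not Netty classes.
class PipelineOrderSketch {

    /** A registered handler: a name plus the direction of events it handles. */
    static final class Handler {
        final String name;
        final boolean inbound;

        Handler(String name, boolean inbound) {
            this.name = name;
            this.inbound = inbound;
        }
    }

    /** Inbound events travel from the head towards the tail (registration order). */
    static List<String> inboundOrder(List<Handler> pipeline) {
        List<String> order = new ArrayList<>();
        for (Handler h : pipeline) {
            if (h.inbound) {
                order.add(h.name);
            }
        }
        return order;
    }

    /** Outbound events started at the end travel towards the head (reverse order). */
    static List<String> outboundOrder(List<Handler> pipeline) {
        List<String> order = new ArrayList<>();
        for (int i = pipeline.size() - 1; i >= 0; i--) {
            Handler h = pipeline.get(i);
            if (!h.inbound) {
                order.add(h.name);
            }
        }
        return order;
    }

    /** Same registration order as HelloServerDemo. */
    static List<Handler> examplePipeline() {
        return Arrays.asList(
                new Handler("FirstOutServerHandler", false),
                new Handler("SecondOutServerHandler", false),
                new Handler("FirstInServerHandler", true),
                new Handler("SecondInServerHandler", true));
    }

    public static void main(String[] args) {
        List<Handler> pipeline = examplePipeline();
        System.out.println("inbound:  " + inboundOrder(pipeline));
        System.out.println("outbound: " + outboundOrder(pipeline));
    }
}
```

    Running main prints FirstInServerHandler before SecondInServerHandler for the inbound direction, and SecondOutServerHandler before FirstOutServerHandler for the outbound direction, which is the order of the names in the client console output above.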

    Caveats

    That settles the execution order of ChannelHandlers. The rest of this article covers a detail that needs attention along the way.

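    After the four ChannelHandlers are registered, each channel's ChannelPipeline holds them in registration order between its head and tail nodes. The example above registers them as follows: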

    bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast(new FirstOutServerHandler());  // ①
            pipeline.addLast(new SecondOutServerHandler()); // ②
            pipeline.addLast(new FirstInServerHandler());   // ③
            pipeline.addLast(new SecondInServerHandler());  // ④
        }
    });

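    If we change the registration order of the ChannelHandlers (keeping the relative order of the inbound handlers), the output on the client console changes as well. We would expect all four handler class names to still appear, only in a different order. In some cases, however, only three of the four names appear, or even only two. Repeated testing shows that names go missing whenever an outbound handler sits at the end of the ChannelPipeline.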

    To reproduce the problem, register the ChannelHandlers in the following order:

    bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast(new FirstOutServerHandler());  // ①
            pipeline.addLast(new FirstInServerHandler());   // ②
            pipeline.addLast(new SecondInServerHandler());  // ③
            pipeline.addLast(new SecondOutServerHandler()); // ④
        }
    });

    Running the code gives the following result (only the client console is shown):

    Client Console: FirstInServerHandler -> SecondInServerHandler -> FirstOutServerHandler

    Why does this happen? Tracing the source code shows that a call to ChannelHandlerContext.write(msg) invokes findContextOutbound() to look up the next outbound handler. Its source is:

    private AbstractChannelHandlerContext findContextOutbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.prev;
        } while (!ctx.outbound);
        return ctx;
    }

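    The logic of findContextOutbound() is simple: starting from the current context, it searches backwards (towards the head) for the next outbound handler. Given the handler layout described above, when the second outbound handler is placed at the end of the ChannelPipeline, the backward search that starts at SecondInServerHandler never reaches it, so SecondOutServerHandler is skipped and its name is never printed. To fix this, the search has to start from the tail node when processing switches from inbound to outbound. How can this be done?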
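
    The skipping can be reproduced outside Netty with a small model. The sketch below is a simplification for illustration only: Node, writePath and reproPipeline are hypothetical names, and the head node is assumed to count as outbound so that the backward search always terminates. It walks the same prev-based loop from two starting points, the context of SecondInServerHandler and the tail.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model for illustration. Node and its fields are hypothetical
// stand-ins, not Netty's AbstractChannelHandlerContext.
class OutboundSearchSketch {

    static final class Node {
        final String name;
        final boolean outbound;
        Node prev; // neighbour on the head side

        Node(String name, boolean outbound) {
            this.name = name;
            this.outbound = outbound;
        }
    }

    /** Same loop as the quoted method: walk prev until an outbound node is found. */
    static Node findContextOutbound(Node start) {
        Node ctx = start;
        do {
            ctx = ctx.prev;
        } while (!ctx.outbound);
        return ctx;
    }

    /** Names of the outbound handlers a write reaches when it starts at the given node. */
    static List<String> writePath(Node start) {
        List<String> path = new ArrayList<>();
        Node ctx = findContextOutbound(start);
        while (ctx.prev != null) { // the head node has no prev and ends the walk
            path.add(ctx.name);
            ctx = findContextOutbound(ctx);
        }
        return path;
    }

    /** Registration order that loses a name: FirstOut, FirstIn, SecondIn, SecondOut. */
    static Node[] reproPipeline() {
        Node[] nodes = {
            new Node("head", true), // assumed outbound so the search always terminates
            new Node("FirstOutServerHandler", true),
            new Node("FirstInServerHandler", false),
            new Node("SecondInServerHandler", false),
            new Node("SecondOutServerHandler", true),
            new Node("tail", false)
        };
        for (int i = 1; i < nodes.length; i++) {
            nodes[i].prev = nodes[i - 1];
        }
        return nodes;
    }

    public static void main(String[] args) {
        Node[] nodes = reproPipeline();
        System.out.println("from SecondInServerHandler: " + writePath(nodes[3]));
        System.out.println("from tail:                  " + writePath(nodes[5]));
    }
}
```

    In this model, a write that starts at SecondInServerHandler reaches only FirstOutServerHandler, while a write that starts at the tail reaches SecondOutServerHandler and then FirstOutServerHandler.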

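    ChannelHandlerContext has many methods, some of which also exist on Channel and on ChannelPipeline itself, but there is one important difference. When these methods are called on a Channel or a ChannelPipeline, the event propagates through the entire ChannelPipeline; for an outbound operation such as write, that means it starts at the tail. The same methods called on a ChannelHandlerContext start at the current handler.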

    public class SecondInServerHandler extends ChannelInboundHandlerAdapter {

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf in = (ByteBuf) msg;
            System.out.println("Server console: " + in.toString(CharsetUtil.UTF_8));
            ByteBuf out = Unpooled.copiedBuffer(in,
                    Unpooled.copiedBuffer("SecondInServerHandler -> ", CharsetUtil.UTF_8));
            // The incoming buffer has been copied and is not forwarded, so release it.
            ReferenceCountUtil.release(in);
            // Write through the pipeline so that the outbound search starts at the tail.
            ctx.pipeline().write(out);
            // Alternative with the same effect:
            // ctx.channel().write(out);
        }
    }

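    Either of the two calls above, ctx.pipeline().write(...) or the commented-out ctx.channel().write(...), solves the problem and produces the same result as the original example. When registering ChannelHandlers, it is therefore best to place the outbound handlers before the last inbound handler and then call the corresponding method on ChannelHandlerContext: this produces a shorter event flow and can give the application better performance.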
