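The previous article described the order in which ChannelHandlers execute: an I/O event is passed to the next ChannelHandler of the same type through the event propagation methods of ChannelHandlerContext, such as ChannelHandlerContext.fireChannelRead(Object) and ChannelHandlerContext.write(Object).
The figure below is taken from the comments in the Netty source code. It shows the order in which the ChannelHandlers in a ChannelPipeline typically process I/O events.
The following example verifies that order. It simulates communication between a server and a client, where the server registers two ChannelInboundHandlers and two ChannelOutboundHandlers. Once the client connects, it sends a message to the server. The server passes the message through its ChannelHandlers and then sends a reply back to the client.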
ChannelHandler code
FirstInServerHandler
package com.netty.handler;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

public class FirstInServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf in = (ByteBuf) msg;
        System.out.println("Server console: " + in.toString(CharsetUtil.UTF_8));
        // The received buffer is not forwarded, so release it here.
        in.release();
        // Pass a new message to the next inbound handler.
        ctx.fireChannelRead(Unpooled.copiedBuffer("FirstInServerHandler -> ", CharsetUtil.UTF_8));
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        super.channelReadComplete(ctx);
    }
}
SecondInServerHandler
package com.netty.handler;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;

public class SecondInServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf in = (ByteBuf) msg;
        System.out.println("Server console: " + in.toString(CharsetUtil.UTF_8));
        // Turn the inbound flow into an outbound one: append this handler's name and write the result.
        ctx.write(Unpooled.copiedBuffer(in,
                Unpooled.copiedBuffer("SecondInServerHandler -> ", CharsetUtil.UTF_8)));
        // copiedBuffer made a copy, so the received buffer can be released.
        ReferenceCountUtil.release(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
FirstOutServerHandler
package com.netty.handler;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.util.CharsetUtil;

public class FirstOutServerHandler extends ChannelOutboundHandlerAdapter {

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        ByteBuf in = (ByteBuf) msg;
        System.out.println("Server console: " + in.toString(CharsetUtil.UTF_8));
        ByteBuf out = Unpooled.copiedBuffer(in,
                Unpooled.copiedBuffer("FirstOutServerHandler", CharsetUtil.UTF_8));
        // The original message was copied and is not forwarded, so release it.
        in.release();
        // Reuse the caller's promise and close the channel once the reply has been written.
        ctx.writeAndFlush(out, promise).addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
SecondOutServerHandler
package com.netty.handler;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.util.CharsetUtil;

public class SecondOutServerHandler extends ChannelOutboundHandlerAdapter {

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        ByteBuf in = (ByteBuf) msg;
        System.out.println("Server console: " + in.toString(CharsetUtil.UTF_8));
        ByteBuf out = Unpooled.copiedBuffer(in,
                Unpooled.copiedBuffer("SecondOutServerHandler -> ", CharsetUtil.UTF_8));
        // The original message was copied and is not forwarded, so release it.
        in.release();
        // Reuse the caller's promise so that its future completes with this write.
        ctx.writeAndFlush(out, promise);
    }
}
ClientInHandler
package com.netty.handler;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;

public class ClientInHandler extends SimpleChannelInboundHandler<ByteBuf> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        System.out.println("Client Console: " + msg.toString(CharsetUtil.UTF_8));
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.copiedBuffer("ClientInHandler -> ", CharsetUtil.UTF_8));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
ClientOutHandler
package com.netty.handler;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.util.CharsetUtil;

public class ClientOutHandler extends ChannelOutboundHandlerAdapter {

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        ByteBuf in = (ByteBuf) msg;
        ByteBuf out = Unpooled.copiedBuffer(in,
                Unpooled.copiedBuffer("ClientOutHandler", CharsetUtil.UTF_8));
        // The original message was copied and is not forwarded, so release it.
        in.release();
        // Reuse the caller's promise so that its future completes with this write.
        ctx.writeAndFlush(out, promise);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
Server code
HelloServerDemo
package com.netty;

import com.netty.handler.FirstInServerHandler;
import com.netty.handler.FirstOutServerHandler;
import com.netty.handler.SecondInServerHandler;
import com.netty.handler.SecondOutServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class HelloServerDemo {

    public static void main(String[] args) {
        HelloServerDemo server = new HelloServerDemo();
        server.start(20000);
    }

    public void start(int port) {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup);
            bootstrap.channel(NioServerSocketChannel.class);
            bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    pipeline.addLast(new FirstOutServerHandler());
                    pipeline.addLast(new SecondOutServerHandler());
                    pipeline.addLast(new FirstInServerHandler());
                    pipeline.addLast(new SecondInServerHandler());
                }
            });
            ChannelFuture future = bootstrap.bind(port).sync();
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}
Client code
HelloClientDemo
package com.netty;

import com.netty.handler.ClientInHandler;
import com.netty.handler.ClientOutHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class HelloClientDemo {

    public static void main(String[] args) {
        HelloClientDemo client = new HelloClientDemo();
        client.start("localhost", 20000);
    }

    public void start(String ip, int port) {
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(workerGroup);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    pipeline.addLast(new ClientOutHandler());
                    pipeline.addLast(new ClientInHandler());
                }
            });
            ChannelFuture future = bootstrap.connect(ip, port).sync();
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}
Running the code
Start the server first, then start the client. The console output is as follows:
HelloServerDemo console:
Server console: ClientInHandler -> ClientOutHandler
Server console: FirstInServerHandler ->
Server console: FirstInServerHandler -> SecondInServerHandler ->
Server console: FirstInServerHandler -> SecondInServerHandler -> SecondOutServerHandler ->
HelloClientDemo console:
Client Console: FirstInServerHandler -> SecondInServerHandler -> SecondOutServerHandler -> FirstOutServerHandler
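The output shows that the execution order is tied to the order in which the ChannelHandlers are registered: inbound handlers run in registration order, while outbound handlers run in reverse registration order. This matches the order described in the source code comments.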
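The ordering rule can also be reproduced without Netty. The sketch below is a simplified model, not Netty's implementation: the class PipelineOrderSketch, the Handler type, the Direction enum, and the trace method are hypothetical names introduced only for illustration. It assumes one read followed by one write issued by the last handler in the list, as in the example above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified model of a pipeline; every name here is hypothetical, not Netty API.
final class PipelineOrderSketch {

    enum Direction { INBOUND, OUTBOUND }

    static final class Handler {
        final String name;
        final Direction direction;

        Handler(String name, Direction direction) {
            this.name = name;
            this.direction = direction;
        }
    }

    // Returns handler names in the order they would see one read followed by one write.
    static List<String> trace(List<Handler> pipeline) {
        List<String> visited = new ArrayList<>();
        // Inbound events travel from head to tail and skip outbound handlers.
        for (Handler h : pipeline) {
            if (h.direction == Direction.INBOUND) {
                visited.add(h.name);
            }
        }
        // Outbound events travel from tail to head and skip inbound handlers.
        for (int i = pipeline.size() - 1; i >= 0; i--) {
            Handler h = pipeline.get(i);
            if (h.direction == Direction.OUTBOUND) {
                visited.add(h.name);
            }
        }
        return visited;
    }

    // Same registration order as HelloServerDemo.
    static List<Handler> exampleRegistration() {
        return Arrays.asList(
                new Handler("FirstOutServerHandler", Direction.OUTBOUND),
                new Handler("SecondOutServerHandler", Direction.OUTBOUND),
                new Handler("FirstInServerHandler", Direction.INBOUND),
                new Handler("SecondInServerHandler", Direction.INBOUND));
    }

    public static void main(String[] args) {
        System.out.println(String.join(" -> ", trace(exampleRegistration())));
    }
}
```

Under these assumptions the model prints the four names in the same order as the client console: the two inbound handlers first, then the two outbound handlers in reverse registration order.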
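Things to note
The execution order of ChannelHandlers is now clear. The rest of this article covers a detail in this process that needs attention.
The figure below shows how the handlers are laid out in each channel's ChannelPipeline after the four ChannelHandlers are registered. The example above registers them as follows: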
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new FirstOutServerHandler());
        pipeline.addLast(new SecondOutServerHandler());
        pipeline.addLast(new FirstInServerHandler());
        pipeline.addLast(new SecondInServerHandler());
    }
});
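If the registration order changes (with the relative positions of the inbound handlers unchanged), the output on the client console changes as well. We would expect all four handler class names to still appear, only in a different order. In some cases, however, only three of the four names appear, or even just two. Repeated tests show that names go missing whenever an outbound handler sits at the tail end of the ChannelPipeline.
To reproduce the problem, register the ChannelHandlers in the following order.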
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new FirstOutServerHandler());
        pipeline.addLast(new FirstInServerHandler());
        pipeline.addLast(new SecondInServerHandler());
        pipeline.addLast(new SecondOutServerHandler());
    }
});
Run the code. The result is as follows (only the client console is shown):
Client Console: FirstInServerHandler -> SecondInServerHandler -> FirstOutServerHandler
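Why does this happen? Tracing the source code shows that calling ChannelHandlerContext.write(msg) invokes the findContextOutbound() method to find the next outbound handler. The source of that method is as follows: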
private AbstractChannelHandlerContext findContextOutbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.prev;
    } while (!ctx.outbound);
    return ctx;
}
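The logic is simple: starting from the current context, it searches backward for the next outbound handler. As the handler layout above shows, if the second outbound handler is placed at the tail end of the ChannelPipeline, the backward search never reaches it, so SecondOutServerHandler is skipped and its name is not printed. To solve this, the search has to start from the tail node at the point where the flow turns from inbound to outbound. How can that be done?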
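Before turning to the fix, the skipping behavior can be reproduced with a small model of the search. The sketch below is an illustration under stated assumptions, not Netty code: the class OutboundSearchSketch and its Ctx type are hypothetical stand-ins for the context chain, and the head context is modeled as outbound so that the loop always terminates. Only the shape of the do/while loop is taken from the source quoted above.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for the context chain; class and method names are hypothetical.
final class OutboundSearchSketch {

    static final class Ctx {
        final String name;
        final boolean outbound;
        Ctx prev;

        Ctx(String name, boolean outbound) {
            this.name = name;
            this.outbound = outbound;
        }

        // Same loop shape as the findContextOutbound() source quoted above.
        Ctx findContextOutbound() {
            Ctx ctx = this;
            do {
                ctx = ctx.prev;
            } while (!ctx.outbound);
            return ctx;
        }
    }

    // Links head -> handlers in registration order, then lists the outbound
    // handlers reached by a write that starts at the handler named `start`.
    static List<String> outboundPath(String start, Ctx... handlers) {
        Ctx head = new Ctx("head", true); // assumption: head is outbound, so the search stops there
        Ctx prev = head;
        Ctx from = null;
        for (Ctx c : handlers) {
            c.prev = prev;
            prev = c;
            if (c.name.equals(start)) {
                from = c;
            }
        }
        if (from == null) {
            throw new IllegalArgumentException("unknown handler: " + start);
        }
        List<String> path = new ArrayList<>();
        for (Ctx c = from.findContextOutbound(); c != head; c = c.findContextOutbound()) {
            path.add(c.name);
        }
        return path;
    }

    // The registration order that loses a handler name.
    static List<String> reorderedExample() {
        return outboundPath("SecondInServerHandler",
                new Ctx("FirstOutServerHandler", true),
                new Ctx("FirstInServerHandler", false),
                new Ctx("SecondInServerHandler", false),
                new Ctx("SecondOutServerHandler", true));
    }

    public static void main(String[] args) {
        System.out.println(reorderedExample());
    }
}
```

In this model the search that starts at SecondInServerHandler reaches only FirstOutServerHandler. SecondOutServerHandler sits after the starting context, and the prev links never lead to it.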
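ChannelHandlerContext has many methods, some of which also exist on Channel and on ChannelPipeline itself, but there is one important difference. When these methods are called on a Channel or a ChannelPipeline, the event propagates through the entire ChannelPipeline. When the same methods are called on a ChannelHandlerContext, propagation starts at the current handler. Calling ctx.channel().write(...) or ctx.pipeline().write(...) instead of ctx.write(...) therefore starts the outbound search from the tail node. The following version of SecondInServerHandler uses ctx.pipeline().write(...):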
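public class SecondInServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf in = (ByteBuf) msg;
        System.out.println("Server console: " + in.toString(CharsetUtil.UTF_8));
        // Write through the pipeline so that the outbound search starts at the tail node.
        ctx.pipeline().write(Unpooled.copiedBuffer(in,
                Unpooled.copiedBuffer("SecondInServerHandler -> ", CharsetUtil.UTF_8)));
        // copiedBuffer made a copy, so the received buffer can be released.
        ReferenceCountUtil.release(msg);
    }

    // channelReadComplete and exceptionCaught are the same as in the original class.
}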
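Either of the two approaches (writing through the Channel or through the ChannelPipeline) solves the problem and produces the same result as the original example. Even so, when registering ChannelHandlers it is best to place the outbound handlers before the last inbound handler and then call the corresponding ChannelHandlerContext methods. This produces a shorter event flow and gives your application better performance.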
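The trade-off between the two starting points can be made concrete with a rough model. The sketch below is not Netty code and measures nothing: WriteStartSketch, Result, fromContext, and fromTail are hypothetical names, a handler is treated as outbound simply because its name contains "Out", and the "examined" counter only counts how many handler positions the backward walk passes in this model.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Index-based model of the two starting points; names are hypothetical, not Netty API.
final class WriteStartSketch {

    // Outcome of one simulated write.
    static final class Result {
        final List<String> reached = new ArrayList<>(); // outbound handlers that see the write
        int examined;                                    // handler positions walked past
    }

    // Assumption valid only for the example names: "Out" in the name means outbound.
    static boolean isOutbound(String name) {
        return name.contains("Out");
    }

    // Walks backward from the position just before startExclusive to the first handler.
    static Result walkBack(List<String> pipeline, int startExclusive) {
        Result r = new Result();
        for (int i = startExclusive - 1; i >= 0; i--) {
            r.examined++;
            if (isOutbound(pipeline.get(i))) {
                r.reached.add(pipeline.get(i));
            }
        }
        return r;
    }

    // Models ctx.write(...) issued by the handler named caller.
    static Result fromContext(List<String> pipeline, String caller) {
        int index = pipeline.indexOf(caller);
        if (index < 0) {
            throw new IllegalArgumentException("unknown handler: " + caller);
        }
        return walkBack(pipeline, index);
    }

    // Models ctx.pipeline().write(...) or ctx.channel().write(...): the search starts at the tail.
    static Result fromTail(List<String> pipeline) {
        return walkBack(pipeline, pipeline.size());
    }

    public static void main(String[] args) {
        List<String> recommended = Arrays.asList("FirstOutServerHandler", "SecondOutServerHandler",
                "FirstInServerHandler", "SecondInServerHandler");
        Result viaCtx = fromContext(recommended, "SecondInServerHandler");
        Result viaTail = fromTail(recommended);
        System.out.println("ctx.write:      " + viaCtx.reached + " examined=" + viaCtx.examined);
        System.out.println("pipeline.write: " + viaTail.reached + " examined=" + viaTail.examined);
    }
}
```

With the recommended registration order, both starting points reach the same two outbound handlers in this model, and the walk that starts at the current context passes fewer positions. With an outbound handler registered last, only the walk from the tail reaches it.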