在前面的文章中,通过对引导和线程的介绍,我们知道了如何配置、引导客户端和服务端程序,以及程序的线程模型。它们构建了应用程序运行的框架,但一个完整的网络应用程序还需要更多的内容,其中包括数据应该如何处理。Netty为此提供了一个强大功能组件ChannelHanlder接口,它允许用户自定义ChannelHandler的实现来处理传入和传出的数据。
在一款网络应用程序中,数据应该是其中最重要的部分,其他组件都是为了更方便有效的处理数据,所以作为处理数据的ChannelHandler组件应该算是Netty的核心组成部分。正如在前面的文章中的Echo应用程序,我们专门为服务器和客户端编写为ChannelHandler编写了实现类XXXXHandler,由此可见ChannelHandler的重要性。我们使用Netty时,绝大部分时间也都是写在这部分代码,所以了解它的一些机制和特性是很有必要的。
ChannelHandler作为Netty中Handler组件的根接口,其继承体系十分庞杂,有很多子类实现,具体的内容可参考api,下面会对其中的几个常用的实现进行介绍。
ChannelHandler接口中定义了三个生命周期操作,如下面的代码所示:
// 当把ChannelHandler添加到ChannelPipeline中时被调用 void handlerAdded(ChannelHandlerContext ctx) throws Exception; // 当把ChannelHandler从ChannelPipeline中移除时被调用 void handlerRemoved(ChannelHandlerContext ctx) throws Exception; // 当处理数据或事件的过程中,在ChannelPipeline中有错误产生时被调用 void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;如上面的类图所示,ChannelHandler有两个重要的接口:ChannelInboundHandler和ChannelOutboundHandler。
ChannelInboundHandler作为处理入站事件以及各类状态变化的Handler实现的父接口,它扩展了父接口,新增了一些特有的回调方法。这些方法将会在数据被接收时或者与其对应的 Channel 状态发生改变时被调用。
// 当Channel已经注册到它的EventLoop并且能够处理I/O时被调用 void channelRegistered(ChannelHandlerContext ctx) // 当Channel从它所关联的EventLoop注销并无法处理任何I/O时被调用 void channelUnregistered(ChannelHandlerContext ctx) // 当Channel处于活动状态时被调用,Channel已经连接/绑定并且已经就绪 void channelActive(ChannelHandlerContext ctx) // 当Channel离开活动状态并且不再连接它的远程节点时被调用 void channelInactive(ChannelHandlerContext ctx) // 当Channel读取数据时被调用 void channelRead(ChannelHandlerContext ctx, Object msg) // 当Channel上的一个读操作完成时被调用 void channelReadComplete(ChannelHandlerContext ctx) // 当用户事件触发时被调用,因为一个POJO被传经了ChannelPipeline void userEventTriggered(ChannelHandlerContext ctx, Object evt) // 当Channel的可写状态改变时被调用 void channelWritabilityChanged(ChannelHandlerContext ctx)可以注意到上面的每个方法都带了ChannelHandlerContext作为参数,具体作用是,在每个回调事件里面,处理完成之后,使用ChannelHandlerContext的fireChannelXXX方法来传递给下个ChannelHandler。
@Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { ctx.fireChannelRegistered(); }ChannelOutboundHandler是处理出站数据并且允许拦截所有的操作的Handler实现的父接口。它也对ChannelHandler接口进行了扩展,并定义了一些自身特有的方法。
// 当请求将Channel绑定在本地地址时被调用 void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) // 当请求将Channel连接到远程节点时被调用 void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) // 当请求将Channel从远程节点断开时被调用 void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) // 当请求关闭Channel时被调用 void close(ChannelHandlerContext ctx, ChannelPromise promise) // 当请求将Channel从所关联的EventLoop上注销时被调用 void deregister(ChannelHandlerContext ctx, ChannelPromise promise) // 当请求从Channel读取更多的数据时被调用 void read(ChannelHandlerContext ctx) // 当请求通过Channel将数据写到远程节点时被调用 void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) // 当请求通过Channel将入队数据冲刷到远程节点时被调用 void flush(ChannelHandlerContext ctx)和上面类似的,在每个回调事件处理完成之后,需要向下一个ChannelHandler传递,不同的是,这里需要调用ChannelHandlerContext的write()方法向下传递。
ChannelInboundHandlerAdapter 和 ChannelOutboundHandlerAdapter这两个适配器类作为上面两个接口的实现类,分别实现了它们所有的方法。实际编程中我们总是以这两个类作为起点,继承它们并重写其中感兴趣的方法来编写适合自己的Handler。
从上面的类图可看出,这两个适配器实现类同时继承了ChannelHandlerAdapter抽象类,而后者提供了一个实用方法 isSharable()。如果其对应的实现被标注为 @Sharable,那么这个方法将返回 true,表示它可以被添加到多个 ChannelPipeline中。
@Sharable public class EchoServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { .... } ... }对于@Sharable注解的详细内容,可以查看你真的了解Netty中@Sharable?,文章对@sharable注解的原理做了详细说明。
至此,只剩下SimpleChannelInboundHandler抽象类了,它继承了ChannelInboundHandlerAdaptor类,重写channelRead方法,并增加了channelRead0的抽象方法。
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { boolean release = true; try { if (acceptInboundMessage(msg)) { @SuppressWarnings("unchecked") I imsg = (I) msg; channelRead0(ctx, imsg); } else { release = false; ctx.fireChannelRead(msg); } } finally { if (autoRelease && release) { ReferenceCountUtil.release(msg); } } } protected abstract void channelRead0(ChannelHandlerContext ctx, I msg) throws Exception;这里使用的是模板模式,将需要变化的逻辑放在抽象方法channelRead0(…)中,让子类根据自己的实现进行编写;将处理逻辑不变的内容写好在 channelRead(…)中,并在里面调用channelRead0(…)。
当我们继承SimpleChannelInboundHandler类时,将自定义逻辑写入channelRead0(…)中,当channelRead真正被触发调用时我们的逻辑才会被处理,然后根据需求执行ctx.fireChannelRead(msg)传递到下一个ChannelHandler或者执行ReferenceCountUtil.release(msg)自动进行资源释放。而直接继承ChannelInboundHandlerAdapter类,则需要负责显式地释放与池化的 ByteBuf 实例相关的内存。
关于SimpleChannelInboundHandler更全面的理解,可查看Netty源码:深入理解SimpleChannelInboundHandler。
ChannelHandler的主要作用是处理I/O事件或拦截I/O操作,并将事件转发到其所属ChannelPipeline中的下一个ChannelHandler。针对事件的转发方式以及事件的具体处理方式,会在后面的文章中介绍。
ChannelPipeline的作用是将多个ChannelHandler链接在一起来让事件在其中传播处理。一个ChannelPipeline中可能不仅有入站处理器,还有出站处理器,入站处理器只会处理入站的事件,而出站处理器只会处理出站的数据。
下面展示了一个同时具有入站处理器和出站处理器的ChannelPipeline典型分布,省略了tail和head节点。 从上图可看出,ChannelPipeline相当于一个ChannelHandler的容器。如果一个入站事件被触发,它将被从ChannelPipeline 的头部开始一直被传播到Channel Pipeline 的尾端。一个出站I/O 事件将从ChannelPipeline 的最右边开始,然后向左传播。
在 ChannelPipeline 传播事件时,它会检查 ChannelPipeline 中的下一个 ChannelHandler 的类型是否和事件的运动方向相匹配。如果不匹配,ChannelPipeline 将跳过该ChannelHandler 并前进到下一个,直到它找到和该事件所期望的方向相匹配的为止。需要注意的是,如果一个ChannelHandler 同时实现了 ChannelInboundHandler 接口和 ChannelOutboundHandler 接口,就可以即处理进站事件,也处理出站事件了。
了解完上面的基本信息后,我们先从源码入手,了解ChannelPipeline的内部结构,看看其是如何实现的,下面是ChannelPipeline的实现类DefaultChannelPipeline的部分代码。
final class DefaultChannelPipeline implements ChannelPipeline { final AbstractChannelHandlerContext head; final AbstractChannelHandlerContext tail; public DefaultChannelPipeline(AbstractChannel channel) { if (channel == null) { throw new NullPointerException("channel"); } this.channel = channel; tail = new TailContext(this); head = new HeadContext(this); head.next = tail; tail.prev = head; } }可以看到, 在 DefaultChannelPipeline 的构造方法中,将传入的 channel 赋值给字段 this.channel,这里需要说明下:对于每个新的通道,会创建一个新的ChannelPipeline并附加至通道。一旦连接建立,Channel和ChannelPipeline之间的耦合就是永久性的。Channel不能附加其他的ChannelPipeline或从ChannelPipeline分离。
接着又实例化了两个特殊的字段: tail 与 head,这两个字段是一个双向链表的头和尾。其实在 DefaultChannelPipeline 中,维护了一个以 AbstractChannelHandlerContext 为节点的双向链表,这个链表是 Netty 实现 Pipeline 机制的关键。
继续查看tail和head的源码:
static final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler static final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler它们继承AbstractChannelHandlerContext的同时,分别实现了ChannelInboundHandler和ChannelOutboundHandler,因此可以说它们既是ChannelHandlerContext,又是 ChannelHandlerContext,这也是为什么它们是事件或消息的处理起点的原因。
到这里,通过这个AbstractChannelHandlerContext类型的双向链表和头尾节点的双重属性,Channelpipeline是和ChannelHandler关联起来了,但是我们添加的普通ChannelHandler实现类呢?
回到我们是熟悉的引导代码中:
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new ServerHandler()); } });这里我们添加了一个自定义的ChannelHandler实现类,追踪其实现流程,其中一个环节:
@Override public ChannelPipeline addLast(EventExecutorGroup group, final String name, ChannelHandler handler) { synchronized (this) { checkDuplicateName(name); AbstractChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler); addLast0(name, newCtx); } return this; }这里创建了一个AbstractChannelHandlerContext的对象,并将我们添加的ChannelHandler实现类作为参数。查看该构造函数:
DefaultChannelHandlerContext( DefaultChannelPipeline pipeline, EventExecutorGroup group, String name, ChannelHandler handler) { super(pipeline, group, name, isInbound(handler), isOutbound(handler)); if (handler == null) { throw new NullPointerException("handler"); } this.handler = handler; }再回到addLast(EventExecutorGroup group, final String name, ChannelHandler handler)中,继续向下执行,进入addLast0(name, newCtx)方法中:
private void addLast0(final String name, AbstractChannelHandlerContext newCtx) { checkMultiplicity(newCtx); AbstractChannelHandlerContext prev = tail.prev; newCtx.prev = prev; newCtx.next = tail; prev.next = newCtx; tail.prev = newCtx; // 省略部分代码 }和我们熟悉的单向链表添加数据的方式类似,这里将新创建ChannelHandlerContext实现类,添加到双向链表中,至此我们明白了,每当添加一个ChannelHandler实现时,系统就会创建一个ChannelHandlerContext实例与之关联,我们添加ChannelHandler的过程,实际就是向ChannelPipeline中添加一个ChannelHandlerContext实例,只不过该实例内部包裹着一个自定义Channel Handler实例对象。
至此,我们自定义的ChannelHandler的成功添加到ChannelPipeline中。
当然,ChannelPipeline还提供了一系列方法,我们可以通过它们动态的添加、删除、替换其中的ChannelHandler,以满足不同的业务需求,这种机制极大的提高Netty的灵活性。
// 将ChannelHandler添加到ChannelPipeline起始位置 public ChannelPipeline addFirst(...) // 将ChannelHandler添加到ChannelPipeline末尾位置 public ChannelPipeline addLast (...) // 将ChannelHandler添加到指定的ChannelHandler之前 public ChannelPipeline addBefore (...) // 将ChannelHandler添加到指定的ChannelHandler之后 public ChannelPipeline addAfter (...) // 将一个 ChannelHandler 从 ChannelPipeline中移除 public ChannelPipeline remove (...) // 将 ChannelPipeline中的一个 ChannelHandler 替换为另一个 ChannelHandler public Channelpipeline replace (...)下面通过代码来进行演示:
ChannelPipeline pipeline = ch.pipeline(); // 创建一个新的handler实例作为"handler1"添加到pipeline中 pipeline.addLast("handler1", new FirstHandler()); // 创建一个新的handler实例添加到pipeline中,它将被放置在handler1的前面 pipeline.addFirst("handler2", new SecondHandler()); // 创建一个新的handler实例添加到pipeline中,它将被放置在handler1的后面 pipeline.addAfter("hander1", "handler3", new ThirdHandler()); // 通过名称删除"handler3" pipeline.remove("handler3"); // 创建一个新的handler实例,并替换掉名称为"handler2"的handler对象 pipeline.replace("handler2", "handler4", new FourthHanlder());上面这些方法还有很多重载方法,具体内容可参考API。
再次回到创建AbstractChannelHandlerContext对象的代码中:
private final ChannelHandler handler; DefaultChannelHandlerContext( DefaultChannelPipeline pipeline, EventExecutorGroup group, String name, ChannelHandler handler) { super(pipeline, group, name, isInbound(handler), isOutbound(handler)); if (handler == null) { throw new NullPointerException("handler"); } this.handler = handler; } @Override public ChannelHandler handler() { return handler; } private static boolean isInbound(ChannelHandler handler) { return handler instanceof ChannelInboundHandler; } private static boolean isOutbound(ChannelHandler handler) { return handler instanceof ChannelOutboundHandler; }这里不单将ChannelHandler和ChannelHandlerContext关联在一起,还有两个重要的字段需要注意:inbound和outbound。
通过下面的两个方法isInbound(ChannelHandler handler)和isOutbound(ChannelHandler handler)可以知道,如果当前自定义的ChannelHandler对象实现了ChannelInboundHandler接口,则该对象为进站处理器,此时关联的ChannelHandlerContext对象的inbound=true,outbound=false;反之,该对象为出站处理器,与之关联的ChannelHandlerContext的inboun=false,outbound=true。这两个字段有什么作用呢?
上面我们提到:在 ChannelPipeline 传播事件时,它会检查 ChannelPipeline 中的下一个 ChannelHandler 的类型是否和事件的运动方向相匹配。ChannelPipeline 中的事件传播是通过ChannelHandlerContext调用相应的方法展开的,而其中就有检查函数findContextInbound()和findContextOutbound()。
// 这里只看最关键的代码,其他代码删掉 // 进站 public ChannelHandlerContext fireChannelRead(final Object msg) { final AbstractChannelHandlerContext next = findContextInbound(); } // 出站 private void write(Object msg, boolean flush, ChannelPromise promise) { AbstractChannelHandlerContext next = findContextOutbound(); }可以看出,两个方法的返回值next就是下一个符合条件的ChannelHandler对象,分别进入两个方法查看:
// 查找下一个进站处理器 private AbstractChannelHandlerContext findContextInbound() { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.next; } while (!ctx.inbound); return ctx; } // 查找下一个出站处理器 private AbstractChannelHandlerContext findContextOutbound() { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.prev; } while (!ctx.outbound); return ctx; }上面的逻辑很简单,就是对inbound和outbound进行判断,如果当前ChannelHandlerContext的inbound或outbound符合要求就返回。这样就通过inbound和outbound两个字段,将I/O事件传递到了合适的ChannelHandler上了。
**需要注意:**ChannelHandlerContext 有很多的方法,其中一些方法也存在于 Channel 和 ChannelPipeline 本身上,但是有一点重要的不同。如果调用 Channel 或者 ChannelPipeline 上的这些方法,它们将沿着整个 ChannelPipeline 进行传播。而调用位于 ChannelHandlerContext上的相同方法,则将从当前所关联的 ChannelHandler 开始,并且只会传播给位于该ChannelPipeline 中的下一个处理该事件ChannelHandler。