不像DISCARD和ECHO的服务端,对于TIME协议我们需要一个客户端因为人们不能把一个32位的二进制数据翻译成一个日期或者日历。在这一部分,我们将会讨论如何确保服务端是正常工作的,并且学习怎样用Netty编写一个客户端。
在Netty中,编写服务端和客户端最大的并且唯一不同的使用了不同的BootStrap和Channel的实现。请看一下下面的代码:
01 package io.netty.example.time; 02 03 public class TimeClient { 04 public static void main(String[] args) throws Exception { 05 String host = args[0]; 06 int port = Integer.parseInt(args[1]); 07 EventLoopGroup workerGroup = new NioEventLoopGroup(); 08 09 try { 10 Bootstrap b = new Bootstrap(); // (1) 11 b.group(workerGroup); // (2) 12 b.channel(NioSocketChannel.class); // (3) 13 b.option(ChannelOption.SO_KEEPALIVE, true); // (4) 14 b.handler(new ChannelInitializer<SocketChannel>() { 15 @Override 16 public void initChannel(SocketChannel ch) throws Exception { 17 ch.pipeline().addLast(new TimeClientHandler()); 18 } 19 }); 20 21 // Start the client. 22 ChannelFuture f = b.connect(host, port).sync(); // (5) 23 24 // Wait until the connection is closed. 25 f.channel().closeFuture().sync(); 26 } finally { 27 workerGroup.shutdownGracefully(); 28 } 29 } 30} BootStrap和ServerBootstrap类似,不过他是对非服务端的channel而言,比如客户端或者无连接传输模式的channel。如果你只指定了一个EventLoopGroup,那他就会即作为一个‘boss’线程,也会作为一个‘workder’线程,尽管客户端不需要使用到‘boss’线程。代替NioServerSocketChannel的是NioSocketChannel,这个类在客户端channel被创建时使用。不像在使用ServerBootstrap时需要用childOption()方法,因为客户端的SocketChannel没有父channel的概念。我们用connect()方法代替了bind()方法。正如你看到的,他和服务端的代码是不一样的。ChannelHandler是如何实现的?他应该从服务端接受一个32位的整数消息,把他翻译成人们能读懂的格式,并打印翻译好的时间,最后关闭连接:
01 package io.netty.example.time; 02 03 import java.util.Date; 04 05 public class TimeClientHandler extends ChannelHandlerAdapter { 06 @Override 07 public void channelRead(ChannelHandlerContext ctx, Object msg) { 08 ByteBuf m = (ByteBuf) msg; // (1) 09 try { 10 long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L; 11 System.out.println(new Date(currentTimeMillis)); 12 ctx.close(); 13 } finally { 14 m.release(); 15 } 16 } 17 18 @Override 19 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { 20 cause.printStackTrace(); 21 ctx.close(); 22 } 23} 在TCP/IP中,NETTY会把读到的数据放到ByteBuf的数据结构中。这样看起来非常简单,并且和服务端的那个例子的代码也相差不多。然而,处理器有时候会因为抛出IndexOutOfBoundsException而拒绝工作。在下个部分我们会讨论为什么会发生这种情况。
在基于流的传输里比如TCP/IP,接收到的数据会先被存储到一个socket接收缓冲里。不幸的是,基于流的传输并不是一个数据包队列,而是一个字节队列。即使你发送了2个独立的数据包,操作系统也不会作为2个消息处理而仅仅是作为一连串的字节而言。因此这是不能保证你远程写入的数据就会准确地读取。举个例子,让我们假设操作系统的TCP/TP协议栈已经接收了3个数据包:
由于基于流传输的协议的这种普通的性质,在你的应用程序里读取数据的时候会有很高的可能性被分成下面的片段。
因此,一个接收方不管他是客户端还是服务端,都应该把接收到的数据整理成一个或者多个更有意思并且能够让程序的业务逻辑更好理解的数据。在上面的例子中,接收到的数据应该被构造成下面的格式:
现在让我们回到TIME客户端的例子上。这里我们遇到了同样的问题,一个32字节数据是非常小的数据量,他并不见得会被经常拆分到到不同的数据段内。然而,问题是他确实可能会被拆分到不同的数据段内,并且拆分的可能性会随着通信量的增加而增加。
最简单的方案是构造一个内部的可积累的缓冲,直到4个字节全部接收到了内部缓冲。下面的代码修改了TimeClientHandler的实现类修复了这个问题
01 package io.netty.example.time; 02 03 import java.util.Date; 04 05 public class TimeClientHandler extends ChannelHandlerAdapter { 06 private ByteBuf buf; 07 08 @Override 09 public void handlerAdded(ChannelHandlerContext ctx) { 10 buf = ctx.alloc().buffer(4); // (1) 11 } 12 13 @Override 14 public void handlerRemoved(ChannelHandlerContext ctx) { 15 buf.release(); // (1) 16 buf = null; 17 } 18 19 @Override 20 public void channelRead(ChannelHandlerContext ctx, Object msg) { 21 ByteBuf m = (ByteBuf) msg; 22 buf.writeBytes(m); // (2) 23 m.release(); 24 25 if (buf.readableBytes() >= 4) { // (3) 26 long currentTimeMillis = (buf.readInt() - 2208988800L) * 1000L; 27 System.out.println(new Date(currentTimeMillis)); 28 ctx.close(); 29 } 30 } 31 32 @Override 33 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { 34 cause.printStackTrace(); 35 ctx.close(); 36 } 37} ChannelHandler有2个生命周期的监听方法:handlerAdded()和handlerRemoved()。你可以完成任意初始化任务只要他不会被阻塞很长的时间。首先,所有接收的数据都应该被累积在buf变量里。然后,处理器必须检查buf变量是否有足够的数据,在这个例子中是4个字节,然后处理实际的业务逻辑。否则,Netty会重复调用channelRead()当有更多数据到达直到4个字节的数据被积累。尽管第一个解决方案已经解决了Time客户端的问题了,但是修改后的处理器看起来不那么的简洁,想象一下如果由多个字段比如可变长度的字段组成的更为复杂的协议时,你的ChannelHandler的实现将很快地变得难以维护。
正如你所知的,你可以增加多个ChannelHandler到ChannelPipeline ,因此你可以把一整个ChannelHandler拆分成多个模块以减少应用的复杂程度,比如你可以把TimeClientHandler拆分成2个处理器:
TimeDecoder处理数据拆分的问题TimeClientHandler原始版本的实现幸运地是,Netty提供了一个可扩展的类,帮你完成TimeDecoder的开发。
01 package io.netty.example.time; 02 03 public class TimeDecoder extends ByteToMessageDecoder { // (1) 04 @Override 05 protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2) 06 if (in.readableBytes() < 4) { 07 return; // (3) 08 } 09 10 out.add(in.readBytes(4)); // (4) 11 } 12} ByteToMessageDecoder是ChannelHandler的一个实现类,他可以在处理数据拆分的问题上变得很简单。每当有新数据接收的时候,ByteToMessageDecoder都会调用decode()方法来处理内部的那个累积缓冲。Decode()方法可以决定当累积缓冲里没有足够数据时可以往out对象里放任意数据。当有更多的数据被接收了ByteToMessageDecoder会再一次调用decode()方法。如果在decode()方法里增加了一个对象到out对象里,这意味着解码器解码消息成功。ByteToMessageDecoder将会丢弃在累积缓冲里已经被读过的数据。请记得你不需要对多条消息调用decode(),ByteToMessageDecoder会持续调用decode()直到不放任何数据到out里。现在我们有另外一个处理器插入到ChannelPipeline里,我们应该在TimeClient里修改ChannelInitializer 的实现:
1 b.handler(new ChannelInitializer<SocketChannel>() { 2 @Override 3 public void initChannel(SocketChannel ch) throws Exception { 4 ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler()); 5 } 6});如果你是一个大胆的人,你可能会尝试使用更简单的解码类ReplayingDecoder。不过你还是需要参考一下API文档来获取更多的信息。
1 public class TimeDecoder extends ReplayingDecoder { 2@Override 3 protected void decode( 4 ChannelHandlerContext ctx, ByteBuf in, List<object width="300" height="150">out) {out.add(in.readBytes(4));}}此外,Netty还提供了更多可以直接拿来用的解码器使你可以更简单地实现更多的协议,帮助你避免开发一个难以维护的处理器实现。请参考下面的包以获取更多更详细的例子:
对于二进制协议请看io.netty.example.factorial 对于基于文本协议请看io.netty.example.telnet我们已经讨论了所有的例子,到目前为止一个消息的消息都是使用ByteBuf作为一个基本的数据结构。在这一部分,我们会改进TIME协议的客户端和服务端的例子,用POJO替代ByteBuf。在你的ChannelHandlerS中使用POJO优势是比较明显的。通过从ChannelHandler中提取出ByteBuf的代码,将会使ChannelHandler的实现变得更加可维护和可重用。在TIME客户端和服务端的例子中,我们读取的仅仅是一个32位的整形数据,直接使用ByteBuf不会是一个主要的问题。然后,你会发现当你需要实现一个真实的协议,分离代码变得非常的必要。首先,让我们定义一个新的类型叫做UnixTime。
01 package io.netty.example.time; 02 03 import java.util.Date; 04 05 public class UnixTime { 06 07 private final int value; 08 09 public UnixTime() { 10 this((int) (System.currentTimeMillis() / 1000L + 2208988800L)); 11 } 12 13 public UnixTime(int value) { 14 this.value = value; 15 } 16 17 public int value() { 18 return value; 19 } 20 21 @Override 22 public String toString() { 23 return new Date((value() - 2208988800L) * 1000L).toString(); 24 } 25}现在我们可以修改下TimeDecoder类,返回一个UnixTime,以替代ByteBuf
1@Override 2 protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { 3 if (in.readableBytes() < 4) { 4 return; 5 } 6 7 out.add(new UnixTime(in.readInt())); 8}下面是修改后的解码器,TimeClientHandler不再有任何的ByteBuf代码了。
1@Override 2 public void channelRead(ChannelHandlerContext ctx, Object msg) { 3 UnixTime m = (UnixTime) msg; 4 System.out.println(m); 5 ctx.close(); 6}是不是变得更加简单和优雅了?相同的技术可以被运用到服务端。让我们修改一下TimeServerHandler的代码。
1@Override 2 public void channelActive(ChannelHandlerContext ctx) { 3 ChannelFuture f = ctx.writeAndFlush(new UnixTime()); 4 f.addListener(ChannelFutureListener.CLOSE); 5}现在,仅仅需要修改的是ChannelHandler的实现,这里需要把UnixTime对象重新转化为一个ByteBuf。不过这已经是非常简单了,因为当你对一个消息编码的时候,你不需要再处理拆包和组装的过程。
01 package io.netty.example.time; 02 03 public class TimeEncoder extends ChannelHandlerAdapter { 04 @Override 05 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { 06 UnixTime m = (UnixTime) msg; 07 ByteBuf encoded = ctx.alloc().buffer(4); 08 encoded.writeInt(m.value()); 09 ctx.write(encoded, promise); // (1) 10 } 11} 在这几行代码里还有几个重要的事情。第一, 通过ChannelPromise,当编码后的数据被写到了通道上Netty可以通过这个对象标记是成功还是失败。第二, 我们不需要调用cxt.flush()。因为处理器已经单独分离出了一个方法void flush(ChannelHandlerContext cxt),如果像自己实现flush方法内容可以自行覆盖这个方法。进一步简化操作,你可以使用MessageToByteEncode:
1 public class TimeEncoder extends MessageToByteEncoder<UnixTime> { 2 @Override 3 protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) { 4 out.writeInt(msg.value()); 5 } 6}最后的任务就是在TimeServerHandler之前把TimeEncoder插入到ChannelPipeline。但这是不那么重要的工作。
关闭一个Netty应用往往只需要简单地通过shutdownGracefully()方法来关闭你构建的所有的NioEventLoopGroupS.当EventLoopGroup被完全地终止,并且对应的所有channels都已经被关闭时,Netty会返回一个Future对象。
在这一章节中,我们会快速地回顾下如果在熟练掌握Netty的情况下编写出一个健壮能运行的网络应用程序。在Netty接下去的章节中还会有更多更相信的信息。我们也鼓励你去重新复习下在io.netty.example包下的例子。请注意社区一直在等待你的问题和想法以帮助Netty的持续改进,Netty的文档也是基于你们的快速反馈上。
转载自 并发编程网 - ifeve.com 相关资源:Netty5学习指南