Netty详解(七):Netty 编解码以及消息头编解码器

    xiaoxiao2022-06-26  209

    1. MessagePack 概述

    MessagePack是一个高效的二进制序列化框架,像JSON一样支持不同语言间的数据交换,速度更快,序列化之后的码流更小。

    MessagePacke优点

    编解码高效,性能高序列化后的码流小支持跨语言

    1.1 MessagePack Java API 介绍

    <dependencies> ... <dependency> <groupId>org.msgpack</groupId> <artifactId>msgpack</artifactId> <version>${msgpack.version}</version> </dependency> ... </dependencies>

    1.2 API 官方示例:

    import java.io.IOException; import java.util.ArrayList; import java.util.List; import org.msgpack.MessagePack; import org.msgpack.template.Templates; public class TestMessagePack { public static void main(String[] args) { // Create serialize objects List<String> src=new ArrayList<String>(); src.add("msgpack"); src.add("kumofs"); src.add("viver"); MessagePack msgpack=new MessagePack(); // Serialize byte[] raw; try { raw = msgpack.write(src); // Deserialize directly using a template List<String> dst1 = msgpack.read(raw,Templates.tList(Templates.TString)); System.out.println(dst1.get(0)); System.out.println(dst1.get(1)); System.out.println(dst1.get(2)); } catch (IOException e) { e.printStackTrace(); } } }

    2. Netty 编码器和解码器开发

    2.1 Netty编码器

    import org.msgpack.MessagePack; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; public class MsgpackEncoder extends MessageToByteEncoder<Object> { @Override protected void encode(ChannelHandlerContext arg0, Object arg1, ByteBuf arg2) throws Exception { MessagePack msgpack=new MessagePack(); byte[] raw=msgpack.write(arg1); arg2.writeBytes(raw); } }

    Netty编码继承MessageToByteEncoder,它负责将Object类型的POJO对象编码成byte数组,然后写入到ByteBuffer中。

    2.2 Netty解码器

    import java.util.List; import org.msgpack.MessagePack; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageDecoder; public class MsgPackDecoder extends MessageToMessageDecoder<ByteBuf> { @Override protected void decode(ChannelHandlerContext arg0, ByteBuf arg1, List<Object> arg2) throws Exception { final byte[] array; final int length=arg1.readableBytes(); array=new byte[length]; arg1.getBytes(arg1.readerIndex(), array,0,length); MessagePack msgpack=new MessagePack(); arg2.add(msgpack.read(array)); } }

    Netty解码继承MessageToMessageDecoder,他负责将ByteBuffer类型的数据解码成Object对象。首先从数据报arg1中获取需要解码的byte数组,然后调用MessagePacke的read方法将其反序列化为Object对象,将解码的对象加入到解码列表arg2中,这样就完成了MessagePack解码工作。

    3. 利用LengthFieldBasedFrameDecoder解决TCP粘包/拆包

    对于TCP粘包 拆包问题来说,最常用的在消息头中新增报文长度字段,然后利用该字段进行半包的编解码。下面我们就利用Netty提供的LengthFieldBasedFrameDecoder和LengthFieldPrepender结合新开发的Netty编解码器来实现对TCP粘包/拆包的支持。

    EventLoopGroup group=new NioEventLoopGroup(); try{ Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000) .handler(new ChannelInitializer<SocketChannel>(){ @Override protected void initChannel(SocketChannel ch) throws Exception { //LengthFieldBasedFrameDecoder用于处理半包消息 //这样后面的MsgpackDecoder接收的永远是整包消息 ch.pipeline().addLast("frameDecoder",new LengthFieldBasedFrameDecoder(65535,0,2,0,2)); ch.pipeline().addLast("msgpack decoder",new MsgPackDecoder()); //在ByteBuf之前增加2个字节的消息长度字段 ch.pipeline().addLast("frameEncoder",new LengthFieldPrepender(2)); ch.pipeline().addLast("msgpack encoder",new MsgpackEncoder()); ch.pipeline().addLast(new EchoClientHandler(sendNumber)); } }); ChannelFuture f= b.connect(host,port).sync(); f.channel().closeFuture().sync(); }

    在MessagePack编码器之前增加LengthFieldPrepender,它将在ByteBuffer之前增加2个字节的消息长度字段,其原理如下:

    在MessagePack解码器之前增加LengthFieldBasedFrameDecoder,用于处理半包消息,这样后面的MsgpackDecoder接受到的永远是整包消息。它的工作原理如下:

    LengthFieldBasedFrameDecoder构造函数参数列表如下:

    * @param maxFrameLength * the maximum length of the frame. If the length of the frame is * greater than this value, {@link TooLongFrameException} will be * thrown. * @param lengthFieldOffset * the offset of the length field * @param lengthFieldLength * the length of the length field * @param lengthAdjustment * the compensation value to add to the value of the length field * @param initialBytesToStrip * the number of first bytes to strip out from the decoded frame

    最新回复(0)