Netty TCP通讯拆包、粘包处理

    xiaoxiao2024-12-27  68

    tcp是个流协议,所谓流,就是没有界限的一串数据。tcp底层并不了解上层业务的具体含义,它会根据tcp缓冲区的实际情况进行包的划分,所以在业务上认为,一个完整的包可能会被tcp拆分为多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送。这就是所谓的tcp拆包/粘包问题。

    拆包:通常是由于发送较大长度数的据超出了自定义长度,或者超出了相关网络传输协议的长度限制,发送的一包数据被拆分为多次发送。

    粘包:由于前后包之间的发送间隔过短,造成接收端将多条数据当做同一包数据读取出来。例子如下,

    channel.writeAndFlush(sMsg); channel.writeAndFlush(sMsg); channel.writeAndFlush(sMsg);

    连续多个发送,其实是发送了多个包,对方应该把其看成是多个消息。但是因为发送的过快,对方几乎一定会把其当作一个包来处理。看成是发送了一条消息。这个就发生了粘包。

    netty中解决方案:(注意事项请看文章最后)

    1)LineBasedFrameDecoder行分割解码

    SocketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024); SocketChannel.pipeline().addLast(new StringDecoder());

    LineBaseFrameDecoder的工作原理是它依次遍历ByteBuf中的可读字节,判断看是否有"\n"或者"\r\n",如果有,就以此位置为结束位置,从可读索引到结束位置区间的字节就组成了一行。它是以换行符为结束标志的解码器,支持携带结束符或者不携带结束符两种解码方式,同时支持配置单行的最大长度。如果连续取到最大长度后仍然没有发现换行符,就会抛出异常,同时忽略掉之前读到的异常码流。

    StringDecoder的功能非常简单,就是将接受到的对象转换成字符串,然后继续调用后面的Handler。

    LineBasedFrameDecoder+StringDecoder的组合就是按行切换的文本解码器,它被设计用来支持TCP的粘包和拆包。

     

    2)DelimiterBasedFrameDecoder自定义分隔符

    // 创建分隔符缓冲对象$_作为分割符 ByteBuf byteBuf = Unpooled.copiedBuffer("$_".getBytes()); /** * 第一个参数:单条消息的最大长度,当达到最大长度仍然找不到分隔符抛异常,防止由于异常码流缺失分隔符号导致的内存溢出 * 第二个参数:分隔符缓冲对象 */ socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,byteBuf)); socketChannel.pipeline().addLast(new StringDecoder());

    DelimiterBasedFrameDecoder还可以设置对自定义分割付的处理,如下:

    ByteBuf delemiter= Unpooled.buffer(); delemiter.writeBytes("$##$".getBytes());//自定义分隔符 socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(65535, true, true,delemiter)); //netty实现 DelimiterBasedFrameDecoder(int maxFrameLength, boolean stripDelimiter, boolean failFast,ByteBuf delimiter)

    maxLength: 表示一行最大的长度,如果超过这个长度依然没有检测到\n或者\r\n,将会抛出TooLongFrameException

    failFast: 与maxLength联合使用,表示超过maxLength后,抛出TooLongFrameException的时机。如果为true,则超出maxLength后立即抛出TooLongFrameException,不继续进行解码;如果为false,则等到完整的消息被解码后,再抛出TooLongFrameException异常。

    stripDelimiter: 解码后的消息是否去除分隔符。

    delimiters: 分隔符。我们需要先将分割符,写入到ByteBuf中,然后当做参数传入。

    需要注意的是,netty并没有提供一个DelimiterBasedFrameDecoder对应的编码器实现(笔者没有找到),因此在发送端需要自行编码,添加分隔符。

     

    3)FixedLengthFrameDecoder定长,即发送接受固定长度的包。感觉不大适合我的业务,暂时不考虑使用。

     

    注意事项:

    1、编码格式的设置

    //字符串编解码器获取环境默认编码格式 pipeline.addLast( new StringDecoder(), new StringEncoder() ); //指定字符串编解码器编码格式为UTF-8 pipeline.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8)); pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));

    2、自定义分隔符和解码的添加顺序是,先添加自定义解码器,然后再添加StringDecoder,否则分割无效。

    //先使用DelimiterBasedFrameDecoder解码,以自定义的字符作为分割符 socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(65535, true, true,delemiter)); //解码为UTF-8字符串 socketChannel.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));

     

    实际开发代码示例:

    以下示例时结合业务需求写的,有些地方不需要,请自行删除,仅供参考。

    package com.groot.CPMasterController.netty.tcp.server; import com.groot.CPMasterController.netty.tcp.entity.TCPConst; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.util.concurrent.Future; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.PropertySource; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; /** * description: * author:groot * date: 2019-4-10 12:07 **/ @Component @PropertySource(value="classpath:config.properties") @Slf4j public class TcpServer { //boss事件轮询线程组 //处理Accept连接事件的线程,这里线程数设置为1即可,netty处理链接事件默认为单线程,过度设置反而浪费cpu资源 private EventLoopGroup boss = new NioEventLoopGroup(1); //worker事件轮询线程组 //处理hadnler的工作线程,其实也就是处理IO读写 。线程数据默认为 CPU 核心数乘以2 private EventLoopGroup worker = new NioEventLoopGroup(); @Autowired TCPServerChannelInitializer TCPServerChannelInitializer; @Value("${netty.tcp.server.port}") private Integer port; //与客户端建立连接后得到的通道对象 private Channel channel; /** * @Author groot * @Date 2019/4/13 12:46 * @Param * @return * @Description 存储所有client的channel **/ // public static Map<String, Channel> clientTotalMap = new ConcurrentHashMap<String, Channel>(); /** * @Author groot * @Date 2019/4/13 12:46 * @Param key 链接身份,Value channel队列 * @return * @Description 分类型存储业务所需channel **/ public static Map<String, Set<Channel>> clientTypeMap = new ConcurrentHashMap<>(); /** * @Author groot * @Date 2019/4/13 12:46 * @Param [] * @return io.netty.channel.ChannelFuture * @Description 开启Netty tcp server服务 **/ public void start() { try { //启动类 ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(boss, worker)//组配置,初始化ServerBootstrap的线程组 .channel(NioServerSocketChannel.class)///构造channel通道工厂//bossGroup的通道,只是负责连接 .childHandler(TCPServerChannelInitializer)//设置通道处理者ChannelHandlerworkerGroup的处理器 .option(ChannelOption.SO_BACKLOG, 1024)//socket参数,当服务器请求处理程全满时,用于临时存放已完成三次握手请求的队列的最大长度。如果未设置或所设置的值小于1,Java将使用默认值50。 .childOption(ChannelOption.SO_KEEPALIVE, true)//启用心跳保活机制,tcp,默认2小时发一次心跳 .childOption(ChannelOption.TCP_NODELAY, true)//2019年4月15日新增 TCP无延迟 .handler(new LoggingHandler(LogLevel.INFO));//2019年4月15日新增 日志级别info //Future:异步任务的生命周期,可用来获取任务结果 // ChannelFuture channelFuture1 = serverBootstrap.bind(port).syncUninterruptibly();//绑定端口,开启监听,同步等待 ChannelFuture channelFuture = serverBootstrap.bind(port).sync();//绑定端口,开启监听,同步等待 if (channelFuture != null && channelFuture.isSuccess()) { channel = channelFuture.channel();//获取通道 log.info("Netty tcp server start success, port = {}", port); } else { log.error("Netty tcp server start fail"); } channelFuture.channel().closeFuture().sync();// 监听服务器关闭监听 } catch (InterruptedException e) { log.error("Netty tcp server start Exception e:"+e); }finally { boss.shutdownGracefully(); //关闭EventLoopGroup,释放掉所有资源包括创建的线程 worker.shutdownGracefully(); } } /** * @Author groot * @Date 2019/4/13 12:46 * @Param [] * @return void * @Description 停止Netty tcp server服务 **/ @PreDestroy public void destroy() { if (channel != null) { channel.close(); } try { Future<?> future = worker.shutdownGracefully().await(); if (!future.isSuccess()) { log.error("netty tcp workerGroup shutdown fail, {}", future.cause()); } Future<?> future1 = boss.shutdownGracefully().await(); if (!future1.isSuccess()) { log.error("netty tcp bossGroup shutdown fail, {}", future1.cause()); } } catch (InterruptedException e) { e.printStackTrace(); } log.info("Netty tcp server shutdown success"); } /** * @Author groot * @Date 2019/5/8 14:34 * @Param [identity, msg] 链接身份,消息 * @return void * @Description 通过 **/ public static void sendMsg(String identity,String msg) { send(identity, msg,true); } /** * @Author groot * @Date 2019/5/8 14:34 * @Param [identity, msg] 链接身份,消息 * @return void * @Description 通过 **/ public static void sendHeart(String identity,String msg) { send(identity, msg,false); } /** * @Author groot * @Date 2019/5/17 15:38 * @Param [identity, msg,endFlag] endFlag是否添加结束符 * @return void * @Description 发送 **/ public static void send(String identity, String msg,boolean endFlag) { //log.info("sendMsg to:{},msg:{}",identity,msg); if(StringUtils.isEmpty(identity) || StringUtils.isEmpty(msg))return; StringBuffer sMsg = new StringBuffer(msg); if(endFlag){ sMsg.append(TCPConst.MARK_END);//拼接消息截止符 } Set<Channel> channels = TcpServer.clientTypeMap.get(identity); if(channels!=null && !channels.isEmpty()){//如果有client链接 //遍历发送消息 for (Channel channel:channels){ channel.writeAndFlush(sMsg).syncUninterruptibly(); } } } // 这个注解表示在spring boot依赖注入完成后执行一次该方法,但对方法有很严格的要求 @PostConstruct() public void init() { //需要开启一个新的线程来执行netty server 服务器 new Thread(new Runnable() { public void run() { start(); } }).start(); } }

     

    package com.groot.CPMasterController.netty.tcp.server; import com.alibaba.fastjson.JSON; import com.groot.CPMasterController.common.utils.TimeUtil; import com.groot.CPMasterController.control.service.ipml.GameControlService; import com.groot.CPMasterController.netty.tcp.entity.TCPConst; import com.groot.CPMasterController.netty.websocket.WebSocketServer; import io.netty.channel.Channel; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.net.SocketAddress; import java.util.*; /** * description: * author:groot * date: 2019-4-10 15:49 **/ @Component @ChannelHandler.Sharable @Slf4j public class TCPServerChannelHandler extends SimpleChannelInboundHandler<Object> { @Resource GameControlService gameControlService; //msg拼包 private StringBuffer msgBuffer = new StringBuffer(); /** * 拿到传过来的msg数据,开始处理 * * @param ctx * @param msg * @throws Exception */ @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { log.info("Netty tcp server receive data : " + msg); //tcp监听收到消息,解析消息内容 try { if(null==msg)return; String sMsg = msg.toString(); if (TCPConst.MARK_HEART.equals(sMsg)){//心跳 //TODO 心跳暂时无需处理 // log.info("Netty tcp server receive heart : " + msg); }else{ // spellPackage(ctx, sMsg);//手动解决拆包粘包 //netty自定义分隔符解析 dealMsg(ctx,sMsg); } } catch (Exception e) { log.error("Netty tcp server channelRead0 Exception e:{}",e); } } /** * @Author groot * @Date 2019/5/17 9:21 * @Param [ctx, msg] * @return void * @Description TCP拼包 **/ private void spellPackage(ChannelHandlerContext ctx, String msg) { if(StringUtils.isNotBlank(msg)){ synchronized (msgBuffer) { StringBuffer msgbf = new StringBuffer(msg); String sMsg = msgBuffer.append(msgbf).toString(); if(sMsg.contains(TCPConst.MARK_HEART)){//包含心跳 sMsg=sMsg.replaceAll(TCPConst.MARK_HEART,"");//去心跳 } if(sMsg.contains(TCPConst.MARK_END)){//包含结束符 msgBuffer.setLength(0);//清空缓存 需要重新拼接 String[] splitMsg = sMsg.trim().split(TCPConst.MARK_END); int arrLen = splitMsg.length; if(arrLen ==1){ dealMsg(ctx,splitMsg[0]); }else if(arrLen >1){ for(int i = 0; i< arrLen; i++){ if(i==0){ dealMsg(ctx,splitMsg[0]); }else if(i==arrLen-1){ if(msg.endsWith(TCPConst.MARK_END)) {//最后一条是完整的 dealMsg(ctx,splitMsg[i]); }else {//最后一条结尾不是结束符 继续拼接 msgBuffer.append(splitMsg[i]);//只拼接最后一条不执行 } }else { dealMsg(ctx,splitMsg[i]); } } } } } } } /** * @Author groot * @Date 2019/5/16 17:23 * @Param [ctx, msg] * @return void * @Description 处理消息 **/ private void dealMsg(ChannelHandlerContext ctx,String msg) { log.info("dealMsg :::{}",msg); gameControlService.dealMassage( ctx, msg); } /** * 活跃的、有效的通道 * 第一次连接成功后进入的方法 * * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); log.info("tcp client " + getRemoteAddress(ctx) + " connect success"); } /** * 不活动的通道 * 连接丢失后执行的方法(client端可据此实现断线重连) * * @param ctx * @throws Exception */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { //删除Channel Map中的失效Client log.error("检测到不活跃的通道-- ip:{} ,即将删除", getRemoteAddress(ctx)); removeChannel(ctx.channel());//安全删除channel ctx.close(); } /** * 异常处理 * * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { super.exceptionCaught(ctx, cause); //发生异常,关闭连接 log.error(" ip:{} -- 的通道发生异常,即将断开连接", getRemoteAddress(ctx)); removeChannel(ctx.channel());//安全删除channel ctx.close();//再次建议close 内部解析的错误已经在channelRead0中捕获 所以这里的异常 应该是在连接出现异常时出现 } /** * 心跳机制,超时处理 * * @param ctx * @param evt * @throws Exception */ /*@Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { String socketString = ctx.channel().remoteAddress().toString(); if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state() == IdleState.READER_IDLE) { log.info("Client: " + socketString + " READER_IDLE 读超时"); ctx.disconnect();//断开 } else if (event.state() == IdleState.WRITER_IDLE) { log.info("Client: " + socketString + " WRITER_IDLE 写超时"); ctx.disconnect(); } else if (event.state() == IdleState.ALL_IDLE) { log.info("Client: " + socketString + " ALL_IDLE 总超时"); ctx.disconnect(); } } }*/ /** * @Author groot * @Date 15:07 * @Param [id, channel] * @return void * @Description 添加业务channel **/ public static void addChannel(String id,Channel channel){ synchronized (TcpServer.clientTypeMap) { if(TcpServer.clientTypeMap.get(id)!=null){ TcpServer.clientTypeMap.get(id).add(channel); }else { Set<Channel> channels = new HashSet<>(); channels.add(channel); TcpServer.clientTypeMap.put(id,channels); } } log.info("addChannel -- 业务channel size:{}",getNumberOfClients()); } /** * @Author groot * @Date 2019/5/8 13:34 * @Param [channel] * @return void * @Description 安全移除业务channel **/ private void removeChannel(Channel channel){ synchronized (TcpServer.clientTypeMap) { if(!TcpServer.clientTypeMap.isEmpty()){ Set<String> keys = TcpServer.clientTypeMap.keySet(); for (String key:keys){ Set<Channel> channels = TcpServer.clientTypeMap.get(key); //判断包含channel if(channels!=null && channels.contains(channel)){ //获取ip SocketAddress socketAddress = channel.remoteAddress(); //删除channel channels.remove(channel); //获取通知web端 String clientName=""; switch (key){ case TCPConst.ID_CPTIMER: clientName =TCPConst.ID_CPTIMER_NAME; break; case TCPConst.ID_DEVCTRL: clientName =TCPConst.ID_DEVCTRL_NAME; break; case TCPConst.ID_GPSCTRL: clientName =TCPConst.ID_GPSCTRL_NAME; break; default: clientName =TCPConst.ID_UNKNOWN_NAME; break; } warnMsgToWeb(socketAddress.toString().replaceAll("/",""), clientName); } } } } } /** * @Author groot * @Date 2019/5/17 13:43 * @Param [socketAddress, clientName] * @return void * @Description 提示web连接断开 **/ private void warnMsgToWeb(String ip, String clientName) { Map<String,Object> map=new HashMap<>(); map.put("type","warn"); map.put("name",clientName); map.put("ip",ip); map.put("msg","断开连接"); map.put("time", TimeUtil.dateFormat(new Date())); WebSocketServer.sendInfo(JSON.toJSONString(map),null); } /** * @Author groot * @Date 2019/5/8 14:00 * @Param [] * @return int * @Description 获取当前channel连接数 **/ public static int getNumberOfClients(){ int count = 0; if(TcpServer.clientTypeMap!=null && !TcpServer.clientTypeMap.isEmpty()){ for (Set<Channel> channels: TcpServer.clientTypeMap.values()) count += channels.size(); } return count; } /** * 获取client对象:ip+port * * @param ctx * @return */ public String getRemoteAddress(ChannelHandlerContext ctx) { String socketString = ctx.channel().remoteAddress().toString(); return socketString; } /** * 获取client的ip * * @param ctx * @return */ public String getIPString(ChannelHandlerContext ctx) { String socketString = ctx.channel().remoteAddress().toString(); int colonAt = socketString.indexOf(":"); String ipString = socketString.substring(1, colonAt); return ipString; } } package com.groot.CPMasterController.netty.tcp.server; import com.groot.CPMasterController.netty.tcp.entity.TCPConst; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.util.CharsetUtil; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * description: 通道初始化,主要用于设置各种Handler * author:groot * date: 2019-4-10 14:55 **/ @Component public class TCPServerChannelInitializer extends ChannelInitializer<SocketChannel> { @Autowired TCPServerChannelHandler TCPServerChannelHandler; @Override protected void initChannel(SocketChannel socketChannel) throws Exception { // ChannelPipeline pipeline = socketChannel.pipeline(); //IdleStateHandler心跳机制,如果超时触发Handle中userEventTrigger()方法 // pipeline.addLast("idleStateHandler",new IdleStateHandler(15, 0, 0, TimeUnit.MINUTES)); //字符串编解码器获取环境默认编码格式 ,如UTF-8 // pipeline.addLast( // new StringDecoder(), // new StringEncoder() // ); //指定字符串编解码器编码格式为UTF-8 // pipeline.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8)); // pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8)); ByteBuf delemiter= Unpooled.buffer(); delemiter.writeBytes(TCPConst.MARK_END.getBytes());//自定义分隔符 /** * 第一个参数:单条消息的最大长度,当达到最大长度仍然找不到分隔符抛异常,防止由于异常码流缺失分隔符号导致的内存溢出 * 第二个参数:分隔符缓冲对象 */ //先使用DelimiterBasedFrameDecoder解码,以自定义的字符作为分割符 socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(65535, true, true,delemiter)); //解码为UTF-8字符串 socketChannel.pipeline().addLast("decoder", new StringDecoder(CharsetUtil.UTF_8)); //编码为UTF-8字符串 socketChannel.pipeline().addLast("encoder", new StringEncoder(CharsetUtil.UTF_8)); //自定义Handler socketChannel.pipeline().addLast("serverChannelHandler", TCPServerChannelHandler); } }

     

    最新回复(0)