Zinx-V0.5消息封装和处理tcp粘包

    xiaoxiao2023-12-25  158

    Zinx-V0.5消息封装和处理tcp粘包

    1.将Request中的data、len再进行封装,封装成一个message,其中包含id、data、len

    1.1.抽象层ziface
    type IMessage interface { GetDataLen() uint32 //获取消息数据段长度 GetMsgId() uint32 //获取消息ID GetData() []byte //获取消息内容 SetMsgId(uint32) //设计消息ID SetData([]byte) //设计消息内容 SetDataLen(uint32) //设置消息数据段长度 }
    1.2.实现层znet
    type Message struct { Id uint32 //消息的ID DataLen uint32 //消息的长度 Data []byte //消息的内容 } //创建一个Message消息包 func NewMsgPackage(id uint32, data []byte) *Message { return &Message{ Id: id, DataLen: uint32(len(data)), Data: data, } } //获取消息数据段长度 func (msg *Message) GetDataLen() uint32 { return msg.DataLen } //获取消息ID func (msg *Message) GetMsgId() uint32 { return msg.Id } //获取消息内容 func (msg *Message) GetData() []byte { return msg.Data } //设置消息数据段长度 func (msg *Message) SetDataLen(len uint32) { msg.DataLen = len } //设计消息ID func (msg *Message) SetMsgId(msgId uint32) { msg.Id = msgId } //设计消息内容 func (msg *Message) SetData(data []byte) { msg.Data = data }

    2.定义一个能解决tcp粘包问题的拆包封包模块

    2.1.抽象层ziface
    type IDataPack interface{ GetHeadLen() uint32 //获取包头长度方法 Pack(msg IMessage)([]byte, error) //封包方法 Unpack([]byte)(IMessage, error) //拆包方法 }
    2.2.实现层znet
    //封包拆包类实例,暂时不需要成员 type DataPack struct {} //封包拆包实例初始化方法 func NewDataPack() *DataPack { return &DataPack{} } //获取包头长度方法 func(dp *DataPack) GetHeadLen() uint32 { //Id uint32(4字节) + DataLen uint32(4字节) return 8 } //封包方法(压缩数据) func(dp *DataPack) Pack(msg ziface.IMessage)([]byte, error) { //创建一个存放bytes字节的缓冲 dataBuff := bytes.NewBuffer([]byte{}) //写dataLen if err := binary.Write(dataBuff, binary.LittleEndian, msg.GetDataLen()); err != nil { return nil, err } //写msgID if err := binary.Write(dataBuff, binary.LittleEndian, msg.GetMsgId()); err != nil { return nil, err } //写data数据 if err := binary.Write(dataBuff, binary.LittleEndian, msg.GetData()); err != nil { return nil ,err } return dataBuff.Bytes(), nil } //拆包方法(解压数据) func(dp *DataPack) Unpack(binaryData []byte)(ziface.IMessage, error) { //创建一个从输入二进制数据的ioReader dataBuff := bytes.NewReader(binaryData) //只解压head的信息,得到dataLen和msgID msg := &Message{} //读dataLen if err := binary.Read(dataBuff, binary.LittleEndian, &msg.DataLen); err != nil { return nil, err } //读msgID if err := binary.Read(dataBuff, binary.LittleEndian, &msg.Id); err != nil { return nil, err } //判断dataLen的长度是否超出我们允许的最大包长度 if (utils.GlobalObject.MaxPacketSize > 0 && msg.DataLen > utils.GlobalObject.MaxPacketSize) { return nil, errors.New("Too large msg data recieved") } //这里只需要把head的数据拆包出来就可以了,然后再通过head的长度,再从conn读取一次数据 return msg, nil }

    ​ 封包过程就是将一个message封装成一条[]byte类型的数据,其中前4字节是一个uint32类型数据,内容为data数据的len,后面4字节是一个uint32类型数据,内容为data数据的id。

    ​ 拆包过程就是先读出数据的前8字节内容,代表了后面data的len和id,用这两数据返回一个message类型的对象,然后通过len来判断后面data的len,创建一个len长度的缓冲区,将一条完整的data读出,添加到message中,完成解读。

    3.将上述两模块集成到zinx中

    3.1改变Request的抽象层和实现层
    type IRequest interface { //得到当前的请求的链接 GetConnection() IConnection //得到请求的消息 GetMsg() IMessage } type Request struct { //链接信息 conn ziface.IConnection //客户端发送的消息 msg ziface.IMessage } func NewReqeust(conn ziface.IConnection, msg ziface.IMessage) ziface.IRequest { req := &Request{ conn:conn, msg:msg, } return req } //得到当前的请求的链接 func(r *Request) GetConnection() ziface.IConnection { return r.conn } //得到链接的数据 func(r *Request) GetMsg() ziface.IMessage { return r.msg }
    3.2将zinx读包 的机制,由之前的一次read,改成按照拆包机制, 读取两次
    func (c *Connection) StartReader() { ...... for { /* buf := make([]byte, config.GlobalObject.MaxPackageSize) cnt, err := c.Conn.Read(buf) if err != nil { fmt.Println("recv buf err", err) break } */ //创建拆包封包的对象 dp := NewDataPack() //读取客户端消息的头部 headData := make([]byte, dp.GetHeadLen()) if _, err := io.ReadFull(c.Conn, headData); err != nil { fmt.Println("read msg head error", err) break } //根据头部 获取数据的长度,进行第二次读取 msg, err := dp.UnPack(headData) //将msg 头部信息填充满 if err != nil { fmt.Println("unpack error ", err) break } //根据长度 再次读取 var data []byte if msg.GetMsgLen() > 0 { //有内容 data = make([]byte, msg.GetMsgLen()) if _, err := io.ReadFull(c.Conn, data); err != nil { fmt.Println("read msg data error ", err) break } } msg.SetData(data) //将读出来的msg 组装一个request //将当前一次性得到的对端客户端请求的数据 封装成一个Request req := NewReqeust(c, msg) //调用用户传递进来的业务 模板 设计模式 go func() { c.Router.PreHandle(req) c.Router.Handle(req) c.Router.PostHandle(req) }() } }
    3.3将zinx的发包机制, 有之前的write二进制,改成先封包,再发送

    修改send方法

    func (c *Connection) Send(msgId uint32, msgData []byte) error { if c.isClosed == true { return errors.New("Connection closed ..send Msg ") } //封装成msg dp := NewDataPack() binaryMsg, err := dp.Pack(NewMsgPackage(msgId, msgData)) if err != nil { fmt.Println("Pack error msg id = ", msgId) return err } //将binaryMsg发送给对端 if _, err := c.Conn.Write(binaryMsg); err != nil { fmt.Println("send buf error") return err } return nil }
    最新回复(0)