Zinx-v0.8消息队列和任务池

    xiaoxiao2025-04-23  18

    消息队列和任务池

    原来一个链接对应一个Reader和一个Writer,一个消息对应Handler

    原来客户端发一个Message,服务器会开辟一个Handler Goroutine去处理业务。很多Message会有很多Handler Goroutine

    为了提高性能,要降低Gorouine数量

    手段:让处理业务的 Handler Goroutine和消息无关

    制定一个worker处理业务

    启动一定数量的worker处理业务的go程,每个worker绑定一个任务的队列

    能够降低go的数量

    内存节省

    go调度器切换go的成本降低,更好利用cpu

    比如1万个Reader和1万个Writer都是阻塞,并不会占用太多的cpu资源

    消息列队及worker工作池的实现

    创建消息队列

    属性

    消息队列

    工作池的数量

    type MsgHandler struct { //存放路由集合的map Apis map[uint32] ziface.IRouter //就是开发者全部的业务,消息ID和业务的对应关系 //负责Worker取任务的消息队列 一个worker对应一个任务队列 TaskQueue []chan ziface.IRequest //worker工作池的worker数量 WorkerPoolSize uint32 }
    构造方法
    //初始化方法 func NewMsgHandler() ziface.IMsgHandler { //给map开辟头空间 return &MsgHandler{ Apis:make(map[uint32]ziface.IRouter), WorkerPoolSize:config.GlobalObject.WorkerPoolSize, TaskQueue:make([]chan ziface.IRequest, config.GlobalObject.WorkerPoolSize),//切片的初始化 } }
    方法

    启动工作池:启动固定数量的Goroutine

    //启动Worker工作池 (在整个server服务中 只启动一次) func (mh *MsgHandler) StartWorkerPool() { fmt.Println("WorkPool is started..") //根据WorkerPoolSize 创建worker goroutine for i := 0; i < int(mh.WorkerPoolSize); i++ { //开启一个workergoroutine //1 给当前Worker所绑定消息channel对象 开辟空间 第0个worker 就用第0个Channel //给channel 进行开辟空间 mh.TaskQueue[i] = make(chan ziface.IRequest, config.GlobalObject.MaxWorkerTaskLen) //2 启动一个Worker,阻塞等待消息从对应的管道中进来 go mh.startOneWorker(i, mh.TaskQueue[i]) } }

    worker和消息队列进行绑定,并处理业务:

    func (mh *MsgHandler) startOneWorker(workerID int, taskQueue chan ziface.IRequest) { fmt.Println(" worker ID = ", workerID , " is starting... ") //不断的从对应的管道 等待数据 for { select { case req := <-taskQueue: mh.DoMsgHandler(req) } } }

    将请求发送给工作池

    func (mh *MsgHandler) SendMsgToTaskQueue(request ziface.IRequest) { //1 将消息 平均分配给worker 确定当前的request到底要给哪个worker来处理 //1个客户端绑定一个worker来处理 workerID := request.GetConnection().GetConnID() % mh.WorkerPoolSize //2 直接将 request 发送给对应的worker的taskqueue mh.TaskQueue[workerID] <- request }

    创建多任务的worker工作池,并且启动

    启动工作池:绑定worker和channel

    将之前发送的消息,由Reader调用Handler过程变成 让worker工作池来处理

    //全局计数器,记录消息的个数

    修改全局配置

    工作池最大数量

    消息队列的长度

    修改Connection

    给消息队列发送请求

    if config.GlobalObject.WorkerPoolSize > 0 { c.msgHandler.SendMsgToTaskQueue(req) } else { go c.msgHandler.DoMsgHandler(req) }
    最新回复(0)