原来一个链接对应一个Reader和一个Writer,一个消息对应Handler
原来客户端发一个Message,服务器会开辟一个Handler Goroutine去处理业务。很多Message会有很多Handler Goroutine
为了提高性能,要降低Gorouine数量
手段:让处理业务的 Handler Goroutine和消息无关
启动一定数量的worker处理业务的go程,每个worker绑定一个任务的队列
能够降低go的数量
内存节省
go调度器切换go的成本降低,更好利用cpu
比如1万个Reader和1万个Writer都是阻塞,并不会占用太多的cpu资源
创建消息队列
消息队列
工作池的数量
type MsgHandler struct { //存放路由集合的map Apis map[uint32] ziface.IRouter //就是开发者全部的业务,消息ID和业务的对应关系 //负责Worker取任务的消息队列 一个worker对应一个任务队列 TaskQueue []chan ziface.IRequest //worker工作池的worker数量 WorkerPoolSize uint32 }启动工作池:启动固定数量的Goroutine
//启动Worker工作池 (在整个server服务中 只启动一次) func (mh *MsgHandler) StartWorkerPool() { fmt.Println("WorkPool is started..") //根据WorkerPoolSize 创建worker goroutine for i := 0; i < int(mh.WorkerPoolSize); i++ { //开启一个workergoroutine //1 给当前Worker所绑定消息channel对象 开辟空间 第0个worker 就用第0个Channel //给channel 进行开辟空间 mh.TaskQueue[i] = make(chan ziface.IRequest, config.GlobalObject.MaxWorkerTaskLen) //2 启动一个Worker,阻塞等待消息从对应的管道中进来 go mh.startOneWorker(i, mh.TaskQueue[i]) } }worker和消息队列进行绑定,并处理业务:
func (mh *MsgHandler) startOneWorker(workerID int, taskQueue chan ziface.IRequest) { fmt.Println(" worker ID = ", workerID , " is starting... ") //不断的从对应的管道 等待数据 for { select { case req := <-taskQueue: mh.DoMsgHandler(req) } } }将请求发送给工作池
func (mh *MsgHandler) SendMsgToTaskQueue(request ziface.IRequest) { //1 将消息 平均分配给worker 确定当前的request到底要给哪个worker来处理 //1个客户端绑定一个worker来处理 workerID := request.GetConnection().GetConnID() % mh.WorkerPoolSize //2 直接将 request 发送给对应的worker的taskqueue mh.TaskQueue[workerID] <- request }创建多任务的worker工作池,并且启动
启动工作池:绑定worker和channel
将之前发送的消息,由Reader调用Handler过程变成 让worker工作池来处理
//全局计数器,记录消息的个数
工作池最大数量
消息队列的长度
给消息队列发送请求
if config.GlobalObject.WorkerPoolSize > 0 { c.msgHandler.SendMsgToTaskQueue(req) } else { go c.msgHandler.DoMsgHandler(req) }