这篇文章主要讲解fasthttp 这个http库关于协程池做的优化原理;
fasthttp 是一个非常优秀的web server框架,github上的benchmark号称比官方的net/http快10倍以上。fasthttp用了很多黑魔法。我们今天通过源码来看一看它的goroutine pool的实现。
这里还是以一个例子开头来说明 fasthttp 的使用方法:
package main import ( "github.com/valyala/fasthttp" "fmt" ) // request handler in fasthttp style, i.e. just plain function. func fastHTTPHandler(ctx *fasthttp.RequestCtx) { fmt.Fprintf(ctx, "Hi there! RequestURI is %q", ctx.RequestURI()) } func main() { // pass plain function to fasthttp fasthttp.ListenAndServe(":8081", fastHTTPHandler) }可以看到 fasthttp 使用起来也是非常的方便。这里需要有一点注意, fasthttp 和 net/http 的 Handler 接口不一样,没有使用request 和 response style的Handler,而是使用了 context 这样一个角色,当然,context 里面肯定是包含request 和 response的。
下面我们从源码出发;理解其协程池的使用原理。
入口函数是 ListenAndServe() 下面我们从该函数出发分析整个调用链路(只保留最核心主链路代码):
func ListenAndServe(addr string, handler RequestHandler) error { s := &Server{ Handler: handler, } return s.ListenAndServe(addr) } func (s *Server) ListenAndServe(addr string) error { ln, err := net.Listen("tcp4", addr) if err != nil { return err } return s.Serve(ln) } func (s *Server) Serve(ln net.Listener) error { ...... wp := &workerPool{ WorkerFunc: s.serveConn, MaxWorkersCount: maxWorkersCount, Logger: s.logger(), connState: s.setState, } wp.Start() for { if c, err = acceptConn(s, ln, &lastPerIPErrorTime); err != nil { wp.Stop() if err == io.EOF { return nil } return err } if !wp.Serve(c) { s.writeFastError(c, StatusServiceUnavailable, "The connection cannot be served because Server.Concurrency limit exceeded") c.Close() s.setState(c, StateClosed) } c = nil } }fasthttp 首先监听基于TCP的网络端口,然后创建一个 workerPool 也就是一个协程池,协程池的代码定义与实现在 workerpool.go 中。
之后主线程就处于死循环中,调用 acceptConn() 函数接收TCP连接的请求,如果没有请求到来就阻塞。然后调用 Serve() 函数处理连接。这里有一点类似于 Reactor 的线程模型。
我们先看看 workerPool 的定义(只保留核心数据域):
type workerPool struct { // Function for serving server connections. // It must leave c unclosed. WorkerFunc ServeHandler MaxWorkersCount int MaxIdleWorkerDuration time.Duration Logger Logger lock sync.Mutex workersCount int mustStop bool ready []*workerChan stopCh chan struct{} workerChanPool sync.Pool connState func(net.Conn, ConnState) } type workerChan struct { lastUseTime time.Time ch chan net.Conn } 成员 WorkerFunc 是每个TCP Conn 的处理函数,类似net/http包中的ServeHTTP,因为在fasthttp中所有conn的处理函数都是一样的,所以WorkerFunc不需要和传入的每个conn绑定,整个worker pool共用一个。workerChanPool是sync.Pool对象池。MaxIdleWorkerDuration是worker空闲的最长时间,超过就将worker关闭。workersCount是worker的数量。ready是可用的worker列表,也就是说所有goroutine worker是存放在一个数组里面的。这个数组模拟一个类似栈的FILO队列,也就是说我们每次使用的worker都从队列的尾部开始取。wp.Start()启动worker pool。wp.Stop()是出错处理。wp.Serve©是对conn进行处理的函数。我们先看一下wp.Start() 和 wp.Stop()。
func (wp *workerPool) Start() { if wp.stopCh != nil { panic("BUG: workerPool already started") } wp.stopCh = make(chan struct{}) stopCh := wp.stopCh go func() { var scratch []*workerChan for { wp.clean(&scratch) select { case <-stopCh: return default: time.Sleep(wp.getMaxIdleWorkerDuration()) } } }() } func (wp *workerPool) Stop() { if wp.stopCh == nil { panic("BUG: workerPool wasn't started") } close(wp.stopCh) wp.stopCh = nil wp.lock.Lock() ready := wp.ready for i, ch := range ready { ch.ch <- nil ready[i] = nil } wp.ready = ready[:0] wp.mustStop = true wp.lock.Unlock() }简单来说,workerPool 启动时候开了一个 goroutine 来定期清理worker pool中过期worker(过期=未使用时间超过MaxIdleWorkerDuration)。清理操作都在wp.clean()函数中完成,这里就不继续往下看了。
wp.Stop() 负责停止worker pool的处理工作,包括关闭stopCh,清理闲置的worker列表(这时候还有一部分worker在处理conn,待其处理完成通过判断wp.mustStop来停止)。这里需要注意的一点是做资源清理的时候,对于channel需要置nil。
下面看看最重要的函数 wp.Serve() 的调用链路。 wp.Serve() 负责处理主线程接收到的每一个 TCP 连接。
先看源码:
func (wp *workerPool) Serve(c net.Conn) bool { ch := wp.getCh() if ch == nil { return false } ch.ch <- c return true } func (wp *workerPool) getCh() *workerChan { var ch *workerChan createWorker := false wp.lock.Lock() ready := wp.ready n := len(ready) - 1 if n < 0 { if wp.workersCount < wp.MaxWorkersCount { createWorker = true wp.workersCount++ } } else { ch = ready[n] ready[n] = nil wp.ready = ready[:n] } wp.lock.Unlock() if ch == nil { if !createWorker { return nil } vch := wp.workerChanPool.Get() if vch == nil { vch = &workerChan{ ch: make(chan net.Conn, workerChanCap), } } ch = vch.(*workerChan) go func() { wp.workerFunc(ch) wp.workerChanPool.Put(vch) }() } return ch }Serve() 最主要也最核心的就是调用 getCh() 它从worker pool的可用空闲worker列表尾部取出一个可用的worker。然后 Serve() 将待处理的连接 conn 存入该可用worker的channel。
getCh() 首先从worker pool 的worker队列的队尾获取一个可用的worker,这里有几点需要注意:
先从worker pool的队尾获取可用worker如果没有可用的worker,就新建一个worker(比如处理第一个conn是,worker pool还是空的)如果worker达到上限,则直接不处理这个连接(这个地方感觉处理不是很好,应该加入一定的策略,或者加一个Hook)这里重点需要关注一下新建worker的过程:
首先从workerChanPool 里面获取一个 workerChan使用 go 关键字新建一个协程来处理这个workerChan我们来看看新建的协程是怎么处理workerChan的:直接调用 wp.workerFunc(ch) 来处理,我们跟踪进去调用链路:
func (wp *workerPool) workerFunc(ch *workerChan) { var c net.Conn var err error for c = range ch.ch { if c == nil { break } if err = wp.WorkerFunc(c); err != nil && err != errHijacked { ........ } if err == errHijacked { wp.connState(c, StateHijacked) } else { c.Close() wp.connState(c, StateClosed) } c = nil if !wp.release(ch) { break } } wp.lock.Lock() wp.workersCount-- wp.lock.Unlock() }可以看到,当我们新建一个worker协程的时候,该协程就会进入一个死循环中。 这个死循环的逻辑非常熟悉,就是一个典型的协程池的实现逻辑。
从channel阻塞获取待处理的任务;调用任务里面封装的函数;因为前面的wp.Serve()函数只处理一个conn,所以for循环执行一次我们就可以把worker放到空闲队列中去等待下一次conn过来。release(ch 函数就是将workChan放回空闲队列的末尾(可算和上面呼应上了)。还有上面提到的mustStop,如果worker pool停止了,mustStop就为true,那么workerFunc就要跳出循环,也就是goroutine结束了。
从上面图中我们可以分析出:
fasthttp 采用协程池避免创建goroutine和go runtime schedule goroutine的性能成本,提高性能才用goroutine + channel 的形式解决多线程并发数据竞争问题复用TCP连接,直至超时断开,提升性能