goroutine pool的简单实现

    xiaoxiao2023-11-25  172

    goroutine pool的简单实现

    golang web server原理粗糙版本的goroutine pool升级版 goroutine pool

    这篇文章从go提供的web server的出发,理解go web server的实现方式,提出goroutine pool的重要性,然后自己实现一个简单版本的goroutine pool

    golang web server原理

    我们先来看一个非常简单的实例:

    package main import ( "fmt" "html" "io" "log" "net/http" "strings" "time" ) func main() { s := &http.Server{ Addr: ":8080", Handler: nil, ReadTimeout: 10 * time.Second, WriteTimeout: 10 * time.Second, MaxHeaderBytes: 1 << 20, } helloHandler := func(w http.ResponseWriter, req *http.Request) { req.ParseForm() fmt.Println(req.Form) fmt.Println("path", req.URL.Path) fmt.Println("scheme", req.URL.Scheme) fmt.Println(req.Form["url_long"]) for k, v := range req.Form { fmt.Println("key:", k) fmt.Println("val:", strings.Join(v, "")) } io.WriteString(w, "Hello, world!\n") } http.HandleFunc("/hello", helloHandler) http.HandleFunc("/bar", func(w http.ResponseWriter, r *http.Request) { fmt.Fprintf(w, "Hello, %q", html.EscapeString(r.URL.Path)) }) log.Fatal(s.ListenAndServe()) }

    这是基于go原生的web server的一个非常简单server模块,可以看到实现一个server非常的简单。

    源码之下,了无秘密。我们从 server.ListenAndServe() 出发查看实现原理。下面的源码都会删除不影响主流程的部分。

    func (srv *Server) ListenAndServe() error { addr := srv.Addr if addr == "" { addr = ":http" } ln, err := net.Listen("tcp", addr) if err != nil { return err } return srv.Serve(tcpKeepAliveListener{ln.(*net.TCPListener)}) } func (srv *Server) Serve(l net.Listener) error { l = &onceCloseListener{Listener: l} defer l.Close() // how long to sleep on accept failure baseCtx := context.Background() // base is always background, per Issue 16220 ctx := context.WithValue(baseCtx, ServerContextKey, srv) for { rw, e := l.Accept() c := srv.newConn(rw) c.setState(c.rwc, StateNew) // before Serve can return go c.serve(ctx) } }

    主要流程如下:

    首先 server 基于 TCP 协议监听指定的端口在 server 的主线程中循环(在 go 中也是主的 goroutine ) 调用 rw, e := l.Accept() ,从网络端口中取出 TCP 连接;针对每一个 connection 都创建一个goroutine 来处理

    整体的处理流程可用下图表示:

    这里对于 TCP 连接的处理可以说是比较暴力的,来一个连接就起一个goroutine, 我们都知道go 的runtime 肩负起调度 goroutine 运行的职责,虽然goroutine调度整体的性能损失比原生线程要低,但是在高并发下不可避免存在性能损耗。

    我们知道在系统负载很高的时候肯定 fasthttp 这个网络框架性能要比原生的 net/http 性能要好,其中一个原因就是因为使用了goroutine pool。

    那么问题来了,如果要我们自己去实现一个goroutine pool,该怎么去实现呢?我们先来实现一个最简单的。

    粗糙版本的goroutine pool

    golang中的协程 goroutine 是通过 go 的 runtime 来调度的,所以goroutine资源没法像临时资源一样放回去再取出来。

    所以goroutine 应该是一直运行的,需要的时候就运行,不需要的时候就阻塞,这样对其他的goroutine的调度影响也不是很大。而goroutine的任务传递可以通过 channel 来传递。

    一个粗糙版本的goroutine pool实现如下:

    func go_pool() { //start := time.Now() wg := new(sync.WaitGroup) data := make(chan int, 100) for i := 0; i < 10; i++ { wg.Add(1) go func(n int) { defer wg.Done() for _ = range data { //for i:=0; i<100000; i++ { // //} } }(i) } for i := 0; i < 10000; i++ { data <- i } close(data) wg.Wait() //end := time.Now() //fmt.Println(end.Sub(start)) }

    这个版本的实现逻辑还比较简单,总的来说:我们起了 N 个协程,每个协程循环从 channel 获取任务消息,如果获取到消息就处理,否则协程就被阻塞。

    这里也给出一个不用协程池的实现:

    func no_go_pool() { //start := time.Now() wg := new(sync.WaitGroup) for i := 0; i < 10000; i++ { wg.Add(1) go func(n int) { defer wg.Done() //for i:=0; i<100000; i++ { // //} }(i) } wg.Wait() //end := time.Now() //fmt.Println(end.Sub(start)) }

    这种非协程池做法就是来一个任务开一个协程,除了性能损耗还有内存损耗,毕竟协程也是占用内存资源的。

    这里给出了一个benchmark的测试数据(自己mac测试)

    func BenchmarkGopool(b *testing.B) { for i := 0; i < b.N; i++ { go_pool() } } func BenchmarkNopool(b *testing.B) { for i := 0; i < b.N; i++ { no_go_pool() } } ----------------------------------- result ----------------------------------------------------------- BenchmarkGopool-12 1000 1441389 ns/op BenchmarkNopool-12 500 3024904 ns/op=

    升级版 goroutine pool

    对于一个好的线程池来说,能自定义goroutine运行的函数十分重要。函数无非就是函数地址和函数参数。如果要传入的函数形式不一样(形参或者返回值不一样)怎么办?一个比较简单的方法是引入反射。但是引入反射又会引入性能问题。所以这里使用闭包:

    type Worker struct { Func func() } func go_pool_with_func() { var wg sync.WaitGroup channels := make(chan Worker, 10) for i:=0; i<5; i++ { wg.Add(1) go func() { defer wg.Done() for ch := range channels { ch.Func() } }() } for i := 0; i < 10000; i++ { j := i wk := Worker{ Func: func() { j++ }, } channels <- wk } // close channel, and will wait for all msg finished close(channels) wg.Wait() }
    最新回复(0)