使用golang实现令牌桶限流和时间窗口控制

    xiaoxiao2022-07-12  146

    这篇文章不是讲令牌桶算法原理,关于原理,请参考 https://blog.csdn.net/lzw_2006/article/details/51768935 

    我这里只是使用golang语言来实现令牌桶算法,以及时间窗口限流。

    #### 针对接口进行并发控制

    如果担心接口某个时刻并发量过大了,可以细粒度地限制每个接口的 总并发/请求数

    以下代码golang实现 ```go package main

    import (     "fmt"     "net"     "os"     "sync/atomic"     "time" )

    var (    limiting int32 = 1 // 这就是我的令牌桶 )

    func main() {     tcpAddr, err := net.ResolveTCPAddr("tcp4", "0.0.0.0:9090") //获取一个tcpAddr     checkError(err)     listener, err := net.ListenTCP("tcp", tcpAddr) //监听一个端口     checkError(err)     defer listener.Close()     for {         conn, err := listener.Accept() // 在此处阻塞,每次来一个请求才往下运行handle函数         if err != nil {             fmt.Println(err)             continue         }         go handle(&conn) // 起一个单独的协程处理,有多少个请求,就起多少个协程,协程之间共享同一个全局变量limiting,对其进行原子操作。     } }

    func handle(conn *net.Conn) {     defer (*conn).Close()     n := atomic.AddInt32(&limiting, -1) // dcr 1 by atomic,获取一个令牌,总数减1。这是一个原子性的操作,并发情况下,数据不会写错。     if n < 0 {         // 令牌不够用了,限流,抛弃此次请求。         (*conn).Write([]byte("HTTP/1.1 404 NOT FOUND\r\n\r\nError, too many request, please try again."))     } else {         // 还有剩余令牌可用         time.Sleep(1 * time.Second) // 假设我们的应用处理业务用了1s的时间         (*conn).Write([]byte("HTTP/1.1 200 OK\r\n\r\nI can change the world!")) // 业务处理结束后,回复200成功。     }     atomic.AddInt32(&limiting, 1) // add 1 by atomic,业务处理完毕,放回令牌 }

    // 异常报错的处理 func checkError(err error) {     if err != nil {         fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error())         os.Exit(1)     } } ``` limiting这个变量就是我用来限流的,把它看做令牌桶的池子吧。初始池中只有1个令牌,每一条处理请求,sleep了1秒。看看并发的效果。在一个终端中启动 ``` go run example1.go ``` 另外起一个终端,用golang的boom来做压测。要提前安装boom工具 ``` go get github.com/rakyll/hey go install github.com/rakyll/hey ``` 然后压测 ```sh $ hey -c 10 -n 50 http://localhost:9090 Summary:   Total:    5.0246 secs   Slowest:    1.0066 secs   Fastest:    0.0008 secs   Average:    0.1023 secs   Requests/sec:    9.9510

    Response time histogram:   0.001 [1]    |■   0.101 [44]    |■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■■   0.202 [0]    |   0.303 [0]    |   0.403 [0]    |   0.504 [0]    |   0.604 [0]    |   0.705 [0]    |   0.805 [0]    |   0.906 [0]    |   1.007 [5]    |■■■■■

    Latency distribution:   10% in 0.0011 secs   25% in 0.0013 secs   50% in 0.0014 secs   75% in 0.0044 secs   90% in 1.0021 secs   95% in 1.0061 secs   0% in 0.0000 secs

    Details (average, fastest, slowest):   DNS+dialup:    0.0016 secs, 0.0008 secs, 1.0066 secs   DNS-lookup:    0.0010 secs, 0.0003 secs, 0.0022 secs   req write:    0.0002 secs, 0.0000 secs, 0.0008 secs   resp wait:    0.1022 secs, 0.0000 secs, 1.0050 secs   resp read:    0.0001 secs, 0.0000 secs, 0.0002 secs

    Status code distribution:   [200]    5 responses   [404]    45 responses ``` hey命令-c表示并发数,我设为10,-n表示总共发送多少条,我发50条。

    结果是只有5条返回http成功的状态码200,其他45条都失败了。这说明有得线程能竞争资源成功,有的线程竞争资源失败,这里只有5个竞争成功的。总共用时也就5.0246秒,平均速率1r/s。这种结果这和代码中令牌池只有1个令牌,而每个请求要花1s的时间的要求相吻合。说明我们现在将请求限流在1r/s,超过这个速度涌进来的请求都会被抛弃404。

    注意:这里使用的是golang的协程,和线程还是有区别的,不过在这里不影响我们做测试,只要把它理解为并发就行了,协程的原理可以去搜下看看。

    修改一下结果,把limiting改成10,再测试 ``` ...... Status code distribution:   [200] 50 responses ``` 这回是恰到好处啊,刚好满足10r/s的QPS,所有的请求都成功了。

    当然,这种并发控制方式简单粗暴,没有平滑处理,慎用。

    #### 针对时间窗口进行并发控制

    如果某个基础服务调用量很大,我们害怕它被突然的大流量打挂,所以需要限制一个窗口期内接口的请求量。下面是一种实现窗口时间并发控制的方法

    我们使用缓存来存储计数器,秒数作为Key,Value代表这一秒有多少个请求。这样就限制了一秒内的并发数,过期时间设置长一些,比如两秒,保证一秒内的数据是存在的。 ```go package main

    import (     "fmt"     "net"     "os"     "time"     cache "github.com/UncleBig/goCache" )

    var (     limit int = 10     c *cache.Cache )

    func main() {     c = cache.New(10*time.Minute, 30*time.Second)     tcpAddr, err := net.ResolveTCPAddr("tcp4", "0.0.0.0:9090") //获取一个tcpAddr     checkError(err)     listener, err := net.ListenTCP("tcp", tcpAddr) //监听一个端口     checkError(err)     defer listener.Close()     for {         conn, err := listener.Accept()         if err != nil {             fmt.Println(err)             continue         }         go handle(&conn)     } }

    func handle(conn *net.Conn) {     defer (*conn).Close()     t := time.Now().Unix()     key := fmt.Sprintf("%d", t)     if n, found := c.Get(key); found {         num := n.(int)         fmt.Printf("key:%d num:%d\n", t, num)         if num >= limit {             (*conn).Write([]byte("HTTP/1.1 404 NOT FOUND\r\n\r\nError, too many request, please try again."))         } else {             (*conn).Write([]byte("HTTP/1.1 200 OK\r\n\r\nI can change the world!"))             c.Increment(key, 1)         }     } else {         (*conn).Write([]byte("HTTP/1.1 200 OK\r\n\r\nI can change the world!"))         c.Set(key, 1, 2 * time.Second)     } }

    func checkError(err error) {     if err != nil {         fmt.Fprintf(os.Stderr, "Fatal error: %s", err.Error())         os.Exit(1)     } } ``` 这段代码用了缓存,所以要先下载库 ``` go get -u github.com/UncleBig/goCache ``` 同样的方式启动测试,先来个小测试,服务端打印日志 ```sh [root@VM_195_216_centos ~]# go run example2.go key:1510229724 num:1 success key:1510229724 num:2 success key:1510229724 num:3 success key:1510229724 num:4 success key:1510229724 num:5 success key:1510229724 num:6 success key:1510229724 num:7 success key:1510229724 num:8 success key:1510229724 num:9 success key:1510229724 num:10 failed key:1510229724 num:10 failed key:1510229724 num:10 failed key:1510229724 num:10 failed key:1510229724 num:10 failed key:1510229724 num:10 failed key:1510229724 num:10 failed key:1510229724 num:10 failed key:1510229724 num:10 failed key:1510229724 num:10 failed key:1510229724 num:10 failed key:1510229724 num:10 failed key:1510229724 num:10 failed key:1510229724 num:10 failed key:1510229724 num:10 failed key:1510229724 num:10 failed key:1510229724 num:10 failed key:1510229724 num:10 failed key:1510229724 num:10 failed key:1510229724 num:10 failed ``` 再看看我们测试用的命令 ``` $ hey -c 10 -n 30 http://localhost:9090 ...... Status code distribution:   [200] 10 responses   [404] 20 responses ``` 结果是10条成功20条失败。看服务端 的日志发现,所有的日志都是打印的同一秒(1510229724)内的请求。当累计处理完10条限流要求的请求之后(num从1打印到10),再往后在这一秒内的请求都直接返回失败了,在这一秒内的限流取得了成功。

    接下来再看看,大量持续请求的情况下,限流效果。 ```sh [root@VM_195_216_centos ~]# go run example2.go  key:1510229933 num:1 success key:1510229933 num:2 success key:1510229933 num:3 success key:1510229933 num:4 success key:1510229933 num:5 success key:1510229933 num:6 success key:1510229933 num:7 success key:1510229933 num:8 success key:1510229933 num:9 success key:1510229933 num:10 failed key:1510229933 num:10 failed ...... key:1510229933 num:10 failed key:1510229933 num:10 failed key:1510229934 num:1 success key:1510229934 num:2 success key:1510229934 num:3 success key:1510229934 num:4 success key:1510229934 num:5 success key:1510229934 num:6 success key:1510229934 num:7 success key:1510229934 num:8 success key:1510229934 num:9 success key:1510229934 num:10 failed key:1510229934 num:10 failed ...... key:1510229934 num:10 failed key:1510229934 num:10 failed key:1510229935 num:1 success key:1510229935 num:2 success key:1510229935 num:3 success key:1510229935 num:4 success key:1510229935 num:5 success key:1510229935 num:6 success key:1510229935 num:7 success key:1510229935 num:8 success key:1510229935 num:9 success key:1510229935 num:10 failed key:1510229935 num:10 failed ...... key:1510229935 num:10 failed key:1510229935 num:10 failed key:1510229936 num:1 success key:1510229936 num:2 success key:1510229936 num:3 success key:1510229936 num:4 success key:1510229936 num:5 success key:1510229936 num:6 success key:1510229936 num:7 success key:1510229936 num:8 success key:1510229936 num:9 success key:1510229936 num:10 failed key:1510229936 num:10 failed ...... ``` 测试命令 ```sh $ hey -c 10 -n 10000 http://localhost:9090 Summary:   Total:        2.9792 secs ...... Status code distribution:   [200] 40 responses   [404] 9937 responses ``` 这次总共花了近3秒时间,发了1w条请求,由于日志打印太多了,截取部分有代表性的。可以看到经历了3秒,每1秒内都只成功10条,接下来到下一秒之前的请求都是失败的。3秒总共成功了40条,按理说应该30条,可能边界值那几毫秒控制的不是很精准,这个误差可以容忍,还是能达到限流的理想效果。

    **** > 创建于 2018-09-08 北京,更新于 2019-05-23 北京

    >该文章在以下平台同步 >- HICOOL.TOP: http://www.hicool.top/article/5ce65138215b583cdd068b48 >- :  >- 简书: https://www.jianshu.com/p/9032c6f41f1e

    最新回复(0)