

Go可用性(四) 限流3: 漏桶算法
source link: https://lailin.xyz/post/go-training-week6-4-leaky-bucket.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

第 0 期已经结束,想要报名后面课程的同学,我联系极客时间为大家争取到了读者专属优惠,关注公众号
mohuishou
获取专属优惠,比官方优惠更给力哦 前往查看介绍: Go进阶训练营PS: 博客阅读结合视频观看效果更佳哦
在前面两篇文章当中我们学习了令牌桶算法的使用和实现,今天我们就一起来看一看另外一种常见的限流算法,漏桶算法
漏桶算法(Leaky Bucket) 是网络世界中流量整形(Traffic Shaping)或速率限制(Rate Limiting)时经常使用的一种算法,它的主要目的是控制数据注入到网络的速率,平滑网络上的突发流量。漏桶算法提供了一种机制,通过它,突发流量可以被整形以便为网络提供一个稳定的流量。 — 百度百科
漏桶算法其实非常形象,如下图所示可以理解为一个漏水的桶,当有突发流量来临的时候,会先到桶里面,桶下有一个洞,可以以固定的速率向外流水,如果水的从桶中外溢了出来,那么这个请求就会被拒绝掉。具体的表现就会向下图右侧的图表一样,突发流量就被整形成了一个平滑的流量。
漏桶算法的主要作用就是避免出现有的时候流量很高,有的时候又很低,导致系统出现旱的旱死,涝的涝死的这种情况。
Go 中比较常用的漏桶算法的实现就是来自 uber 的 ratelimit,下面我们就会看一下这个库的使用方式和源码
1
2
3
4
5
6
7
8
type Clock
type Limiter
func New(rate int, opts ...Option) Limiter
func NewUnlimited() Limiter
type Option
func Per(per time.Duration) Option
func WithClock(clock Clock) Option
func WithSlack(slack int) Option
Clock
是一个接口,计时器的最小实现,有两个方法,分别是当前的时间和睡眠
1
2
3
4
type Clock interface {
Now() time.Time
Sleep(time.Duration)
}
Limiter
也是一个接口,只有一个 Take
方法,执行这个方法的时候如果触发了 rps 限制则会阻塞住
1
2
3
4
type Limiter interface {
// Take should block to make sure that the RPS is met.
Take() time.Time
}
NewLimter
和 NewUnlimited
会分别初始化一个无锁的限速器和没有任何限制的限速器
Option
是在初始化的时候的额外参数,这种使用姿势在之前 Go 工程化的文章《Go 工程化(六) 配置管理》当中有讲到,这里我们就不再赘述了
Option
有三个方法
Per
可以修改时间单位,默认是秒所以我们默认限制的是 rps,如果改成分钟那么就是 rpm 了WithClock
可以修改时钟,这个用于在测试的时候可以 mock 掉不使用真实的时间WithSlack
用于修改松弛时间,也就是可以允许的突发流量的大小,默认是Pre / 10
,这个后面会讲到
案例: 10 行代码实现一个基于漏桶算法的 ip 限流中间件
案例我们使用和令牌桶类似的案例
1
2
3
4
5
6
7
8
9
10
11
12
13
func NewLimiter(rps int) gin.HandlerFunc {
limiters := &sync.Map{}
return func(c *gin.Context) {
// 获取限速器
// key 除了 ip 之外也可以是其他的,例如 header,user name 等
key := c.ClientIP()
l, _ := limiters.LoadOrStore(key, ratelimit.New(rps))
now := l.(ratelimit.Limiter).Take()
fmt.Printf("now: %s\n", now)
c.Next()
}
}
使用上也是比较简单的
1
2
3
4
5
6
7
8
9
func main() {
e := gin.Default()
// 新建一个限速器,允许突发 3 个并发
e.Use(NewLimiter(3))
e.GET("ping", func(c *gin.Context) {
c.String(http.StatusOK, "pong")
})
e.Run(":8080")
}
我们用 go-stress-testing
进行压测
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
go-stress-testing-linux -c 100 -u http://localhost:8080/ping
─────┬───────┬───────┬───────┬────────┬────────┬────────┬────────┬────────┬────────┬────────
耗时 │ 并发数│ 成功数│ 失败数 │ qps │最长耗时│最短耗时 │平均耗时 │下载字节 │字节每秒 │ 错误码
─────┼───────┼───────┼───────┼────────┼────────┼────────┼────────┼────────┼────────┼────────
1s│ 13│ 13│ 0│ 233.55│ 676.10│ 5.82│ 85.64│ 52│ 51│200:13
2s│ 16│ 16│ 0│ 62.25│ 1675.17│ 5.82│ 321.30│ 64│ 31│200:16
3s│ 19│ 19│ 0│ 31.24│ 2673.94│ 5.82│ 640.20│ 76│ 25│200:19
3s│ 20│ 20│ 0│ 26.37│ 3006.49│ 5.82│ 758.51│ 80│ 26│200:20
************************* 结果 stat ****************************
处理协程数量: 20
请求总数(并发数*请求数 -c * -n): 20 总请求时间: 3.011 秒 successNum: 20 failureNum: 0
************************* 结果 end ****************************
查看结果发现为什么第一秒的时候完成了 13 个请求,不是限制的 3rps 么?不要慌,我们看看它的实现就知道了
这个库有基于互斥锁的实现和基于 CAS 的无锁实现,默认使用的是无锁实现版本,所以我们主要看无锁实现的源码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
type state struct {
last time.Time
sleepFor time.Duration
}
type atomicLimiter struct {
state unsafe.Pointer
//lint:ignore U1000 Padding is unused but it is crucial to maintain performance
// of this rate limiter in case of collocation with other frequently accessed memory.
padding [56]byte // cache line size - state pointer size = 64 - 8; created to avoid false sharing.
perRequest time.Duration
maxSlack time.Duration
clock Clock
}
atomicLimiter
结构体
state
是一个状态的指针,用于存储上一次的执行的时间,以及需要sleep
的时间padding
是一个无意义的填充数据,为了提高性能,避免 cpu 缓存的 false sharing- 之前在讲 Go 并发编程(二) Go 内存模型 的时候有讲到,为了能够最大限度的利用 CPU 的能力,会做很多丧心病狂的优化,其中一种就是 cpu cache
- cpu cache 一般是以 cache line 为单位的,在 64 位的机器上一般是 64 字节
- 所以如果我们高频并发访问的数据小于 64 字节的时候就可能会和其他数据一起缓存,其他数据如果出现改变就会导致 cpu 认为缓存失效,这就是 false sharing
- 所以在这里为了尽可能提高性能,填充了 56 字节的无意义数据,因为 state 是一个指针占用了 8 个字节,所以
64 - 8 = 56
- 剩下三个字段和
Option
中的三个方法意义对应perRequest
就是单位,默认是秒maxSlack
松弛时间,也就是可以允许的突发流量的大小,默认是Pre / 10
,这个后面会讲到clock
时钟,这个用于在测试的时候可以 mock 掉不使用真实的时间
接下来看看最主要的 Take
方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
func (t *atomicLimiter) Take() time.Time {
var (
// 状态
newState state
// 用于表示原子操作是否成功
taken bool
// 需要 sleep 的时间
interval time.Duration
)
// 如果 CAS 操作不成功就一直尝试
for !taken {
// 获取当前的时间
now := t.clock.Now()
// load 出上一次调用的时间
previousStatePointer := atomic.LoadPointer(&t.state)
oldState := (*state)(previousStatePointer)
newState = state{
last: now,
sleepFor: oldState.sleepFor,
}
// 如果 last 是零值的话,表示之前就没用过,直接保存返回即可
if oldState.last.IsZero() {
taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState))
continue
}
// sleepFor 是需要睡眠的时间,由于引入了松弛时间,所以 sleepFor 可能是一个
// maxSlack ~ 0 之间的一个值,所以这里需要将现在的需要 sleep 的时间和上一次
// sleepFor 的值相加
newState.sleepFor += t.perRequest - now.Sub(oldState.last)
// 如果距离上一次调用已经很久了,sleepFor 可能会是一个很小的值
// 最小值只能是 maxSlack 的大小
if newState.sleepFor < t.maxSlack {
newState.sleepFor = t.maxSlack
}
// 如果 sleepFor 大于 0 的话,计算出需要 sleep 的时间
// 然后将 state.sleepFor 置零
if newState.sleepFor > 0 {
newState.last = newState.last.Add(newState.sleepFor)
interval, newState.sleepFor = newState.sleepFor, 0
}
// 保存状态
taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState))
}
// sleep interval
t.clock.Sleep(interval)
return newState.last
}
今天学习了漏桶的实现原理以及使用方式,漏桶和令牌桶的最大的区别就是,令牌桶是支持突发流量的,但是漏桶是不支持的。但是 uber 的这个库通过引入弹性时间的方式也让漏桶算法有了类似令牌桶能够应对部分突发流量的能力,并且实现上还非常的简单,值得学习。
多看看好的轮子的实现总会学到一些新姿势,今天就学到了使用 padding 填充来避免 false sharing 提高性能的操作
关注我获取更新
Recommend
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK