42

Golang限流器time/rate实现剖析

 4 years ago
source link: http://www.cyhone.com/articles/analisys-of-golang-rate/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

限流器是微服务中必不缺少的一环,可以起到保护下游服务,防止服务过载等作用。上一篇文章 《Golang限流器time/rate使用介绍》 简单介绍了time/rate的使用方法,本文则着重分析下其实现原理。建议在正式阅读本文之前,先阅读下上一篇文章。

上一篇文章讲到,time/rate是基于Token Bucket(令牌桶)算法实现的限流。本文将会基于源码,深入剖析下Golang是如何实现Token Bucket的。其代码也非常简洁,去除注释后,也就200行左右的代码量。

同时,我也提供了 time/rate注释版 ,辅助大家理解该组件的实现。

背景

简单来说,令牌桶就是想象有一个固定大小的桶,系统会以恒定速率向桶中放Token,桶满则暂时不放。

而用户则从桶中取Token,如果有剩余Token就可以一直取。如果没有剩余Token,则需要等到系统中被放置了Token才行。

一般介绍Token Bucket的时候,都会有一张这样的原理图:

3AJnma6.jpg!web

从这个图中看起来,似乎令牌桶实现应该是这样的:

有一个Timer和一个BlockingQueue。Timer固定的往BlockingQueue中放token。用户则从BlockingQueue中取数据。

这固然是Token Bucket的一种实现方式,这么做也非常直观,但是效率太低了:我们需要不仅多维护一个Timer和BlockingQueue,而且还耗费了一些不必要的内存。

在Golang的 timer/rate 中的实现, 并没有单独维护一个Timer,而是采用了lazyload的方式,直到每次消费之前才根据时间差更新Token数目,而且也不是用BlockingQueue来存放Token,而是仅仅通过计数的方式。

Token的生成和消费

我们在上一篇文章中讲到,Token的消费方式有三种。但其实在内部实现,最终三种消费方式都调用了reserveN函数来生成和消费Token。

我们看下reserveN函数的具体实现,整个过程非常简单:

  1. 计算从当前时间到上次取Token的时刻,期间一共新产生了多少Token。

    因为我们知道Token Bucket每秒产生的Token数,只要把上次取Token的时间记录下来,求出时间差值,就可以很容易的将时间差转化为Token数。

    time/rate 中对应的实现函数为 tokensFromDuration ,后面会详细讲下。从时间转化为Token数目,大致公式如下。

    $$NewTokensNum = Rate*PastSeconds$$

    同时,当前Token数目 = 新产生的Token数目 + 之前剩余的Token数目 - 要消费的Token数目。

  2. 如果消费后剩余Token数目大于零,说明此时Token桶内仍不为空,此时无需等待。如果Token数目小于零,则需等待一段时间。

    那么这个时候,我们需要将负值的Token数转化为需要等待的时间。 time/rate 中对应的实现函数为 durationFromTokens

  3. 将需要等待的时间等相关结果返回给调用方。

从上面可以看出,其实整个过程就是利用了 Token数可以和时间相互转化 的原理。

而如果Token数为负,则需要等待相关时间即可。

注意:如果当消费时,Token桶中的Token数目已经为负值了,依然可以按照上述流程进行消费。随着负值越来越小,等待的时间将会越来越长。

从结果来看,这个行为跟用Timer+BlockQueue实现是一样的。

此外,整个过程为了保证线程安全,更新令牌桶相关数据时都用了mutex加锁。

对于Allow函数实现时,只要判断需要等待的时间是否为0即可,如果大于0说明需要等待,则返回False,反之返回True。

对于Wait函数,直接 t := time.NewTimer(delay) ,等待对应的时间即可。

float精度问题

从上面原理讲述可以看出,在Token和时间的相互转化函数 durationFromTokenstokensFromDuration 中,涉及到float64的乘除运算。

一谈到float的乘除,我们就需要小心精度问题了。

而Golang在这里也踩了坑,以下是 tokensFromDuration 最初的实现版本

func (limit Limit) tokensFromDuration(d time.Duration) float64 {
	return d.Seconds() * float64(limit)
}

这个操作看起来一点问题都没:每秒生成的Token数乘于秒数。

然而,这里的问题在于, d.Seconds() 已经是小数了。两个小数相乘,会带来精度的损失。

所以就有了这个issue: golang.org/issues/34861

修改后新的版本如下:

func (limit Limit) tokensFromDuration(d time.Duration) float64 {
	sec := float64(d/time.Second) * float64(limit)
	nsec := float64(d%time.Second) * float64(limit)
	return sec + nsec/1e9
}

time.Durationint64 的别名,代表纳秒。分别求出秒的整数部分和小数部分,进行相乘后再相加,这样可以得到最精确的精度。

数值溢出问题

我们讲reserveN函数的具体实现时,第一步就是计算从当前时间到上次取Token的时刻,期间一共新产生了多少Token,同时也可得出当前的Token是多少。

我最开始的理解是,直接可以这么做:

// elapsed表示过去的时间差
elapsed := now.Sub(lim.last)
// delta表示这段时间一共新产生了多少Token
delta = tokensFromDuration(now.Sub(lim.last))

tokens := lim.tokens + delta
if(token > lim.burst){
	token = lim.burst
}

其中, lim.tokens 是当前剩余的Token, lim.last 是上次取token的时刻。 lim.burst 是Token桶的大小。

使用tokensFromDuration计算出新生成了多少Token,累加起来后,不能超过桶的容量即可。

这么做看起来也没什么问题,然而并不是这样。

time/rate 里面是这么做的,如下代码所示:

maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens)
elapsed := now.Sub(last)
if elapsed > maxElapsed {
	elapsed = maxElapsed
}

delta := lim.limit.tokensFromDuration(elapsed)

tokens := lim.tokens + delta
if burst := float64(lim.burst); tokens > burst {
	tokens = burst
}

与我们最开始的代码不一样的是,它没有直接用 now.Sub(lim.last) 来转化为对应的Token数,而是

先用 lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens) ,计算把桶填满的时间maxElapsed。

取elapsed和maxElapsed的最小值。

这么做算出的结果肯定是正确的,但是这么做相比于我们的做法,好处在哪里?

对于我们的代码,当last非常小的时候(或者当其为初始值0的时候),此时 now.Sub(lim.last) 的值就会非常大,如果 lim.limit 即每秒生成的Token数目也非常大时,直接将二者进行乘法运算, 结果有可能会溢出。

因此, time/rate 先计算了把桶填满的时间,将其作为时间差值的上限,这样就规避了溢出的问题。

Token的归还

而对于Reserve函数,返回的结果中,我们可以通过 Reservation.Delay() 函数,得到需要等待时间。

同时调用方可以根据返回条件和现有情况,可以调用 Reservation.Cancel() 函数,取消此次消费。

当调用 Cancel() 函数时,消费的Token数将会尽可能归还给Token桶。

此外,我们在上一篇文章中讲到,Wait函数可以通过Context进行取消或者超时等,

当通过Context进行取消或超时时,此时消费的Token数也会归还给Token桶。

然而,归还Token的时候,并不是简单的将Token数直接累加到现有Token桶的数目上,这里还有一些注意点:

restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct))
if restoreTokens <= 0 {
	return
}

以上代码就是计算需要归还多少的Token。其中:

r.tokens
r.timeToAct
r.lim.lastEvent

其中: r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct)) 指的是,从该次消费到当前时间,一共又新消费了多少Token数目。

根据代码来看,要归还的Token要是该次消费的Token减去新消费的Token。

不过这里我还没有想明白,为什么归还的时候,要减去新消费数目。

按照我的理解,直接归还全部Token数目,这样对于下一次消费是无感知影响的。这块的具体原因还需要进一步探索。

总结

Token Bucket其实非常适合互联网突发式请求的场景,其请求Token时并不是严格的限制为固定的速率,而是中间有一个桶作为缓冲。

只要桶中还有Token,请求就还可以一直进行。当突发量激增到一定程度,则才会按照预定速率进行消费。

此外在维基百科中,也提到了分层Token Bucket(HTB)作为传统Token Bucket的进一步优化,Linux内核中也用它进行流量控制。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK