9

流量管制-令牌桶与漏桶 - Cylon

 1 year ago
source link: https://www.cnblogs.com/Cylon/p/16379709.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Principle of token bucket#

随着互联网的发展,在处理流量的方法也不仅仅为 first-come,first-served,而在共享网络中实现流量管理的基本机制就是排队。而公平算法则是实现在优先级队列中基于哪些策略来排队的”公平队列“Token Bucket 则是为公平排队提供了替代方案。Fair Queue 与 Token Bucket的区别主要在,对于Fair Queue来讲,如果请求者目前空闲,Queue会将该请求者的带宽分配给其他请求者;而 Token Bucket 则是分配给请求者的带宽是带宽的上限。

通过例子了解算法原理

假设出站带宽是 4个数据包/ms,此时有一个需求为,为一个特定的发送端 A 来分配 1个数据包/ms的带宽。此时可以使用公平排队的方法分给发送 A 25%的带宽。

此时存在的问题是我们希望可以灵活地允许 A 的数据包以无规则的时间间隔发送。例如假设 A 在每个数据包发送后等待1毫秒后再开始下一个数据包的发送。

  • sence1:此时假设 A 以 1ms 的间隔去发送数据包,而由于某种原因导致应该在 t=6 到达的数据包却在 t=6.5 到达。随后的数据包在 t=7 准时到达,在这种情况下是否应该保留到t=7.5?
  • sence2:或者是否允许在 t=6.5 发送一个迟到的数据包,在 t=7 发送下一个数据包,此时理论上平均速率仍然还是 1 个数据包/ms?

显然sence2是合理的,这个场景的解决方法就是令牌桶算法,规定 A 的配额,允许指定平均速率和突发容量。当数据包不符合令牌桶规范,那么就认为其不合理,此时会做出一下相应:

  • delay,直到桶准备好
  • mark,标记为不合规的数据包

delay 被称为 整形 shaping , shaping 是指在某个时间间隔内发送超过 Bc(Committed Burst)的大小,Bc 在这里指桶的尺寸。由于数据流量是突发性的,当在一段时间内不活动后,再次激活后的在一个间隔内发送的数量大于 Bc ,那么额外的流量被称为Be (burst excess)。

将流量丢弃或标记超额流量,保持在一个流量速率限制称为 管制 policing

Definition#

令牌桶的定义是指,有一个桶,以稳定的速度填充令牌;桶中的任何一个溢出都会被丢弃。当要发送一个数据包,需要能够从桶中取出一个令牌;如果桶是空的那么此时数据包是不合规的数据包,必须进行 delay , drop , mark 操作。如果桶是满的,则会发送与桶容量相对应的突发(短时间内的高带宽传输),这是桶是空的。

令牌桶的规范:TB(r,Bmax)TB(r,Bmax)

  • rr :r个token每秒的令牌填充率,表示桶填充令牌的速率
  • BB :桶容量,Bmac>0Bmac>0

那么公式则表示,桶以指定的速率填充令牌,最大为 BmaxBmax 。这就说明了为了使大小为 S 的数据包合规,桶内必须至少有 S 个令牌,即 B≥SB≥S,否则数据包不合规,在发送时,桶为 B=B−SB=B−S

image

Examples#

场景1:假设令牌桶规范为 TB(13 packet/ms,4 packet)TB(13packet/ms,4packet),桶最初是满的,数据包在以下时间到达 [0, 0, 0, 2, 3, 6, 9, 12]

在处理完所有 T=0 的数据包后,桶中还剩 1 个令牌。到第四个数据包 T=2 到达时,桶内已经有1个令牌 + 2323 个令牌;当发送完第四个数据包时,桶内令牌数为 2323 。到 T=3 数据包时,桶内令牌为1,满足发送第 5 个数据包。万松完成后桶是空的,在后面 6 9 12时,都满足3/ms 一个数据包,都可以发送成功

场景2:另外一个实例,在同样的令牌桶规范下 TB(13,4)TB(13,4),数据包到达时间为 [0, 0, 0, 0, 12, 12, 12, 12, 24, 24, 24, 24] ,可以看到在这个场景下,数据到达为3个突发,每个突发4个数据包,此时每次发送完成后桶被清空,当再次填满时需要12ms,此时另外一组突发达。故这组数据是合规的。、

场景3:在同样的令牌桶规范下 TB(13,4)TB(13,4),数据包到达时间为 [0, 1, 2, 3, 4, 5] , 这组数据是不合规的

用表格形式表示如下:

数据包到达时间 0 1 2 3 4 5
发送前桶内令牌 4 3 1313 2 2323 2 1 1313 2323
发送后桶内令牌 3 2 1313 1 2323 1 1313 2323

如果一个数据包在桶中没有足够的令牌来发送它时到达,可以进行整形或管制,整形使数据包等到足够的令牌积累。管制会丢弃数据包。或者发送方可以立即发送数据包,但将其标记为不合规。

Principle of leaky bucket#

漏桶 (leaky bucket)是一种临时存储可变数量的请求并将它们组织成设定速率输出的数据包的方法。漏桶的概念与令牌桶比起是相反的,漏桶可以理解为是一个具有恒定服务时间的队列。

由下图可以看出,漏桶的概念是一个底部有孔的桶。无论水进入桶的速度是多少,它都会以恒定的速度通过孔从桶中泄漏出来。如果桶中没有水,则流速为零,如果桶已满,则多余的水溢出并丢失。

image

和令牌桶一样,漏桶用于流量整形和流量管制

Difference between Token and Leaky#

Leaky Token
桶中存放的是所有到达的数据包,必须入桶 桶中存放的是定期生成的令牌
桶以恒定速率泄漏 桶有最大容量 BmaxBmax
突发流量入桶转换为恒定流量发送 发送数据包需要小号对应的token

token较leaky的优势

  • 在令牌桶中,如果桶已满,处理的方式有 shaping和policing两种模型三种方式(延迟、丢弃、标记),而漏桶中的流量仅为shaping。
    • 通俗来说,就是令牌桶已满,丢弃的是令牌,漏桶中丢弃的则是数据包
  • 令牌桶可以更快的速率发送大突发流量,而漏桶仅是恒定速率

Implementation with go#

Token#

在golang中,内置的 rate 包实现了一个令牌桶算法,通过 rate.NewLimiter(r,B) 进行构造。与公式TB(r,Bmax)TB(r,Bmax) 意思相同。

type Limiter struct {
	limit Limit // 向桶中放置令牌的速率
	burst int // 桶的容量
	mu     sync.Mutex
	tokens float64 // 可用令牌容量
	last time.Time // 上次放入token的时间
	lastEvent time.Time
}

Limiter中带有三种方法, AllowReserveWait 分别表示Token Bucket中的 shapingpolicing

  • Allow:丢弃超过速率的事件,类似 drop
  • Wait:等待,直到获取到令牌或者取消或deadline/timeout
  • Reserve:等待或减速,不丢弃事件,类似于 delay

Reserve/ReserveN#

  • Reserve() 返回了 ReserveN(time.Now(), 1)
  • ReserveN() 无论如何都会返回一个 Reservation,指定了调用者在 n 个事件发生之前必须等待多长时间。
  • Reservation 是一个令牌桶事件信息
  • Reservation 中的 Delay() 方法返回了需要等待的时间,如果时间为0则不需要等待
  • Reservation 中的 Cancel() 将取消等待

wait/waitN

Allow/AllowN#

  • 在获取不到令牌是丢弃对应的事件
  • 返回的是一个 reserveN() 拿到token是合规的,并消耗掉token

AllowN 为截止到某一时刻,当前桶内桶中数目是否至少为 n 个,满足则返回 true,同时从桶中消费 n 个 token。反之不消费 Token,false。

func (lim *Limiter) AllowN(now time.Time, n int) bool {
	return lim.reserveN(now, n, 0).ok // 由于仅需要一个合规否,顾合规的通过,不合规的丢弃
}

reserveN() 是三个行为的核心,AllowN中指定的为 0 ,因为 maxFutureReserve 是最大的等待时间,AllowN给定的是0,即如果突发大的情况下丢弃额外的 Bc

func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {
	lim.mu.Lock()

	if lim.limit == Inf {
		lim.mu.Unlock()
		return Reservation{
			ok:        true,
			lim:       lim,
			tokens:    n,
			timeToAct: now,
		}
	}
	// 这里拿到的是now,上次更新token时间和桶内token数量
	now, last, tokens := lim.advance(now)
	// 计算剩余的token
	tokens -= float64(n)

	// Calculate the wait duration
	var waitDuration time.Duration
	if tokens < 0 {
		waitDuration = lim.limit.durationFromTokens(-tokens)
	}

	// 确定是否合规,n是token
    // token 的数量要小于桶的容量,并且 等待时间小于最大等待时间
	ok := n <= lim.burst && waitDuration <= maxFutureReserve

	// Prepare reservation
	r := Reservation{
		ok:    ok,
		lim:   lim,
		limit: lim.limit,
	}
	if ok {
		r.tokens = n
		r.timeToAct = now.Add(waitDuration)
	}

	// Update state
	if ok {
		lim.last = now
		lim.tokens = tokens
		lim.lastEvent = r.timeToAct
	} else {
		lim.last = last
	}

	lim.mu.Unlock()
	return r
}

在reserveN中调用了一个 advance() 函数,

func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) {
   last := lim.last
   if now.Before(last) { // 计算上次放入token是否在传入now之前
      last = now
   }

   // 当 last 很旧时,避免在下面进行 delta 溢出。
   // maxElapsed 计算装满需要多少时间
   maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens)
   elapsed := now.Sub(last) // 上次装入到现在的时差
   if elapsed > maxElapsed { // 上次如果放入token时间超长,就让他与装满时间相等
      elapsed = maxElapsed // 即,让桶为满的
   }

   // 装桶的动作,下面函数表示,elapsed时间内可以生成多少个token
   delta := lim.limit.tokensFromDuration(elapsed)
   tokens := lim.tokens + delta // 当前的token
   if burst := float64(lim.burst); tokens > burst {
      tokens = burst // 这里表示token溢出,让他装满就好
   }

   return now, last, tokens
}

wait/waitN#

  • 桶内令牌可以>N时,返回,在获取不到令牌是阻塞,等待context取消或者超时
  • 返回的是一个 reserveN() 拿到token是合规的,并消耗掉token
func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) {
	if n > lim.burst && lim.limit != Inf {
		return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, lim.burst)
	}
	// 外部已取消
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
	}
	// Determine wait limit
	now := time.Now()
	waitLimit := InfDuration
	if deadline, ok := ctx.Deadline(); ok {
		waitLimit = deadline.Sub(now)
	}
	// 三个方法的核心,这里给定了deatline
	r := lim.reserveN(now, n, waitLimit)
	if !r.ok {
		return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n)
	}
	// Wait if necessary
	delay := r.DelayFrom(now)
	if delay == 0 {
		return nil
	}
	t := time.NewTimer(delay)
	defer t.Stop()
	select {
	case <-t.C:
		// We can proceed.
		return nil
	case <-ctx.Done():
		// Context was canceled before we could proceed.  Cancel the
		// reservation, which may permit other events to proceed sooner.
		r.Cancel()
		return ctx.Err()
	}
}

Dynamic Adjustment#

rate.limiter 中,支持调整速率和桶大小,这样就可以根据现有环境和条件,来动态的改变 Token生成速率和桶容量

  • SetLimit(Limit) 更改生成 Token 的速率
  • SetBurst(int) 改变桶容量

Example#

一个流量整形的场景#

package main

import (
	"log"
	"strconv"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	timeLayout := "2006-01-02:15:04:05.0000"
	limiter := rate.NewLimiter(1, 5) // BT(1,5)
	log.Println("bucket current capacity: " + strconv.Itoa(limiter.Burst()))
	length := 20 // 一共请求20次
	chs := make([]chan string, length)
	for i := 0; i < length; i++ {
		chs[i] = make(chan string, 1)
		go func(taskId string, ch chan string, r *rate.Limiter) {
			err := limiter.Allow()
			if !err {
				ch <- "Task-" + taskId + " unallow " + time.Now().Format(timeLayout)
			}

			time.Sleep(time.Duration(5) * time.Millisecond)
			ch <- "Task-" + taskId + " run success  " + time.Now().Format(timeLayout)
			return

		}(strconv.FormatInt(int64(i), 10), chs[i], limiter)
	}
	for _, ch := range chs {
		log.Println("task start at " + <-ch)
	}
}

image

通过执行结果可以看出,在突发为20的情况下,allow仅允许了获得token的事件执行,,这种场景下实现了流量整形的特性。

一个流量管制的场景#

package main

import (
	"context"
	"log"
	"strconv"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	timeLayout := "2006-01-02:15:04:05.0000"
	limiter := rate.NewLimiter(1, 5) // BT(1,5)
	log.Println("bucket current capacity: " + strconv.Itoa(limiter.Burst()))
	length := 20 // 一共请求20次
	chs := make([]chan string, length)
	for i := 0; i < length; i++ {
		chs[i] = make(chan string, 1)
		go func(taskId string, ch chan string, r *rate.Limiter) {
			err := limiter.Wait(context.TODO())
			if err != nil {
				ch <- "Task-" + taskId + " unallow " + time.Now().Format(timeLayout)
			}
			ch <- "Task-" + taskId + " run success  " + time.Now().Format(timeLayout)
			return

		}(strconv.FormatInt(int64(i), 10), chs[i], limiter)
	}
	for _, ch := range chs {
		log.Println("task start at " + <-ch)
	}
}

image

结果可以看出,在大突发的情况下,在拿到token的任务会立即执行,没有拿到token的会等待拿到token后继续执行,这种场景下实现了流量管制的特性

Reference

tokenbucket
QoS Policing


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK