31

Golang rate无法延迟重排的BUG – 峰云就她了

 4 years ago
source link: http://xiaorui.cc/2019/06/28/golang-rate%E6%97%A0%E6%B3%95%E5%BB%B6%E8%BF%9F%E9%87%8D%E6%8E%92%E7%9A%84bug/?
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Golang rate无法延迟重排的BUG – 峰云就她了

专注于Golang、Python、DB、cluster

研究golang rate限频的源码时候,发现rate的Wait,WaitN存在一个问题。当waiter提前return后,当前和后面waiter无法进行延迟时间重排,回收的问题。

该文章后续仍在不断的更新修改中, 请移步到原文地址  http://xiaorui.cc/?p=5930

我们知道go rate waitN是可以传递context的。比如,在一个场景里,5秒放一个token,池的大小就一个。开始协程g1拿到了token,g2、g3也需要token, 但是rate池子里已经没有token, g2、g3自然就需要wait方法去等待新token的产生, 那么g2,、g3需要等待多久? 最好的计划肯定是等待到下次产生token的时候,简单说g2等待5s, g3需要等待到10s。

当我们通过传递context来主动关闭g2的等待,但协程 g3 还是在等待10s。也就是说,g2退出了,按理来说后面的rate waiter应该调整下时间。但go rate没有做这方面的处理。

如何解决?

我自己想了差不多有三种方法吧。

第一种方法,修改go rate源码,可以把wait里的timer放在heap里,某waiter退出后,我们可以把大于该waiter等待时间的timer,重新reset一下。当然这个复杂度有点大了。

第二种方法,所有协程统一按照下次token的生产时间来等待,但这个问题就有点忙轮询和竞争了。或者可以自定义等待时间加配 rate allow 非阻塞方法。

第三种方法,自己去实现限频模块,new一个协程专门来生产token,可以用chan来做通知。

我最后采用的是第三种,就是自己实现令牌桶。

go rate异常源码分析:

通过代码我们拿到,最后的wait方法select两个chan上,一个是正常可拿到token的timeout定时器,另一个是业务传递进来的context上下文。
当上层把context对应的cancelFunc关闭了,wait在return之前,也只是更新lastEvent和tokens。目的在于,先的waiter在等待时,会稍微减少了一个token等待时间。

// xiaorui.cc

// 等待 token 
func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) {
    if n > lim.burst && lim.limit != Inf {
        return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, lim.burst)
    }
    ...
    waitLimit := InfDuration
    if deadline, ok := ctx.Deadline(); ok {
        waitLimit = deadline.Sub(now)
    }
    // Reserve
    r := lim.reserveN(now, n, waitLimit)
    if !r.ok {
        return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n)
    }
    // Wait if necessary
    delay := r.DelayFrom(now)
    if delay == 0 {
        // 需要等待,直接拿到了token
        return nil
    }
    t := time.NewTimer(delay)
    defer t.Stop()
    select {
    case <-t.C:
        // 按照分配好的delay时间去等待,自然是拿到了token
        return nil
    case <-ctx.Done():
        r.Cancel()  // 减少下个协程的等待时间
        return ctx.Err()
    }
}

// 给waiter算出需要等待的时间
func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) {
    last := lim.last
    if now.Before(last) {
        last = now
    }

    // Avoid making delta overflow below when last is very old.
    maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens)
    elapsed := now.Sub(last)
    if elapsed > maxElapsed {
        elapsed = maxElapsed
    }

    // Calculate the new number of tokens, due to time that passed.
    delta := lim.limit.tokensFromDuration(elapsed)
    tokens := lim.tokens + delta
    if burst := float64(lim.burst); tokens > burst {
        tokens = burst
    }

    return now, last, tokens
}

需要注意,go rate提供的三个方法中,allow是非阻塞的特性,wait及reserve都是阻塞的特性。reserve / reserveN也是存在上面goroutine退出后,等待时间无法重排的问题,毕竟wait里主要调用的是Reserve的方法。

// xiaorui.cc

func reserveBug() {
	l := rate.NewLimiter(1, 1)
	for index := 0; index < 10; index++ {
		wg.Add(1)
		go func() {
			r := l.ReserveN(time.Now(), 1)
			time.Sleep(200 * time.Millisecond)
			r.Cancel()
			wg.Done()
		}()
	}

	wg.Wait()
	r := l.ReserveN(time.Now(), 1)
	fmt.Println("reserve need wait: ", r.Delay())
}

rate作为go的标准库,他的实现确实很巧妙。使用预计token的生产时间来分配一个个waiter等待者的延迟时间。
但这样带来的问题,一方面充斥了许多的定时器,另一方面不能让waiter快速的收敛排队。这类定时器在高并发下无疑是性能的杀手。


大家觉得文章对你有些作用! 如果想赏钱,可以用微信扫描下面的二维码,感谢!
另外再次标注博客原地址  xiaorui.cc
weixin_new.jpg

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK