3

并发请求量限制组件分享

 2 years ago
source link: https://studygolang.com/articles/35404
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

并发请求量限制组件分享

uuid · 大约19小时之前 · 71 次点击 · 预计阅读时间 7 分钟 · 大约8小时之前 开始浏览    

关于限流Go官方通过一个采用令牌池的算法的实现:golang.org/x/time/rate,但是,这个限制的是每秒的请求数,有的时候我们希望限制的是系统并发处理的请求数量,类似线程池的功能,需求如下:

  1. 设置一个最大的请求处理数量,当请求超过时,后续请求将等待,直到有请求处理完后被唤醒。
  2. 请求的等待时间能够指定,超出等待时间就返回,提示给客户端。
  3. 等待请求的个数需要能够限制,数量超过时就直接返回,提示给客户端。

设计思路是实现一个Ticket池(NumLimiter),每个请求首先需要向NumLimiter申请一个ticket,当请求处理结束后,需要被回收。

获取不到ticket的请求就等待现有的ticket释放,所以会有两个核心对象:

  1. NumLimiter:数量限制器(ticket 池)
  2. Ticket:入场券,请求需要先申请一个Ticket

先不考虑细节,可以设计如下:

package numlimiter

// 数量限制器
type NumLimiter struct {
  maxTicket   int // 最大请求数
  maxWait       int // 最大等待数
    ...
}
// 释放Ticket
func (r *NumLimiter) releaseTicket(t *Ticket) bool {
    ...
}
// 预订Ticket
func (r *NumLimiter) Reserve(ctx context.Context) (*Ticket, error) {
    ...
}
// 创建一个tocket池
func New(maxTicket) *NumLimiter {
    l := &NumLimiter{
        maxTicket:   maxTicket,
    }
    return l
}
// 入场券
type Ticket struct {
    l      *NumLimiter
    reqKey int64
}
// 释放入场券
func (r *Ticket) Close() {
    r.l.releaseTicket(r)
}

NumLimiter有两个核心的方法:

  1. Reserve - 申请Ticket:每个请求处理前需要先调用该方法获取一个ticket,如果当前颁发的ticket数已经是大于等于 maxTicket时,请求就pending等待Ticket释放。 该方法接收一个context,作用是传递外部超时或取消的信号,结束等待。
  2. releaseTicket - 释放Ticket:当请求处理完就需要把持有的ticket释放,该方法不直接暴露给外部,提供给ticket的Close方法调用。

Ticket就只有一个Close方法:

  1. Close:调用NumLimiter的releaseTicket释放Ticket

客户端使用

每次处理请求需要先调用Reserve获取Ticket,获取到后才执行具体的业务逻辑,执行完毕后调用Close方法释放Ticket

l := numlimiter.New(2) 
func Do(req Request) error { // 模拟请求request
  tk, err := l.Reserve(context.Background()) // 申请Ticket
  if err != nil { // 异常
    return err
  }
  defer tk.Close() // 释放Ticket
  // 处理请求req
  ...
}

整个框架定义好了,接着开始撸具体实现

首先,需要给每个ticket标识一个唯一标识,我们定义一个reqKey序列,通过nextReqKeyLocked方法自增,调用时需要加锁,保证在NumLimiter实例生成的key是唯一,代码如下:

type NumLimiter struct {
  nextKey     int64 // 下一个请求的Key
    ...
}
// 每次调用nextKey自动+1,调用的时候需要加锁,保证协程安全
func (r *NumLimiter) nextReqKeyLocked() int64 {
    next := r.nextKey
    r.nextKey++
    return next
}

接着,我们开始实现核心的Reserve()方法,梳理后的逻辑如下:

  1. 当颁发的Ticket数量小于maxTicket时,创建一个Ticket直接返回。
  2. 如果Ticket数量大于等于maxTicket,就先判断当前wait请求数是否超过maxWait,如果”是“,直接返回相应的error。
  3. 如果wait数没超过,就pending等待Ticket释放,同时还得监听是否超时。

实现逻辑之前需要考虑:

  1. Ticket如何管理。想要统一管理已经发放的Ticket数量,就需要有地方存储,还能对NumLimiter中所有方法可见,所以在NumLimiter中增加一个tickets属性,类型为 :map[int64]*Ticket(注:key 为请求的key,value对应的是已经颁发的Ticket)
  2. 管理等待Ticket。同样等待Ticket的请求需要被存储,并且能够被唤醒。于是也可以在NumLimiter增加一个属性:waitTickets,类型为:map[int64]chan struct{}(注:key同样是请求的key,值比较特殊,使用chan,目的是为了其他协程安全访问,当没数据时读取会pending,被close后会继续,chan的类型我们不关注,所以直接使用空结构体struct{})
  3. 另外,为了保护这些共享资源,还需要一个锁:mu sync.Mutex:
type NumLimiter struct {
  maxTicket   int // 最大请求数
  maxWait   int // 最大等待数量
  mu          sync.Mutex
  nextKey     int64 // 下一个请求的Key
  tickets     map[int64]*Ticket
  waitTickets map[int64]chan struct{}
    ...
}

接下来就可以开始实现Reserve方法

func (r *NumLimiter) Reserve(ctx context.Context) (*Ticket, error) {
    r.mu.Lock()
    reqKey := r.nextReqKeyLocked()
    t := &Ticket{l: r, reqKey: reqKey, lg: r.lg, create: time.Now()}
    // 当请求数量大于maxTicket就放到waitTickets中等待
    if len(r.tickets) >= r.maxTicket {
        if len(waitTickets) > r.maxWait {
            return nil, errors.New("waiting exceed max wait")
        }
        req := make(chan struct{})
        now := time.Now()
        r.lg.Warnf("request num exceed %d, reqkey [%d] waiting for ticket, req processing num = %d, total wait num = %d", r.maxTicket, reqKey, len(r.tickets), len(r.waitTickets)+1)
        r.waitTickets[reqKey] = req
        r.mu.Unlock() // 需要立即解锁,否则会导致其他协程调用Reserve或releaseTicket方法获取不到锁

        select {
        case <-ctx.Done():
            r.lg.Errorf("limiter wait timeout: key = %d, cost = %f", reqKey, time.Now().Sub(now).Seconds())
            r.mu.Lock()
            delete(r.waitTickets, reqKey)
            r.mu.Unlock()
            select {
            default:
            case <-req:
                t.Close() // 返回ticket
            }
            return nil, ctx.Err()
        case <-req:
            r.mu.Lock()
            r.tickets[reqKey] = t
            r.mu.Unlock()
            r.lg.Debugf("req key = %d get ticket, waiting time = %f", reqKey, time.Now().Sub(now).Seconds())
            return t, nil
        }
    }
    r.tickets[reqKey] = t
    r.mu.Unlock()
    return t, nil
}

虽然代码看着比较长,但是整个实现没太多复杂逻辑,核心代码就是等待ticket和被唤醒部分:

req := make(chan struct{})
r.waitTickets[reqKey] = req
r.mu.Unlock() // 需要立即解锁,否则会导致其他协程调用Reserve或releaseTicket方法获取不到锁
select {
  ...
  case <-req:
  r.mu.Lock()
  r.tickets[reqKey] = t
  r.mu.Unlock()
  r.lg.Debugf("req key = %d get ticket, waiting time = %f", reqKey, time.Now().Sub(now).Seconds())
  return t, nil
}

这里是利用chan特性,当要pending等待时,会创建一个请求chan:req := make(chan struct{}),然后放到waitTickets后就立即解锁(目的是让其他协程能获取到锁),chan在没数据写入或chan没有被关闭的情况下会pending,如果一旦有ticket释放,会通过close这个chan方式通知继续。 另外,超时的实现是借助context来实现,通过监听ctx.Done()方法,同时还要注意并发问题,超时的时候还是有可能获取到锁,所以还是得再检查一下case <-req是否成立,成立就说明超时的同时也正好获取到ticket,但是由于超时了,ticket就没用了,直接释放t.Close()。

接着,我们来实现ticket释放逻辑

  1. 删除tickets中对应的数据。(从tickets移除了,所以相当于将ticket释放了)
  2. 如果waitTickets没有数据就直接返回。len(tickets)数量已经-1,相当于ticket释放到池中。
  3. 如果waitTickets有等待ticket的请求,就直接通知其中的一个等待ticket的请求可以继续,然后等待请求从waitTickets删除,相当于将要释放的ticket直接移交给等待ticket的请求。
func (r *NumLimiter) releaseTicket(t *Ticket) bool {
    r.mu.Lock()
    defer r.mu.Unlock()
  // 删除tickets中对应的数据
    releaseSuccess := true
    if _, ok := r.tickets[t.reqKey]; ok {
        delete(r.tickets, t.reqKey)
    } else {
        releaseSuccess = false
    }
  // 如果waitTickets有等待ticket的请求
    if len(r.waitTickets) > 0 {
        var req chan struct{}
        var reqKey int64
    // 取出一条
        for reqKey, req = range r.waitTickets {
            break
        }
        close(req) // 通过close方式,通知等待ticket的协程继续
        delete(r.waitTickets, reqKey)// 从waitTickets删除
    }
    return releaseSuccess
}

这里的通知方式采用close(req)的方式传输信号,相应在Reserve()方法的select case <-req等待的请求就会收到信号,继续执行,同时将获取到的ticket保存在tickets中,返回对应的ticket后,客户端获取到ticket就可以继续请求的处理。

另外,实际上releaseTicket方法是不直接暴露给客户端,而是提供给ticket的close方法调用:

func (r *Ticket) Close() {
    if !r.l.releaseTicket(r) {
        r.lg.Errorf("limiter ticket release error: req key = %d", r.reqKey)
    }
}

这样当获得到ticket后,客户端可以把这ticket对象传到方法,释放的时候就直接调用ticket的close方法,就不需要管NumLimiter对象。

最后增加一个初始化方法,方便实例化NumLimiter:

func New(maxTicket, maxWait int) *NumLimiter {
    l := &NumLimiter{
        waitTickets: map[int64]chan struct{}{},
        tickets:     map[int64]*Ticket{},
        maxTicket:   maxTicket,
    maxWait:         maxWait,
    }
    return l
}

这样一个完整限量的功能就完成了。

限量的实现是参考database/sql 设计,核心的思想是如何合理管理ticket,超出时借助chan实现等待,还有context实现超时,当ticket释放,通过close chan来实现广播,通知对应的等待请求可以继续。

我的博客:https://itart.cn/blogs/2022/practice/num-limiter-library.html


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK