42

来谈谈限流:从概念到实现

 5 years ago
source link: https://github.com/farmerjohngit/myblog/issues/18?amp%3Butm_medium=referral
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

后端服务的接口都是有访问上限的,如果外部QPS或并发量超过了访问上限会导致应用瘫痪。所以一般都会对接口调用加上限流保护,防止超出预期的请求导致系统故障。

从限流类型来说一般来说分为两种:并发数限流和qps限流,并发数限流就是限制同一时刻的最大并发请求数量,qps限流指的是限制一段时间内发生的请求个数。

从作用范围的层次上来看分单机限流和分布式限流,前者是针对单机的,后者是针对集群的,他们的思想都是一样的,只不过是范围不一样,本文分析的都是 单机限流

接下来我们看看并发数限流和QPS限流。

更多文章见个人博客: https://github.com/farmerjohngit/myblog

并发数限流

并发数限流限制的是同一时刻的并发数,所以不考虑线程安全的话,我们只要用一个int变量就能实现,伪代码如下:

int maxRequest=100;
int nowRequest=0;

public void request(){
    if(nowRequest>=maxRequest){
        return ;
    }
    nowRequest++;
    //调用接口
    try{
         invokeXXX();    
    }finally{
         nowRequest--;
    }
}

显然,上述实现会有线程安全的问题,最直接的做法是加锁:

int maxRequest=100;
int nowRequest=0;
 
public void request(){
    if(nowRequest>=maxRequest){
        return ;
    }
	synchronized(this){
         if(nowRequest>=maxRequest){
        	return ;
    	}
    	nowRequest++;
	}
   
    //调用接口
    try{
         invokeXXX();    
    }finally{
        synchronized(this){
         	nowRequest--;
        }
    }
}

当然也可以用AtomicInteger实现:

int maxRequest=100;
AtomicInteger nowRequest=new AtomicInteger(0);
 
public void request(){
    for(;;){
        int currentReq=nowRequest.get();
        if(currentReq>=maxRequest){
            return;
        }
        if(nowRequest.compareAndSet(currentReq,currentReq+1)){
            break;
        }
    }
 
    //调用接口
    try{
         invokeXXX();    
    }finally{
        nowRequest.decrementAndGet();
    }
}

熟悉JDK并发包的同学会说干嘛这么麻烦,这不就是信号量(Semaphore)做的事情吗? 对的,其实最简单的方法就是用信号量来实现:

int maxRequest=100;
Semaphore reqSemaphore = new Semaphore(maxRequest);
 
public void request(){
    if(!reqSemaphore.tryAcquire()){
        return ;
    }
 
    //调用接口
    try{
         invokeXXX();    
    }finally{
       reqSemaphore.release();
    }
}

条条大路通罗马,并发数限流比较简单,一般来说用信号量就好。

QPS限流

QPS限流限制的是一段时间内(一般指1秒)的请求个数。

计数器法

最简单的做法用一个int型的count变量做计数器:请求前计数器+1,如超过阈值并且与第一个请求的间隔还在1s内,则限流。

伪代码如下:

int maxQps=100;
int count;
long timeStamp=System.currentTimeMillis();
long interval=1000;

public synchronized boolean grant(){
	long now=System.currentTimeMillis();
    if(now<timeStamp+interval){
        count++;
        return count<maxQps;
    }else{
        timeStamp=now;
        count=1;
        return true;
    }
}

该种方法实现起来很简单,但其实是有临界问题的,假如在第一秒的后500ms来了100个请求,第2秒的前500ms来了100个请求,那在这1秒内其实最大QPS为200。如下图:

VzIrE3n.png!web

计数器法会有临界问题,主要还是统计的精度太低,这点可以通过滑动窗口算法解决

滑动窗口

我们用一个长度为10的数组表示1秒内的QPS请求,数组每个元素对应了相应100ms内的请求数。用一个 sum 变量代码当前1s的请求数。同时每隔100ms将淘汰过期的值。

伪代码如下:

int maxQps=100;
AtomicInteger[] count=new AtomicInteger[10];
long timeStamp=System.currentTimeMillis();
long interval=1000;
AtomicInteger sum;
volatile int index;

public void init(){
    for(int i=0;i<count.length;i++){
        count[i]=new AtomicInteger(0);
    }
    sum=new AtomicInteger(0);
}

public synchronized boolean  grant(){
    count[index].incrementAndGet();
    return sum.incrementAndGet()<maxQps;
}

//每100ms执行一次
public void run(){
    index=(index+1)%count.length;
    int val=count[index].getAndSet(0);
    sum.addAndGet(-val);
}

滑动窗口的窗口越小,则精度越高,相应的资源消耗也更高。

漏桶算法

漏桶算法思路是,有一个固定大小的桶,水(请求)忽快忽慢的进入到漏桶里,漏桶以一定的速度出水。当桶满了之后会发生溢出。

eeUFNnF.jpg!web

维基百科 上可以看到,漏桶算法有两种实现,一种是 as a meter ,另一种是 as a queue网上大多数文章都没有提到其有两种实现,且对这两种概念混乱。

As a meter

第一种实现是和令牌桶等价的,只是表述角度不同。

伪代码如下:

long timeStamp=System.currentTimeMillis();//上一次调用grant的时间
int bucketSize=100;//桶大小
int rate=10;//每ms流出多少请求
int count;//目前的水量

public synchronized boolean grant(){
    long now = System.currentTimeMillis();
    if(now>timeStamp){
         count = Math.max(0,count-(now-timeStamp)*rate); 
         timeStamp = now;
    }
 
    if(count+1<=bucketSize){
        count++;
        return true;
    }else{
        return false;
    }
}

该种实现允许一段时间内的突发流量,比如初始时桶中没有水,这时1ms内来了100个请求,这100个请求是不会被限流的,但之后每ms最多只能接受10个请求(比如下1ms又来了100个请求,那其中90个请求是会被限流的)。

其达到的效果和令牌桶一样。

As a queue

第二种实现是用一个队列实现,当请求到来时如果队列没满则加入到队列中,否则拒绝掉新的请求。同时会以恒定的速率从队列中取出请求执行。

VZnMneb.png!web

伪代码如下:

Queue<Request> queue=new LinkedBlockingQueue(100);
int gap;
int rate;

public synchronized boolean grant(Request req){
	if(!queue.offer(req)){return false;}
}

// 单独线程执行
void consume(){
    while(true){
        for(int i=0;i<rate;i++){
            //执行请求
            Request req=queue.poll();
            if(req==null){break;}
            req.doRequest();
        }
        Thread.sleep(gap);
    }
}

对于该种算法,固定的限定了请求的速度,不允许流量突发的情况。

比如初始时桶是空的,这时1ms内来了100个请求,那只有前10个会被接受,其他的会被拒绝掉。注意与上文中 as a meter 实现的区别。

**不过,当桶的大小等于每个ticket流出的水大小时,第二种漏桶算法和第一种漏桶算法是等价的。**也就是说, as a queueas a meter 的一种特殊实现。如果你没有理解这句话,你可以再看看上面 as a meter 的伪代码,当 bucketSize==rate 时,请求速度就是恒定的,不允许突发流量。

令牌桶算法

令牌桶算法的思想就是,桶中最多有N个令牌,会以一定速率往桶中加令牌,每个请求都需要从令牌桶中取出相应的令牌才能放行,如果桶中没有令牌则被限流。

6rQjAbi.jpg!web

令牌桶算法与上文的漏桶算法 as a meter 实现是等价的,能够在限制数据的平均传输速率的同时还允许某种程度的突发传输。伪代码:

int token;
int bucketSize;
int rate;
long timeStamp=System.currentTimeMillis();

public synchronized boolean grant(){
	long now=System.currentTimeMillis();
    if(now>timeStamp){
         token=Math.max(bucketSize,token+(timeStamp-now)*rate);
         timeStamp=now;
    }
    if(token>0){
        token--;
    	return true;
    }else{
        return false;
    }
    
}

漏桶算法两种实现和令牌桶算法的对比

as a meter 的漏桶算法和令牌桶算法是一样的,只是思想角度有所不同。

as a queue 的漏桶算法能强行限制数据的传输速率,而令牌桶和 as a meter 漏桶则 能够在限制数据的平均传输速率的同时还允许某种程度的突发传输。

一般业界用的比较多的是令牌桶算法,像guava中的 RateLimiter 就是基于令牌桶算法实现的。当然不同的业务场景会有不同的需要,具体的选择还是要结合场景。

End

本文介绍了后端系统中常用的限流算法,对于每种算法都有对应的伪代码,结合伪代码理解起来应该不难。但伪代码中只是描述了大致思想,对于一些细节和效率问题并没有关注,所以下篇文章将会分析常用限流API:guava的 RateLimiter 的源码实现,让读者对于限流有个更清晰的认识。

参考文章

http://blog.zhuxingsheng.com/blog/counter-algorithm.html

http://blog.51cto.com/leyew/860302

https://en.wikipedia.org/wiki/Leaky_bucket


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK