4

Asp.Net Core 7 preview 4 重磅新特性--限流中间件 - gui.h

 1 year ago
source link: https://www.cnblogs.com/springhgui/p/16264864.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

限流是应对流量暴增或某些用户恶意攻击等场景的重要手段之一,然而微软官方从未支持这一重要特性,AspNetCoreRateLimit这一第三方库限流库一般作为首选使用,然而其配置参数过于繁多,对使用者造成较大的学习成本。令人高兴的是,在刚刚发布的.NET 7 Preview 4中开始支持限流中间件。

UseRateLimiter尝鲜

  1. 安装.NET 7.0 SDK(v7.0.100-preview.4)
  2. 通过nuget包安装Microsoft.AspNetCore.RateLimiting
  3. 创建.Net7网站应用,注册中间件

全局限流并发1个

app.UseRateLimiter(new RateLimiterOptions
{
    Limiter = PartitionedRateLimiter.Create<HttpContext, string>(resource =>
    {
        return RateLimitPartition.CreateConcurrencyLimiter("MyLimiter",
            _ => new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1));
    })
});

根据不同资源不同限制并发数/api前缀的资源租约数2,等待队列长度为2,其他默认租约数1,队列长度1。

app.UseRateLimiter(new RateLimiterOptions()
{
    // 触发限流的响应码
    DefaultRejectionStatusCode = 500,
    OnRejected = async (ctx, rateLimitLease) =>
    {
        // 触发限流回调处理
    },
    Limiter = PartitionedRateLimiter.Create<HttpContext, string>(resource =>
    {
        if (resource.Request.Path.StartsWithSegments("/api"))
        {
            return RateLimitPartition.CreateConcurrencyLimiter("WebApiLimiter",
                _ => new ConcurrencyLimiterOptions(2, QueueProcessingOrder.NewestFirst, 2));
        }
        else
        {
            return RateLimitPartition.CreateConcurrencyLimiter("DefaultLimiter",
                _ => new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1));
        }
    })
});

  1. 新建一个webapi项目,并注册限流中间件如下
using Microsoft.AspNetCore.RateLimiting;
using System.Threading.RateLimiting;

var builder = WebApplication.CreateBuilder(args);

// Add services to the container.

builder.Services.AddControllers();
// Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();

var app = builder.Build();

// Configure the HTTP request pipeline.
if (app.Environment.IsDevelopment())
{
    app.UseSwagger();
    app.UseSwaggerUI();
}

app.UseRateLimiter(new RateLimiterOptions
{
    DefaultRejectionStatusCode = 500,
    OnRejected = async (ctx, lease) =>
    {
        await Task.FromResult(ctx.Response.WriteAsync("ConcurrencyLimiter"));
    },
    Limiter = PartitionedRateLimiter.Create<HttpContext, string>(resource =>
    {
        return RateLimitPartition.CreateConcurrencyLimiter("MyLimiter",
            _ => new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1));
    })
});

app.UseHttpsRedirection();

app.UseAuthorization();

app.MapControllers();

app.Run();

  1. 启动项目,使用jmeter测试100并发,请求接口/WeatherForecast
    27057e92-a89d-4071-b2ce-cd2c5e78641c.png

    所有请求处理成功,失败0!

这个结果是不是有点失望,其实RateLimitPartition.CreateConcurrencyLimiter创建的限流器是
ConcurrencyLimiter,后续可以实现个各种策略的限流器进行替换之。
看了ConcurrencyLimiter的实现,其实就是令牌桶的限流思想,上面配置的new ConcurrencyLimiterOptions(1, QueueProcessingOrder.NewestFirst, 1)),第一个1代表令牌的个数,第二个1代表可以当桶里的令牌为空时,进入等待队列,而不是直接失败,当前面的请求结束后,会归还令牌,此时等待的请求就可以拿到令牌了,QueueProcessingOrder.NewestFirst代表最新的请求优先获取令牌,也就是获取令牌时非公平的,还有另一个枚举值QueueProcessingOrder.OldestFirst老的优先,获取令牌是公平的。只要我们获取到令牌的人干活速度快,虽然我们令牌只有1,并发就很高。
3. 测试触发失败场景
只需要让我们拿到令牌的人持有时间长点,就能轻易的触发。

f94a9647-5a82-4740-ad5e-350e97f60c05.png


调整jmater并发数为10

dc9128dc-bbd3-44bb-ac72-c2ecd1c4d20f.png

相应内容也是我们设置的内容。

933a3181-2c32-4608-a381-5f186eff3c2e.png

ConcurrencyLimiter源码

令牌桶限流思想

        protected override RateLimitLease AcquireCore(int permitCount)
        {
            // These amounts of resources can never be acquired
            if (permitCount > _options.PermitLimit)
            {
                throw new ArgumentOutOfRangeException(nameof(permitCount), permitCount, SR.Format(SR.PermitLimitExceeded, permitCount, _options.PermitLimit));
            }

            ThrowIfDisposed();

            // Return SuccessfulLease or FailedLease to indicate limiter state
            if (permitCount == 0)
            {
                return _permitCount > 0 ? SuccessfulLease : FailedLease;
            }

            // Perf: Check SemaphoreSlim implementation instead of locking
            if (_permitCount >= permitCount)
            {
                lock (Lock)
                {
                    if (TryLeaseUnsynchronized(permitCount, out RateLimitLease? lease))
                    {
                        return lease;
                    }
                }
            }

            return FailedLease;
        }

尝试获取令牌核心逻辑

        private bool TryLeaseUnsynchronized(int permitCount, [NotNullWhen(true)] out RateLimitLease? lease)
        {
            ThrowIfDisposed();

            // if permitCount is 0 we want to queue it if there are no available permits
            if (_permitCount >= permitCount && _permitCount != 0)
            {
                if (permitCount == 0)
                {
                    // Edge case where the check before the lock showed 0 available permits but when we got the lock some permits were now available
                    lease = SuccessfulLease;
                    return true;
                }

                // a. if there are no items queued we can lease
                // b. if there are items queued but the processing order is newest first, then we can lease the incoming request since it is the newest
                if (_queueCount == 0 || (_queueCount > 0 && _options.QueueProcessingOrder == QueueProcessingOrder.NewestFirst))
                {
                    _idleSince = null;
                    _permitCount -= permitCount;
                    Debug.Assert(_permitCount >= 0);
                    lease = new ConcurrencyLease(true, this, permitCount);
                    return true;
                }
            }

            lease = null;
            return false;
        }

令牌获取失败后进入等待队列

 protected override ValueTask<RateLimitLease> WaitAsyncCore(int permitCount, CancellationToken cancellationToken = default)
        {
            // These amounts of resources can never be acquired
            if (permitCount > _options.PermitLimit)
            {
                throw new ArgumentOutOfRangeException(nameof(permitCount), permitCount, SR.Format(SR.PermitLimitExceeded, permitCount, _options.PermitLimit));
            }

            // Return SuccessfulLease if requestedCount is 0 and resources are available
            if (permitCount == 0 && _permitCount > 0 && !_disposed)
            {
                return new ValueTask<RateLimitLease>(SuccessfulLease);
            }

            // Perf: Check SemaphoreSlim implementation instead of locking
            lock (Lock)
            {
                if (TryLeaseUnsynchronized(permitCount, out RateLimitLease? lease))
                {
                    return new ValueTask<RateLimitLease>(lease);
                }

                // Avoid integer overflow by using subtraction instead of addition
                Debug.Assert(_options.QueueLimit >= _queueCount);
                if (_options.QueueLimit - _queueCount < permitCount)
                {
                    if (_options.QueueProcessingOrder == QueueProcessingOrder.NewestFirst && permitCount <= _options.QueueLimit)
                    {
                        // remove oldest items from queue until there is space for the newest request
                        do
                        {
                            RequestRegistration oldestRequest = _queue.DequeueHead();
                            _queueCount -= oldestRequest.Count;
                            Debug.Assert(_queueCount >= 0);
                            if (!oldestRequest.Tcs.TrySetResult(FailedLease))
                            {
                                // Updating queue count is handled by the cancellation code
                                _queueCount += oldestRequest.Count;
                            }
                        }
                        while (_options.QueueLimit - _queueCount < permitCount);
                    }
                    else
                    {
                        // Don't queue if queue limit reached and QueueProcessingOrder is OldestFirst
                        return new ValueTask<RateLimitLease>(QueueLimitLease);
                    }
                }

                CancelQueueState tcs = new CancelQueueState(permitCount, this, cancellationToken);
                CancellationTokenRegistration ctr = default;
                if (cancellationToken.CanBeCanceled)
                {
                    ctr = cancellationToken.Register(static obj =>
                    {
                        ((CancelQueueState)obj!).TrySetCanceled();
                    }, tcs);
                }

                RequestRegistration request = new RequestRegistration(permitCount, tcs, ctr);
                _queue.EnqueueTail(request);
                _queueCount += permitCount;
                Debug.Assert(_queueCount <= _options.QueueLimit);

                return new ValueTask<RateLimitLease>(request.Tcs.Task);
            }
        }
 private void Release(int releaseCount)
        {
            lock (Lock)
            {
                if (_disposed)
                {
                    return;
                }

                _permitCount += releaseCount;
                Debug.Assert(_permitCount <= _options.PermitLimit);

                while (_queue.Count > 0)
                {
                    RequestRegistration nextPendingRequest =
                        _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst
                        ? _queue.PeekHead()
                        : _queue.PeekTail();

                    if (_permitCount >= nextPendingRequest.Count)
                    {
                        nextPendingRequest =
                            _options.QueueProcessingOrder == QueueProcessingOrder.OldestFirst
                            ? _queue.DequeueHead()
                            : _queue.DequeueTail();

                        _permitCount -= nextPendingRequest.Count;
                        _queueCount -= nextPendingRequest.Count;
                        Debug.Assert(_permitCount >= 0);

                        ConcurrencyLease lease = nextPendingRequest.Count == 0 ? SuccessfulLease : new ConcurrencyLease(true, this, nextPendingRequest.Count);
                        // Check if request was canceled
                        if (!nextPendingRequest.Tcs.TrySetResult(lease))
                        {
                            // Queued item was canceled so add count back
                            _permitCount += nextPendingRequest.Count;
                            // Updating queue count is handled by the cancellation code
                            _queueCount += nextPendingRequest.Count;
                        }
                        nextPendingRequest.CancellationTokenRegistration.Dispose();
                        Debug.Assert(_queueCount >= 0);
                    }
                    else
                    {
                        break;
                    }
                }

                if (_permitCount == _options.PermitLimit)
                {
                    Debug.Assert(_idleSince is null);
                    Debug.Assert(_queueCount == 0);
                    _idleSince = Stopwatch.GetTimestamp();
                }
            }
        }

虽然这次官方对限流进行了支持,但貌似还不能支持对ip或client级别的限制支持,对于更高级的限流策略仍需要借助第三方库或自己实现,期待后续越来越完善。

__EOF__


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK