ASP.NET Core 3.x 并发限制的实现代码

前言

Microsoft.AspNetCore.ConcurrencyLimiter AspNetCore3.0后增加的,用于传入的请求进行排队处理,避免线程池的不足.
我们日常开发中可能常做的给某web服务器配置连接数以及,请求队列大小,那么今天我们看看如何在通过中间件形式实现一个并发量以及队列长度限制.

Queue策略

添加Nuget

Install-Package Microsoft.AspNetCore.ConcurrencyLimiter

    public void ConfigureServices(IServiceCollection services)
    {
      services.AddQueuePolicy(options =>
      {
        //最大并发请求数
        options.MaxConcurrentRequests = 2;
        //请求队列长度限制
        options.RequestQueueLimit = 1;
      });
      services.AddControllers();
    }
    public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
    {
      //添加并发限制中间件
      app.UseConcurrencyLimiter();
      app.Run(async context =>
      {
        Task.Delay(100).Wait(); // 100ms sync-over-async

        await context.Response.WriteAsync("Hello World!");
      });
      if (env.IsDevelopment())
      {
        app.UseDeveloperExceptionPage();
      }

      app.UseHttpsRedirection();

      app.UseRouting();

      app.UseAuthorization();

      app.UseEndpoints(endpoints =>
      {
        endpoints.MapControllers();
      });
    }

通过上面简单的配置,我们就可以将他引入到我们的代码中,从而做并发量限制,以及队列的长度;那么问题来了,他是怎么实现的呢?

 public static IServiceCollection AddQueuePolicy(this IServiceCollection services, Action<QueuePolicyOptions> configure)
{
    services.Configure(configure);
    services.AddSingleton<IQueuePolicy, QueuePolicy>();
    return services;
}

QueuePolicy采用的是SemaphoreSlim信号量设计,SemaphoreSlim、Semaphore(信号量)支持并发多线程进入被保护代码,对象在初始化时会指定 最大任务数量,当线程请求访问资源,信号量递减,而当他们释放时,信号量计数又递增。

   /// <summary>
    ///   构造方法(初始化Queue策略)
    /// </summary>
    /// <param name="options"></param>
    public QueuePolicy(IOptions<QueuePolicyOptions> options)
    {
      _maxConcurrentRequests = options.Value.MaxConcurrentRequests;
      if (_maxConcurrentRequests <= 0)
      {
        throw new ArgumentException(nameof(_maxConcurrentRequests), "MaxConcurrentRequests must be a positive integer.");
      }

      _requestQueueLimit = options.Value.RequestQueueLimit;
      if (_requestQueueLimit < 0)
      {
        throw new ArgumentException(nameof(_requestQueueLimit), "The RequestQueueLimit cannot be a negative number.");
      }
      //使用SemaphoreSlim来限制任务最大个数
      _serverSemaphore = new SemaphoreSlim(_maxConcurrentRequests);
    }

ConcurrencyLimiterMiddleware中间件

    /// <summary>
    /// Invokes the logic of the middleware.
    /// </summary>
    /// <param name="context">The <see cref="HttpContext"/>.</param>
    /// <returns>A <see cref="Task"/> that completes when the request leaves.</returns>
    public async Task Invoke(HttpContext context)
    {
      var waitInQueueTask = _queuePolicy.TryEnterAsync();

      // Make sure we only ever call GetResult once on the TryEnterAsync ValueTask b/c it resets.
      bool result;

      if (waitInQueueTask.IsCompleted)
      {
        ConcurrencyLimiterEventSource.Log.QueueSkipped();
        result = waitInQueueTask.Result;
      }
      else
      {
        using (ConcurrencyLimiterEventSource.Log.QueueTimer())
        {
          result = await waitInQueueTask;
        }
      }

      if (result)
      {
        try
        {
          await _next(context);
        }
        finally
        {
          _queuePolicy.OnExit();
        }
      }
      else
      {
        ConcurrencyLimiterEventSource.Log.RequestRejected();
        ConcurrencyLimiterLog.RequestRejectedQueueFull(_logger);
        context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable;
        await _onRejected(context);
      }
    }

每次当我们请求的时候首先会调用_queuePolicy.TryEnterAsync(),进入该方法后先开启一个私有lock锁,再接着判断总请求量是否≥(请求队列限制的大小+最大并发请求数),如果当前数量超出了,那么我直接抛出,送你个503状态;

 if (result)
 {
     try
     {
       await _next(context);
     }
     finally
    {
      _queuePolicy.OnExit();
    }
    }
    else
    {
      ConcurrencyLimiterEventSource.Log.RequestRejected();
      ConcurrencyLimiterLog.RequestRejectedQueueFull(_logger);
      context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable;
      await _onRejected(context);
    }

问题来了,我这边如果说还没到你设置的大小呢,我这个请求没有给你服务器造不成压力,那么你给我处理一下吧.

await _serverSemaphore.WaitAsync();异步等待进入信号量,如果没有线程被授予对信号量的访问权限,则进入执行保护代码;否则此线程将在此处等待,直到信号量被释放为止

 lock (_totalRequestsLock)
  {
    if (TotalRequests >= _requestQueueLimit + _maxConcurrentRequests)
    {
       return false;
    }
      TotalRequests++;
    }
    //异步等待进入信号量,如果没有线程被授予对信号量的访问权限,则进入执行保护代码;否则此线程将在此处等待,直到信号量被释放为止
    await _serverSemaphore.WaitAsync();
    return true;
  }

返回成功后那么中间件这边再进行处理,_queuePolicy.OnExit();通过该调用进行调用_serverSemaphore.Release();释放信号灯,再对总请求数递减

Stack策略

再来看看另一种方法,栈策略,他是怎么做的呢?一起来看看.再附加上如何使用的代码.

   public void ConfigureServices(IServiceCollection services)
    {
      services.AddStackPolicy(options =>
      {
        //最大并发请求数
        options.MaxConcurrentRequests = 2;
        //请求队列长度限制
        options.RequestQueueLimit = 1;
      });
      services.AddControllers();
    }

通过上面的配置,我们便可以对我们的应用程序执行出相应的策略.下面再来看看他是怎么实现的呢

 public static IServiceCollection AddStackPolicy(this IServiceCollection services, Action<QueuePolicyOptions> configure)
    {
      services.Configure(configure);
      services.AddSingleton<IQueuePolicy, StackPolicy>();
      return services;
    }

可以看到这次是通过StackPolicy类做的策略.来一起来看看主要的方法

    /// <summary>
    ///   构造方法(初始化参数)
    /// </summary>
    /// <param name="options"></param>
    public StackPolicy(IOptions<QueuePolicyOptions> options)
    {
      //栈分配
      _buffer = new List<ResettableBooleanCompletionSource>();
      //队列大小
      _maxQueueCapacity = options.Value.RequestQueueLimit;
      //最大并发请求数
      _maxConcurrentRequests = options.Value.MaxConcurrentRequests;
      //剩余可用空间
      _freeServerSpots = options.Value.MaxConcurrentRequests;
    }

当我们通过中间件请求调用,_queuePolicy.TryEnterAsync()时,首先会判断我们是否还有访问请求次数,如果_freeServerSpots>0,那么则直接给我们返回true,让中间件直接去执行下一步,如果当前队列=我们设置的队列大小的话,那我们需要取消先前请求;每次取消都是先取消之前的保留后面的请求;

  public ValueTask<bool> TryEnterAsync()
    {
      lock (_bufferLock)
      {
        if (_freeServerSpots > 0)
        {
          _freeServerSpots--;
          return _trueTask;
        }
        // 如果队列满了,取消先前的请求
        if (_queueLength == _maxQueueCapacity)
        {
          _hasReachedCapacity = true;
          _buffer[_head].Complete(false);
          _queueLength--;
        }
        var tcs = _cachedResettableTCS ??= new ResettableBooleanCompletionSource(this);
        _cachedResettableTCS = null;
        if (_hasReachedCapacity || _queueLength < _buffer.Count)
        {
          _buffer[_head] = tcs;
        }
        else
        {
          _buffer.Add(tcs);
        }
        _queueLength++;
        // increment _head for next time
        _head++;
        if (_head == _maxQueueCapacity)
        {
          _head = 0;
        }
        return tcs.GetValueTask();
      }
    }

当我们请求后调用_queuePolicy.OnExit();出栈,再将请求长度递减

  public void OnExit()
    {
      lock (_bufferLock)
      {
        if (_queueLength == 0)
        {
          _freeServerSpots++;

          if (_freeServerSpots > _maxConcurrentRequests)
          {
            _freeServerSpots--;
            throw new InvalidOperationException("OnExit must only be called once per successful call to TryEnterAsync");
          }

          return;
        }

        // step backwards and launch a new task
        if (_head == 0)
        {
          _head = _maxQueueCapacity - 1;
        }
        else
        {
          _head--;
        }
        //退出,出栈
        _buffer[_head].Complete(true);
        _queueLength--;
      }
    }

总结

基于栈结构的特点,在实际应用中,通常只会对栈执行以下两种操作:

  • 向栈中添加元素,此过程被称为"进栈"(入栈或压栈);
  • 从栈中提取出指定元素,此过程被称为"出栈"(或弹栈);

队列存储结构的实现有以下两种方式:

  • 顺序队列:在顺序表的基础上实现的队列结构;
  • 链队列:在链表的基础上实现的队列结构;

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • asp.net core 系列之并发冲突的深入理解

    本文介绍如何处理多个用户并发更新同一实 体(同时)时出现的冲突 . 主要是两种:一种,检查属性并发冲突,使用 [ConcurrencyCheck] ;另一种,检测行的并发冲突,使用 rowversion 跟踪属性,如果在保存之前有修改,就报错 发生并发冲突的情况: 1.用户导航到实体编辑页面: 2.第一个用户的更改还未写入数据库之前,另一个用户更新同一实体: 此时,如果未启用并发检测,当发生更新时: 最后一个更新优先.即最后一个更新的值保存到数据库.而第一个保存的值将丢失. 举个例子: 1. J

  • ASP.NET Core 3.x 并发限制的实现代码

    前言 Microsoft.AspNetCore.ConcurrencyLimiter AspNetCore3.0后增加的,用于传入的请求进行排队处理,避免线程池的不足. 我们日常开发中可能常做的给某web服务器配置连接数以及,请求队列大小,那么今天我们看看如何在通过中间件形式实现一个并发量以及队列长度限制. Queue策略 添加Nuget Install-Package Microsoft.AspNetCore.ConcurrencyLimiter public void ConfigureSe

  • ASP.NET Core 使用Cookie验证身份的示例代码

    ASP.NET Core 1.x提供了通过Cookie 中间件将用户主体序列化为一个加密的Cookie,然后在后续请求中验证Cookie并重新创建主体,并将其分配给HttpContext.User属性.如果您要提供自己的登录界面和用户数据库,可以使用作为独立功能的Cookie中间件. ASP.NET Core 2.x的一个主要变化是不再存在Cookie中间件.取而代之的是在Startup.cs文件中的Configure方法中的调用UseAuthentication方法会添加设置HttpConte

  • ASP.NET Core使用SkiaSharp实现验证码的示例代码

    前言 本文并没有实现一个完成的验证码样例,只是提供了在当前.NET Core 2.0下使用Drawing API的另一种思路,并以简单Demo的形式展示出来. Skia Skia是一个开源的二维图形库,提供各种常用的API,并可在多种软硬件平台上运行.谷歌Chrome浏览器.Chrome OS.安卓.火狐浏览器.火狐操作系统以及其它许多产品都使用它作为图形引擎. Skia由谷歌出资管理,任何人都可基于BSD免费软件许可证使用Skia.Skia开发团队致力于开发其核心部分, 并广泛采纳各方对于Sk

  • asp.net Core中同名服务注册的实现代码

    目录 1.使用.net Core自带容器 2.AutoFac中的实现 通常情况下,在使用注入时一个服务接口对应一个实现类,注入方式采用构造函数注入即可,但如果存在多个类实现同一个接口的情况下,则需要根据实际情况来选择不同的实现类. 如以下代码中的MyEmailService和EmailService都实现了IEmailService接口: public class MyEmailService : IEmailService { public string Send(string Email)

  • 如何在ASP.NET Core中给上传图片功能添加水印实例代码

    前言 因某些原因需要在图片上添加文字水印.图片水印,所以这里做个简单的记录.下面话不多说了,来一起看看详细的实现过程吧 实现方法: 在传统的.NET框架中,我们给图片添加水印有的是通过HttpModules或者是HttpHandler,然后可以通过以下代码添加水印: var image = new WebImage(imageBytes); image.AddTextWatermark( Settings.Instance.WatermarkText, "White", Setting

  • asp.net core 腾讯验证码的接入示例代码

    Intro 之前使用的验证码服务是用的极验验证,而且是比较旧的,好久之前接入的,而且验证码服务依赖 Session,有点不太灵活,后来发现腾讯也有验证码服务,而且支持小程序,并且是唯一支持小程序的验证码..(垄断么..) 而且相比之下,腾讯验证码不需要依赖 Session,集成起来也比较方便,于是就用了腾讯验证码,详细参考:https://007.qq.com/product.html?ADTAG=index.block 验证流程 服务器端接入 using System.ComponentMod

  • ASP.NET Core中间件实现限流的代码

    目录 一.限流算法 1.计数器算法 1.1固定窗口算法 1.2滑动窗口算法 2.令牌桶算法 3.漏桶算法 二.ASP.NETCore中间件实现限流 1.中间件代码 2.在管道中的使用 一.限流算法 在高并发系统中,有三把利器用来保护系统:缓存.降级和限流. 本文主要是介绍限流,限流算法主要有以下三种: 1.计数器算法 固定窗口 滑动窗口 2.令牌桶算法 3.漏桶算法 1.计数器算法 1.1 固定窗口算法 计数器算法是限流算法里最简单也是最容易实现的一种算法.比如我们规定,对于A接口来说,我们1分

  • 详解ASP.NET Core 处理 404 Not Found

    问题 在没有修改任何配置的情况下,这是用户使用 Chrome 访问不存在的URL时会看到的内容: 幸运的是,处理错误状态代码非常简单,我们将在下面介绍三种技术. 解决方案 在以前的ASP.NET MVC版本中,主要在 web.config 中处理404错误的. 您可能记得在 <customErrors> 节点中配置ASP.NET管道处理404错误,以及在低版本的IIS中通过 <httpErrors> 节点处理 404错误.好像有点混乱. 在.Net Core中,情况就不同了,没有必

  • ASP.NET Core使用微软官方类库实现汉字转拼音

    本文实例为大家分享了ASP.NET Core实现汉字转拼音的具体代码,供大家参考,具体内容如下 一.NuGet包 拼音:Install-Package PinYinConverterCore 简体-繁体互转:Install-Package TraditionalChineseToSimplifiedConverter 二.C#代码 class Program { static void Main(string[] args) { string Name= "刘大大"; Console.

随机推荐