基于.net的分布式系统限流组件示例详解

前言

在互联网应用中,流量洪峰是常有的事情。在应对流量洪峰时,通用的处理模式一般有排队、限流,这样可以非常直接有效的保护系统,防止系统被打爆。另外,通过限流技术手段,可以让整个系统的运行更加平稳。今天要与大家分享一下限流算法和C#版本的组件。

一、令牌桶算法:

令牌桶算法的基本过程如下:

  • 假如用户配置的平均发送速率为r,则每隔1/r秒速率将一个令牌被加入到桶中;
  • 假设桶最多可以存发b个令牌。当桶中的令牌达到上限后,丢弃令牌。
  • 当一个有请求到达时,首先去令牌桶获取令牌,能够取到,则处理这个请求
  • 如果桶中没有令牌,那么请求排队或者丢弃

工作过程包括3个阶段:产生令牌、消耗令牌和判断数据包是否通过。其中涉及到2个参数:令牌产生的速率和令牌桶的大小,这个过程的具体工作如下。

  • 产生令牌:周期性的以固定速率向令牌桶中增加令牌,桶中的令牌不断增多。如果桶中令牌数已到达上限,则丢弃多余令牌。
  • 消费 令牌:业务程序根据具体业务情况消耗桶中的令牌。消费一次,令牌桶令牌减少一个。
  • 判断是否通过:判断是否已有令牌桶是否存在有效令牌,当桶中的令牌数量可以满足需求时,则继续业务处理,否则将挂起业务,等待令牌。

下面是C#的一个实现方式

class TokenBucketLimitingService: ILimitingService
 {
 private LimitedQueue<object> limitedQueue = null;
 private CancellationTokenSource cancelToken;
 private Task task = null;
 private int maxTPS;
 private int limitSize;
 private object lckObj = new object();
 public TokenBucketLimitingService(int maxTPS, int limitSize)
 {
  this.limitSize = limitSize;
  this.maxTPS = maxTPS;

  if (this.limitSize <= 0)
  this.limitSize = 100;
  if(this.maxTPS <=0)
  this.maxTPS = 1;

  limitedQueue = new LimitedQueue<object>(limitSize);
  for (int i = 0; i < limitSize; i++)
  {
  limitedQueue.Enqueue(new object());
  }
  cancelToken = new CancellationTokenSource();
  task = Task.Factory.StartNew(new Action(TokenProcess), cancelToken.Token);
 }

 /// <summary>
 /// 定时消息令牌
 /// </summary>
 private void TokenProcess()
 {
  int sleep = 1000 / maxTPS;
  if (sleep == 0)
  sleep = 1;

  DateTime start = DateTime.Now;
  while (cancelToken.Token.IsCancellationRequested ==false)
  {
  try
  {
   lock (lckObj)
   {
   limitedQueue.Enqueue(new object());
   }
  }
  catch
  {
  }
  finally
  {
   if (DateTime.Now - start < TimeSpan.FromMilliseconds(sleep))
   {
   int newSleep = sleep - (int)(DateTime.Now - start).TotalMilliseconds;
   if (newSleep > 1)
    Thread.Sleep(newSleep - 1); //做一下时间上的补偿
   }
   start = DateTime.Now;
  }
  }
 }

 public void Dispose()
 {
  cancelToken.Cancel();
 }

 /// <summary>
 /// 请求令牌
 /// </summary>
 /// <returns>true:获取成功,false:获取失败</returns>
 public bool Request()
 {
  if (limitedQueue.Count <= 0)
  return false;
  lock (lckObj)
  {
  if (limitedQueue.Count <= 0)
   return false;

  object data = limitedQueue.Dequeue();
  if (data == null)
   return false;
  }

  return true;
 }
 }
public interface ILimitingService:IDisposable
 {
  /// <summary>
  /// 申请流量处理
  /// </summary>
  /// <returns>true:获取成功,false:获取失败</returns>
  bool Request();
 }
public class LimitingFactory
 {
  /// <summary>
  /// 创建限流服务对象
  /// </summary>
  /// <param name="limitingType">限流模型</param>
  /// <param name="maxQPS">最大QPS</param>
  /// <param name="limitSize">最大可用票据数</param>
  public static ILimitingService Build(LimitingType limitingType = LimitingType.TokenBucket, int maxQPS = 100, int limitSize = 100)
  {
  switch (limitingType)
  {
   case LimitingType.TokenBucket:
   default:
   return new TokenBucketLimitingService(maxQPS, limitSize);
   case LimitingType.LeakageBucket:
   return new LeakageBucketLimitingService(maxQPS, limitSize);
  }
  }
 }

 /// <summary>
 /// 限流模式
 /// </summary>
 public enum LimitingType
 {
  TokenBucket,//令牌桶模式
  LeakageBucket//漏桶模式
 }

public class LimitedQueue<T> : Queue<T>
 {
  private int limit = 0;
  public const string QueueFulled = "TTP-StreamLimiting-1001";

 public int Limit
  {
  get { return limit; }
  set { limit = value; }
  }

 public LimitedQueue()
  : this(0)
  { }

 public LimitedQueue(int limit)
  : base(limit)
  {
  this.Limit = limit;
  }

 public new bool Enqueue(T item)
  {
  if (limit > 0 && this.Count >= this.Limit)
  {
   return false;
  }
  base.Enqueue(item);
  return true;
  }
 }

调用方法:

var service = LimitingFactory.Build(LimitingType.TokenBucket, 500, 200);
while (true)
{
 var result = service.Request();
 //如果返回true,说明可以进行业务处理,否则需要继续等待
 if (result)
 {
  //业务处理......
 }
 else
  Thread.Sleep(1);
}

二、漏桶算法

声明一个固定容量的桶,每接受到一个请求向桶中添加一个令牌,当令牌桶达到上线后请求丢弃或等待,具体算法如下:

  • 创建一个固定容量的漏桶,请求到达时向漏桶添加一个令牌
  • 如果请求添加令牌不成功,请求丢弃或等待
  • 另一个线程以固定的速率消费桶里的令牌

工作过程也包括3个阶段:产生令牌、消耗令牌和判断数据包是否通过。其中涉及到2个参数:令牌自动消费的速率和令牌桶的大小,个过程的具体工作如下。

  • 产生令牌:业务程序根据具体业务情况申请令牌。申请一次,令牌桶令牌加一。如果桶中令牌数已到达上限,则挂起业务后等待令牌。
  • 消费令牌:周期性的以固定速率消费令牌桶中令牌,桶中的令牌不断较少。
  • 判断是否通过:判断是否已有令牌桶是否存在有效令牌,当桶中的令牌数量可以满足需求时,则继续业务处理,否则将挂起业务,等待令牌。

C#的一个实现方式:

class LeakageBucketLimitingService: ILimitingService
  {
   private LimitedQueue<object> limitedQueue = null;
   private CancellationTokenSource cancelToken;
   private Task task = null;
   private int maxTPS;
   private int limitSize;
   private object lckObj = new object();
   public LeakageBucketLimitingService(int maxTPS, int limitSize)
   {
    this.limitSize = limitSize;
    this.maxTPS = maxTPS;

   if (this.limitSize <= 0)
     this.limitSize = 100;
    if (this.maxTPS <= 0)
     this.maxTPS = 1;

   limitedQueue = new LimitedQueue<object>(limitSize);
    cancelToken = new CancellationTokenSource();
    task = Task.Factory.StartNew(new Action(TokenProcess), cancelToken.Token);
   }

  private void TokenProcess()
   {
    int sleep = 1000 / maxTPS;
    if (sleep == 0)
     sleep = 1;

   DateTime start = DateTime.Now;
    while (cancelToken.Token.IsCancellationRequested == false)
    {
     try
     {

     if (limitedQueue.Count > 0)
      {
       lock (lckObj)
       {
        if (limitedQueue.Count > 0)
         limitedQueue.Dequeue();
       }
      }
     }
     catch
     {
     }
     finally
     {
      if (DateTime.Now - start < TimeSpan.FromMilliseconds(sleep))
      {
       int newSleep = sleep - (int)(DateTime.Now - start).TotalMilliseconds;
       if (newSleep > 1)
        Thread.Sleep(newSleep - 1); //做一下时间上的补偿
      }
      start = DateTime.Now;
     }
    }
   }

  public void Dispose()
   {
    cancelToken.Cancel();
   }

  public bool Request()
   {
    if (limitedQueue.Count >= limitSize)
     return false;
    lock (lckObj)
    {
     if (limitedQueue.Count >= limitSize)
      return false;

    return limitedQueue.Enqueue(new object());
    }
   }
  }

调用方法:

var service = LimitingFactory.Build(LimitingType.LeakageBucket, 500, 200);
while (true)
{
  var result = service.Request();
  //如果返回true,说明可以进行业务处理,否则需要继续等待
  if (result)
  {
    //业务处理......
  }
  else
   Thread.Sleep(1);
}

两类限流算法虽然非常相似,但是还是有些区别的,供大家参考!

漏桶算法能够强行限制数据的传输速率。在某些情况下,漏桶算法不能够有效地使用网络资源。因为漏桶的漏出速率是固定的。

令牌桶算法能够在限制数据的平均传输速率的同时还允许某种程度的突发传输.

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对我们的支持。

(0)

相关推荐

  • 浅谈ASP.NET Core中间件实现分布式 Session

    1.1. 中间件原理 1.1.1. 什么是中间件 中间件是段代码用于处理请求和响应,通常多个中间件链接起来形成管道,由每个中间件自己来决定是否要调用下一个中间件. 1.1.2. 中间件执行过程 举一个示例来演示中间件的执行过程(分别有三个中间件:日志记录.权限验证和路由):当请求进入应用程序时,执行执行日志记录的中间件,它记录请求属性并调用链中的下一个中间件权限验证,如果权限验证通过则将控制权传递给下一个中间件,不通过则设置401 HTTP代码并返回响应,响应传递给日志中间件进行返回. 1.1.

  • ASP.NET通过分布式Session提升性能

    如果我们正在使用Session,那么构建高性能可扩展的ASP.NET网站,就必须解决分布式Session的架构,因为单服务器的 SESSION处理能力会很快出现性能瓶颈,这类问题也被称之为Session同步.微软有自己的分布式Session的解决方案,那就是 SessionStateServer,我们可以参考: ASP.NET Session State Partitioning  http://blog.maartenballiauw.be/post/2008/01/23/ASPNET-Ses

  • 基于.net的分布式系统限流组件示例详解

    前言 在互联网应用中,流量洪峰是常有的事情.在应对流量洪峰时,通用的处理模式一般有排队.限流,这样可以非常直接有效的保护系统,防止系统被打爆.另外,通过限流技术手段,可以让整个系统的运行更加平稳.今天要与大家分享一下限流算法和C#版本的组件. 一.令牌桶算法: 令牌桶算法的基本过程如下: 假如用户配置的平均发送速率为r,则每隔1/r秒速率将一个令牌被加入到桶中: 假设桶最多可以存发b个令牌.当桶中的令牌达到上限后,丢弃令牌. 当一个有请求到达时,首先去令牌桶获取令牌,能够取到,则处理这个请求 如

  • 基于Opencv图像识别实现答题卡识别示例详解

    目录 1. 项目分析 2.项目实验 3.项目结果 总结 在观看唐宇迪老师图像处理的课程中,其中有一个答题卡识别的小项目,在此结合自己理解做一个简单的总结. 1. 项目分析 首先在拿到项目时候,分析项目目的是什么,要达到什么样的目标,有哪些需要注意的事项,同时构思实验的大体流程. 图1. 答题卡测试图像 比如在答题卡识别的项目中,针对测试图片如图1 ,首先应当实现的功能是: 能够捕获答题卡中的每个填涂选项. 将获取的填涂选项与正确选项做对比计算其答题正确率. 2.项目实验 在对测试图像进行形态学操

  • Python基于keras训练实现微笑识别的示例详解

    目录 一.数据预处理 二.训练模型 创建模型 训练模型 训练结果 三.预测 效果 四.源代码 pretreatment.py train.py predict.py 一.数据预处理 实验数据来自genki4k 提取含有完整人脸的图片 def init_file():     num = 0     bar = tqdm(os.listdir(read_path))     for file_name in bar:         bar.desc = "预处理图片: "      

  • Java Stream流语法示例详解

    目录 如何使用Stream? Stream的操作分类 1.创建流 2.操作流 1)过滤 2)映射 3)匹配 4)组合 3.转换流 如何使用Stream? 聚合操作是Java 8针对集合类,使编程更为便利的方式,可以与Lambda表达式一起使用,达到更加简洁的目的. 前面例子中,对聚合操作的使用可以归结为3个部分: 1)  创建Stream:通过stream()方法,取得集合对象的数据集. 2)  Intermediate:通过一系列中间(Intermediate)方法,对数据集进行过滤.检索等数

  • Sentinel热门词汇限流的实现详解

    目录 热点参数限流 基本使用 热点参数限流 何为热点?热点即经常访问的数据.很多时候我们希望统计某个热点数据中访问频次最高的 Top K 数据,并对其访问进行限制.比如: 商品 ID 为参数,统计一段时间内最常购买的商品 ID 并进行限制 用户 ID 为参数,针对一段时间内频繁访问的用户 ID 进行限制 热点参数限流会统计传入参数中的热点参数,并根据配置的限流阈值与模式,对包含热点参数的资源调用进行限流.热点参数限流可以看做是一种特殊的流量控制,仅对包含热点参数的资源调用生效. 基本使用 1.引

  • Java I/O流使用示例详解

    目录 1.java IO包 2.创建文件 3.获取文件信息 4.目录操作 5.字节输入流InputStream 6.字节输出流FileOutputStream 7.模拟文件拷贝 8.字符输入流FileReader 9.字符输出流FileWriter 1.java IO包 Java.io 包几乎包含了所有操作输入.输出需要的类.所有这些流类代表了输入源和输出目标. Java.io 包中的流支持很多种格式,比如:基本类型.对象.本地化字符集等等. 一个流可以理解为一个数据的序列.输入流表示从一个源读

  • vue实现鼠标滑动预览视频封面组件示例详解

    目录 组件效果 组件设计 1.视频截取关键帧 2.鼠标移入封面时显示对应关键帧 3.视频和封面的状态切换 功能实现 1.视频截取关键帧图片列表 1.1 截取指定帧 1.2 截取stepNums张关键帧图片 2.鼠标移入封面时显示对应关键帧 2.1 鼠标移动事件监听 2.2 鼠标移出事件监听 3.视频和封面的状态切换 3.1 播放视频 3.2 视频暂停 组件使用 组件库引用 组件效果 https://www.jb51.net/Special/926.htm 组件设计 我们首先应该要对组件进行一个简

  • Vue.extend实现组件库message组件示例详解

    目录 概述 Vue.extend message 组件配置对象(就是.vue文件) message 生成组件的函数 使用方法 效果图 总结 概述 当我们使用组件库的时候,某些组件并不是直接放到模板当中进行使用,而是通过api的方式调用生成组件并且挂在到我们的页面中,其中最常见的就是message组件,我们在组件库中看到的多数都是api调用的方式生成.记录自己基本实现message组件. Vue.extend 在vue中,要实现通过api方式实现组件的使用,这个aip是必不可少的,因此我们先了解下

  • Sentinel热点key限流的实现详解

    目录 基本介绍 兜底方法 参数例外项 基本介绍 何为热点 热点即经常访问的数据,很多时候我们希望统计或者限制某个热点数据中访问频次最高的TopN数据,并对其访问进行限流或者其它操作 兜底方法 分为系统默认和客户自定义两种 之前的case,限流出问题后,都是用sentinel系统默认的提示:Blocked by Sentinel (flow limiting) 我们能不能自定?类似hystrix,某个方法出问题了,就找对应的兜底降级方法? 结论: 从HystrixCommand 到@Sentine

  • Python 异步之非阻塞流使用示例详解

    目录 1. 异步流 2. 如何打开连接 3. 如何启动服务器 4. 如何使用 StreamWriter 写入数据 5. 如何使用 StreamReader 读取数据 6. 如何关闭连接 1. 异步流 asyncio 的一个主要好处是能够使用非阻塞流. Asyncio 提供非阻塞 I/O 套接字编程.这是通过流提供的. 可以打开提供对流写入器和流写入器的访问的套接字.然后可以使用协同程序从流中写入和读取数据,并在适当的时候暂停.完成后,可以关闭套接字. 异步流功能是低级的,这意味着必须手动实现所需

随机推荐