C++ 实现线程安全的频率限制器(推荐)

很早以前,在学习使用 Python 的deque容器时,我写了一篇文章python3 deque 双向队列创建与使用方法分析。最近需要压测线上服务的性能,又不愿意总是在 QA 那边排队,于是需要自己写一个压测用的客户端。其中一个核心需求就是要实现 QPS 限制。

于是,终究逃不开要在 C++ 中实现一个线程安全的频率限制器。

设计思路

所谓频率限制,就是要在一个时间段(inteval)中,限制操作的次数(limit)。这又可以引出两种强弱不同的表述:

  • 强表述:在任意一个长度等于设定的时间段(interval)的滑动窗口中,频率限制器放行的操作次数(count)都不高于限制次数(limit)。
  • 弱表述:在一组长度等于设定的时间段(interval)且紧密相连的固定窗口中,每一个窗口里频率限制器放行的操作次数(count)都不高于限制次数(limit)。

不难发现,强表述通过「滑动窗口」的方式,不仅限制了频率,还要求了操作在时间上的均匀性。前作的频率限制器,实际上对应了这里的强表述。但实际工程中,我们通常只需要实现弱表述的频率限制器就足够使用了。

对于弱表述,实现起来主要思路如下:

当操作计数(count)小于限制(limit)时直接放行;

单线程实现

在不考虑线程安全时,不难给出这样的实现。

struct ms_clock {
 using rep = std::chrono::milliseconds::rep;
 using period = std::chrono::milliseconds::period;
 using duration = std::chrono::duration<rep, period>;
 using time_point = std::chrono::time_point<ms_clock, duration>;

 static
 time_point now() noexcept {
 return time_point(std::chrono::duration_cast<duration>(
   std::chrono::steady_clock::now().time_since_epoch()));
 }
};
} // namespace __details

class RateLimiter {
 public:
 using clock = __details::ms_clock; // 1.

 private:
 const uint64_t limit_;
 const clock::duration interval_;
 const clock::rep interval_count_;

 mutable uint64_t count_{std::numeric_limits<uint64_t>::max()}; // 2.a.
 mutable clock::rep timestamp_{0};     // 2.b.

 public:
 constexpr RateLimiter(uint64_t limit, clock::duration interval) :
 limit_(limit), interval_(interval), interval_count_(interval_.count()) {}

 RateLimiter(const RateLimiter&) = delete;  // 3.a.
 RateLimiter& operator=(const RateLimiter&) = delete; // 3.b.
 RateLimiter(RateLimiter&&) = delete;   // 3.c.
 RateLimiter& operator=(RateLimiter&&) = delete; // 3.d.

 bool operator()() const;
 double qps() const {
 return 1000.0 * this->limit_ / this->interval_count_;
 }
};

bool RateLimiter::operator()() const {
 auto orig_count = this->count_++;
 if (orig_count < this->limit_) { // 4.
 return true;
 } else {
 auto ts = this->timestamp_;
 auto now = clock::now().time_since_epoch().count();
 if (now - ts < this->interval_count_) { // 5.
 return false;
 }
 this->timestamp_ = now;
 this->count_ = 1;
 return true;
 }
}

这里,

(1) 表明频率限制器使用单位为毫秒的时钟。在实际使用时,也可以按需改成微妙甚至纳秒。

(2) 使用mutable修饰内部状态count_timestamp_。这是因为两个limit_interval_相同的频率限制器,在逻辑上是等价的,但他们的内部状态却不一定相同。因此,为了让const成员函数能够修改内部状态(而不改变逻辑等价),我们要给内部状态数据成员加上mutable修饰。

(2.a) 处将count_设置为类型可表示的最大值是为了让

(4) 的判断失败,而对timestamp_进行初始化。

(2.b) 处将timestamp_设置为0则是基于同样的原因,让 (5) 的判断失败。

(2.a) 和 (2.b) 处通过巧妙的初值设计,将初始化状态与后续正常工作状态的逻辑统一了起来,而无须丑陋的判断。

(3) 禁止了对象的拷贝和移动。这是因为一个频率限制器应绑定一组操作,而不应由两组或更多组操作共享(对于拷贝的情形),或是中途失效(对于移动的情形)。

如此一来,函数调用运算符就忠实地实现了前述逻辑。

多线程改造

第一步改造

当有多线程同时调用RateLimiter::operator()时,显而易见,在count_timestamp_上会产生竞争。我们有两种办法解决这个问题:要不然加锁,要不然把count_timestamp_设为原子变量然后用原子操作解决问题。于是,对函数调用运算符,我们有如下第一步的改造。

class RateLimiter {
 // 其余保持不变
 private:
 mutable std::atomic<uint64_t> count_{std::numeric_limits<uint64_t>::max()}; // 1.a.
 mutable std::atomic<clock::rep> timestamp_{0};     // 1.b.
 // 其余保持不变
};

bool RateLimiter::operator()() const {
 auto orig_count = this->count_.fetch_add(1UL); // 2.
 if (orig_count < this->limit_) {
 return true;
 } else {
 auto ts = this->timestamp_.load(); // 3.
 auto now = clock::now().time_since_epoch().count();
 if (now - ts < this->interval_count_) {
 return false;
 }
 this->timestamp_.store(now); // 4.
 this->count_.store(1UL); // 5.
 return true;
 }
}

这里,

  • (1) 将count_timestamp_声明为原子的,从而方便后续进行原子操作。
  • (2) -- (5) 则将原有操作分别改为对应的原子操作。

这样看起来很完美,但其实是有 bug 的。我们重点关注 (4) 这里。(4) 的本意是更新timestamp_,以备下一次orig_count >= this->limit_时进行判断。准确设置这一timestamp是频率限制器正确工作的基石。但是,如果有两个(或更多)线程,同时走到了 (4)会发生什么?

  • 因为原子操作的存在,两个线程会先后执行 (4)。于是timestamp_的值究竟是什么,我们完全不可预期。
  • 此外,两个线程会先后执行 (5),即原子地将count_置为1。但是你想,频率限制器先后放行了两次操作,但为什么count_1呢?这是不是就「偷跑」了一次操作?

为此,我们要保证只有一个线程会真正设置timestamp_,而拒绝其他同样走到 (4) 位置的线程的操作,以避免其重复设置timestamp_以及错误地将count_再次置为1

第二步改进

于是有以下改进。

bool RateLimiter::operator()() const {
 auto orig_count = this->count_.fetch_add(1UL); // 3.
 if (orig_count < this->limit_) { // 4.
 return true;
 } else {
 auto ts = this->timestamp_.load();
 auto now = clock::now().time_since_epoch().count();
 if (now - ts < this->interval_count_) { // 5.
 return false;
 }
 if (not this->timestamp_.compare_and_exchange_strong(ts, now)) { // 1.
 return false;
 }
 this->count_.store(1UL); // 2.
 return true;
 }
}

这里,(1) 是一个 CAS 原子操作。它会原子地比较timestamp_ts的值(Compare):若他们相等,则将now的值写入timestamp_(Swap),并返回true;若他们不相等,则将timestamp_的值写入ts,并返回false。如果没有其他线程抢先修改timestamp_的值,那么 CAS 操作应该成功并返回true,继续执行后面的代码;否则,说明其他线程已经抢先修改了timestamp_,当前线程的操作被记入上一个周期而被频率限制器拒绝。

现在要考虑 (2)。如果执行完 (1) 之后立即立刻马上没有任何延迟地执行 (2),那么当然一切大吉。但如果这时候当前线程被切出去,会发生什么?我们要分情况讨论。

如果ts == 0,也就是「当前线程」是频率限制器第一次修改timestamp_。于是,当前线程可能会在 (3) 处将count_(溢出地)自增为0,从而可能有其他线程通过 (4)。此时,当前线程在当前分片有可能应当被拒绝操作。为此,我们需要在 (1) 和 (2) 之间做额外的判断。

if (ts == 0) {
 auto orig_count = this->count.fetch_add(1UL);
 return (orig_count < this->limit_);
}

如果ts != 0,也就是「当前线程」并非频率限制器第一次修改timestamp_。那么其他线程在 (4) 处必然判断失败,但在 (5) 处的判断可能成功,从而可能继续成功执行 (1),从而接连两次执行 (2)。但这不影响正确性。因为通过 (5) 表明相对当前线程填入的timestamp_,已经由过了一个时间段(interval),而在这个时间段里,只有当前线程的一次操作会被接受。

第三次改进

由此,我们得到:

bool RateLimiter::operator()() const {
 auto orig_count = this->count_.fetch_add(1UL);
 if (orig_count < this->limit_) {
 return true;
 } else {
 auto ts = this->timestamp_.load();
 auto now = clock::now().time_since_epoch().count();
 if (now - ts < this->interval_count_) {
 return false;
 }
 if (not this->timestamp_.compare_and_exchange_strong(ts, now)) {
 return false;
 }
 if (ts == 0) {
 auto orig_count = this->count.fetch_add(1UL);
 return (orig_count < this->limit_);
 }
 this->count_.store(1UL);
 return true;
 }
}

至此,我们的代码在逻辑上已经成立了,接下来要做一些性能方面的优化。

原子操作默认采用std::memory_order_seq_cst的内存模型。这是 C++ 中最严格的内存模型,它有两个保证:

  • 程序指令和源码顺序一致;
  • 所有线程的所有操作都有一致的顺序。

为了实现顺序一致性(sequential consistency),编译器会使用很多对抗编译器优化和 CPU 乱序执行(Out-of-Order Execution)的手段,因而性能较差。对于此处的count_,我们无需顺序一致性模型,只需要获取释放模型(Aquire-Release)模型就足够了。

第四次改进

于是有第四次改进:

bool RateLimiter::operator()() const {
 auto orig_count = this->count_.fetch_add(1UL, std::memory_order_acq_rel);
 if (orig_count < this->limit_) {
 return true;
 } else {
 auto ts = this->timestamp_.load();
 auto now = clock::now().time_since_epoch().count();
 if (now - ts < this->interval_count_) {
 return false;
 }
 if (not this->timestamp_.compare_and_exchange_strong(ts, now)) {
 return false;
 }
 if (ts == 0) {
 auto orig_count = this->count.fetch_add(1UL, std::memory_order_acq_rel);
 return (orig_count < this->limit_);
 }
 this->count_.store(1UL, std::memory_order_release);
 return true;
 }
}

至此,我们就完整实现了一个频率限制器,可以愉快地开始拉 QPS 压测了!

总结

到此这篇关于在 C++ 中实现一个线程安全的频率限制器的文章就介绍到这了,更多相关在 C++ 中实现一个线程安全的频率限制器内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • C++11/14 线程调用类对象和线程传参的方法

    线程调用类对象 在前面的示例中,我们为线程任务使用了通常的函数.实际上,我们可以使用任何可调用对象或者lambda函数,如下调用类对象的例子: #include <iostream> #include <thread> class MyFunctor { public: void operator()() { std::cout << "functor\n"; } }; int main() { MyFunctor fnctor; std::thre

  • C++ 线程(串行 并行 同步 异步)详解

    C++  线程(串行 并行 同步 异步)详解 看了很多关于这类的文章,一直没有总结.不总结的话就会一直糊里糊涂,以下描述都是自己理解的非官方语言,不一定严谨,可当作参考. 首先,进程可理解成一个可执行文件的执行过程.在ios app上的话我们可以理解为我们的app的.ipa文件执行过程也即app运行过程.杀掉app进程就杀掉了这个app在系统里运行所占的内存. 线程:线程是进程的最小单位.一个进程里至少有一个主线程.就是那个main thread.非常简单的app可能只需要一个主线程即UI线程.

  • 基于C++11的threadpool线程池(简洁且可以带任意多的参数)

    C++11 加入了线程库,从此告别了标准库不支持并发的历史.然而 c++ 对于多线程的支持还是比较低级,稍微高级一点的用法都需要自己去实现,譬如线程池.信号量等.线程池(thread pool)这个东西,在面试上多次被问到,一般的回答都是:"管理一个任务队列,一个线程队列,然后每次取一个任务分配给一个线程去做,循环往复." 貌似没有问题吧.但是写起程序来的时候就出问题了. 废话不多说,先上实现,然后再啰嗦.(dont talk, show me ur code !) 代码实现 #pra

  • C++多线程中的锁和条件变量使用教程

    在做多线程编程时,有两个场景我们都会遇到: 多线程访问共享资源,需要用到锁: 多线程间的状态同步,这个可用的机制很多,条件变量是广泛使用的一种. 今天我用一个简单的例子来给大家介绍下锁和条件变量的使用. 代码使用C++11 示例代码 #include <iostream> #include <mutex> #include <thread> #include <condition_variable> std::mutex g_mutex; // 用到的全局锁

  • C++ 实现线程安全的频率限制器(推荐)

    很早以前,在学习使用 Python 的deque容器时,我写了一篇文章python3 deque 双向队列创建与使用方法分析.最近需要压测线上服务的性能,又不愿意总是在 QA 那边排队,于是需要自己写一个压测用的客户端.其中一个核心需求就是要实现 QPS 限制. 于是,终究逃不开要在 C++ 中实现一个线程安全的频率限制器. 设计思路 所谓频率限制,就是要在一个时间段(inteval)中,限制操作的次数(limit).这又可以引出两种强弱不同的表述: 强表述:在任意一个长度等于设定的时间段(in

  • java多线程编程之线程的生命周期

    复制代码 代码如下: // 开始线程public void start( );public void run( ); // 挂起和唤醒线程public void resume( ); // 不建议使用public void suspend( );// 不建议使用public static void sleep(long millis);public static void sleep(long millis, int nanos); // 终止线程public void stop( );   /

  • 用Python生成器实现微线程编程的教程

    微线程领域(至少在 Python 中)一直都是 Stackless Python 才能涉及的特殊增强部分.关于 Stackless 的话题以及最近它经历的变化,可能本身就值得开辟一个专栏了.但其中简单的道理就是,在"新的 Stackless"下,延续(continuation)显然是不合时宜的,但微线程还是这个项目 存在的理由.这一点很复杂-- 刚开始,我们还是先来回顾一些内容.那么,什么是微线程呢? 微线程基本上可以说是只需要很少的内部资源就可以运行的进程 ― 并且是在 Python

  • java线程之线程的生命周期的使用

    与人有生老病死一样,线程也同样要经历开始(等待).运行.挂起和停止四种不同的状态.这四种状态都可以通过Thread类中的方法进行控制.下面给出了Thread类中和这四种状态相关的方法. 复制代码 代码如下: // 开始线程     public void start( );     public void run( ); // 挂起和唤醒线程     public void resume( );     // 不建议使用     public void suspend( );    // 不建议

  • C#使用委托的形式调用线程代码实例

    委托 对于委托,我们都知道他是一个引用类型,具有引用类型所具有的通性.需要知道的是它保存的不是实际值,只是是保存对存储在托管堆中的对象的引用.或说的直接点,委托就相当于叫人帮忙,让你帮你做一些事情.我这里就使用委托的形式,调用线程,来简单的说一下. 代码如下: using System; using System.Threading; namespace _012_线程 { class Program { static void Main(string[] args) //在mian中线程是执行

  • 搞懂Java线程池

    身为程序员我们对线程是再熟悉不过了,多线程并发算是Java进阶的知识,用好多线程不容易有太多的坑.创建线程也算是一个"重"操作.创建线程的语句是new Thread()咋一看好像就是new了一个对象. 没错是new了个对象,但是不仅仅是普通对象那样在堆中分配了一块内存,它还需要调用操作系统内核API,然后操作系统再为线程分配一些资源.所以较普通对象,线程就比较"重了".所以我们要避免频繁的创建和销毁线程,还得控制一下线程的数量.线程池就是用来完成这一项使命的. 所以

  • 浅析java线程中断的办法

    中断线程相关的方法 中断线程有一些相应的方法,这里列出来一下. 注意,如果是Thread.method(),则代表是静态方法.如果是thread.method()则代表着是类方法 void thread.stop() 这个方法能中断正在运行的线程,但是已经不推荐使用了,在将来的版本或许弃用,因为强行中断运行中的线程,是不安全的. void thread.interrupt() 如果正在运行wait(),sleep(),join()这三个方法阻塞了线程,那么将会使得线程抛出InterruptedE

  • Winform中如何跨线程访问UI元素

    在C# 的应用程序开发中, 我们经常要把UI线程和工作线程分开,防止界面停止响应, 同时我们又需要在工作线程中更新UI界面上的控件.但直接访问会出现"线程间操作无效"的情况,因为.NET禁止了跨线程调用控件, 否则谁都可以操作控件,最后可能造成错误. 下面介绍几种跨线程访问的方法: 1.禁止对跨线程访问做检查 (不推荐使用这种方法) 这种方法不检查跨线程访问,允许各个线程操作UI元素,容易出现错误. public Form2() { InitializeComponent(); //禁

  • 如何计算Web动画帧率FPS

    目录 流畅动画的标准 法一:借助 Chrome 开发者工具 法二:借助 Frame Timing API Blink 内核早期架构 JS 动画与 CSS 动画的细微区别 什么是 Frame Timing API ? Frame Timing API 示意 法三:借助 requestAnimationFrame API 使用 requestAnimationFrame 计算 FPS 原理 流畅动画的标准 首先,理清一些概念.FPS 表示的是每秒钟画面更新次数.我们平时所看到的连续画面都是由一幅幅静

  • Java多线程的用法详解

    1.创建线程 在Java中创建线程有两种方法:使用Thread类和使用Runnable接口.在使用Runnable接口时需要建立一个Thread实例.因此,无论是通过Thread类还是Runnable接口建立线程,都必须建立Thread类或它的子类的实例.Thread构造函数: public Thread( );  public Thread(Runnable target);  public Thread(String name);  public Thread(Runnable target

随机推荐