go实现一个分布式限流器的方法步骤

目录
  • 1. 接口定义
  • 2. LocalCounterLimiter
  • 3. LocalTokenBucketLimiter
  • 4. RedisCounterLimiter
  • 5. RedisTokenBucketLimiter

项目中需要对 api 的接口进行限流,但是麻烦的是,api 可能有多个节点,传统的本地限流无法处理这个问题。限流的算法有很多,比如计数器法,漏斗法,令牌桶法,等等。各有利弊,相关博文网上很多,这里不再赘述。

项目的要求主要有以下几点:

  • 支持本地/分布式限流,接口统一
  • 支持多种限流算法的切换
  • 方便配置,配置方式不确定

go 语言不是很支持 OOP,我在实现的时候是按 Java 的思路走的,所以看起来有点不伦不类,希望能抛砖引玉。

1. 接口定义

package ratelimit

import "time"

// 限流器接口
type Limiter interface {
    Acquire() error
    TryAcquire() bool
}

// 限流定义接口
type Limit interface {
    Name() string
    Key() string
    Period() time.Duration
    Count() int32
    LimitType() LimitType
}

// 支持 burst
type BurstLimit interface {
    Limit
    BurstCount() int32
}

// 分布式定义的 burst
type DistLimit interface {
    Limit
    ClusterNum() int32
}

type LimitType int32
const (
    CUSTOM LimitType = iota
    IP
)

Limiter 接口参考了 Google 的 guava 包里的 Limiter 实现。Acquire 接口是阻塞接口,其实还需要加上 context 来保证调用链安全,因为实际项目中并没有用到 Acquire 接口,所以没有实现完善;同理,超时时间的支持也可以通过添加新接口继承自 Limiter 接口来实现。TryAcquire 会立即返回。

Limit 抽象了一个限流定义,Key() 方法返回这个 Limit 的唯一标识,Name() 仅作辅助,Period() 表示周期,单位是秒,Count() 表示周期内的最大次数,LimitType()表示根据什么来做区分,如 IP,默认是 CUSTOM.

BurstLimit 提供突发的能力,一般是配合令牌桶算法。DistLimit 新增 ClusterNum() 方法,因为 mentor 要求分布式遇到错误的时候,需要退化为单机版本,退化的策略即是:2 节点总共 100QPS,如果出现分区,每个节点需要调整为各 50QPS

2. LocalCounterLimiter

package ratelimit

import (
    "errors"
    "fmt"
    "math"
    "sync"
    "sync/atomic"
    "time"
)

// todo timer 需要 stop
type localCounterLimiter struct {
    limit Limit

    limitCount int32 // 内部使用,对 limit.count 做了 <0 时的转换

    ticker *time.Ticker
    quit chan bool

    lock sync.Mutex
    newTerm *sync.Cond
    count int32
}

func (lim *localCounterLimiter) init() {
    lim.newTerm = sync.NewCond(&lim.lock)
    lim.limitCount = lim.limit.Count()

    if lim.limitCount < 0 {
        lim.limitCount = math.MaxInt32 // count 永远不会大于 limitCount,后面的写法保证溢出也没问题
    } else if lim.limitCount == 0  {
        // 禁止访问, 会无限阻塞
    } else {
        lim.ticker = time.NewTicker(lim.limit.Period())
        lim.quit = make(chan bool, 1)

        go func() {
            for {
                select {
                case <- lim.ticker.C:
                    fmt.Println("ticker .")
                    atomic.StoreInt32(&lim.count, 0)
                    lim.newTerm.Broadcast()

                    //lim.newTerm.L.Unlock()
                case <- lim.quit:
                    fmt.Println("work well .")
                    lim.ticker.Stop()
                    return
                }
            }
        }()
    }
}

// todo 需要机制来防止无限阻塞, 不超时也应该有个极限时间
func (lim *localCounterLimiter) Acquire() error {
    if lim.limitCount == 0 {
        return errors.New("rate limit is 0, infinity wait")
    }

    lim.newTerm.L.Lock()
    for lim.count >= lim.limitCount {
        // block instead of spinning
        lim.newTerm.Wait()
        //fmt.Println(count, lim.limitCount)
    }
    lim.count++
    lim.newTerm.L.Unlock()

    return nil
}

func (lim *localCounterLimiter) TryAcquire() bool {
    count := atomic.AddInt32(&lim.count, 1)
    if count > lim.limitCount {
        return false
    } else {
        return true
    }
}

代码很简单,就不多说了

3. LocalTokenBucketLimiter

golang 的官方库里提供了一个 ratelimiter,就是采用令牌桶的算法。所以这里并没有重复造轮子,直接代理了 ratelimiter。

package ratelimit

import (
    "context"
    "golang.org/x/time/rate"
    "math"
)

type localTokenBucketLimiter struct {
    limit Limit

    limiter *rate.Limiter // 直接复用令牌桶的
}

func (lim *localTokenBucketLimiter) init() {
    burstCount := lim.limit.Count()
    if burstLimit, ok := lim.limit.(BurstLimit); ok {
        burstCount = burstLimit.BurstCount()
    }

    count := lim.limit.Count()
    if count < 0 {
        count = math.MaxInt32
    }

    f := float64(count) / lim.limit.Period().Seconds()
    if f < 0 {
        f = float64(rate.Inf) // 无限
    } else if f == 0 {
        panic("为 0 的时候,底层实现有问题")
    }

    lim.limiter = rate.NewLimiter(rate.Limit(f), int(burstCount))
}

func (lim *localTokenBucketLimiter) Acquire() error {
    err := lim.limiter.Wait(context.TODO())
    return err
}

func (lim *localTokenBucketLimiter) TryAcquire() bool {
    return lim.limiter.Allow()
}

4. RedisCounterLimiter

package ratelimit

import (
    "math"
    "sync"
    "xg-go/log"
    "xg-go/xg/common"
)

type redisCounterLimiter struct {
    limit      DistLimit
    limitCount int32 // 内部使用,对 limit.count 做了 <0 时的转换

    redisClient *common.RedisClient

    once sync.Once // 退化为本地计数器的时候使用
    localLim Limiter

    //script string
}

func (lim *redisCounterLimiter) init() {
    lim.limitCount = lim.limit.Count()
    if lim.limitCount < 0 {
        lim.limitCount = math.MaxInt32
    }

    //lim.script = buildScript()
}

//func buildScript() string {
//  sb := strings.Builder{}
//
//  sb.WriteString("local c")
//  sb.WriteString("\nc = redis.call('get',KEYS[1])")
//  // 调用不超过最大值,则直接返回
//  sb.WriteString("\nif c and tonumber(c) > tonumber(ARGV[1]) then")
//  sb.WriteString("\nreturn c;")
//  sb.WriteString("\nend")
//  // 执行计算器自加
//  sb.WriteString("\nc = redis.call('incr',KEYS[1])")
//  sb.WriteString("\nif tonumber(c) == 1 then")
//  sb.WriteString("\nredis.call('expire',KEYS[1],ARGV[2])")
//  sb.WriteString("\nend")
//  sb.WriteString("\nif tonumber(c) == 1 then")
//  sb.WriteString("\nreturn c;")
//
//  return sb.String()
//}

func (lim *redisCounterLimiter) Acquire() error {
    panic("implement me")
}

func (lim *redisCounterLimiter) TryAcquire() (success bool) {
    defer func() {
        // 一般是 redis 连接断了,会触发空指针
        if err := recover(); err != nil {
            //log.Errorw("TryAcquire err", common.ERR, err)
            //success = lim.degradeTryAcquire()
            //return
            success = true
        }

        // 没有错误,判断是否开启了 local 如果开启了,把它停掉
        //if lim.localLim != nil {
        //  // stop 线程安全
        //  lim.localLim.Stop()
        //}
    }()

    count, err := lim.redisClient.IncrBy(lim.limit.Key(), 1)
    //panic("模拟 redis 出错")
    if err != nil {
        log.Errorw("TryAcquire err", common.ERR, err)
        panic(err)
    }

    // *2 是为了保留久一点,便于观察
    err = lim.redisClient.Expire(lim.limit.Key(), int(2 * lim.limit.Period().Seconds()))
    if err != nil {
        log.Errorw("TryAcquire error", common.ERR, err)
        panic(err)
    }

    // 业务正确的情况下 确认超限
    if int32(count) > lim.limitCount {
        return false
    }

    return true

    //keys := []string{lim.limit.Key()}
    //
    //log.Errorw("TryAcquire ", keys, lim.limit.Count(), lim.limit.Period().Seconds())
    //count, err := lim.redisClient.Eval(lim.script, keys, lim.limit.Count(), lim.limit.Period().Seconds())
    //if err != nil {
    //  log.Errorw("TryAcquire error", common.ERR, err)
    //  return false
    //}
    //
    //
    //typeName := reflect.TypeOf(count).Name()
    //log.Errorw(typeName)
    //
    //if count != nil && count.(int32) <= lim.limitCount {
    //
    //  return true
    //}
    //return false
}

func (lim *redisCounterLimiter) Stop() {
    // 判断是否开启了 local 如果开启了,把它停掉
    if lim.localLim != nil {
        // stop 线程安全
        lim.localLim.Stop()
    }
}

func (lim *redisCounterLimiter) degradeTryAcquire() bool {
    lim.once.Do(func() {
        count := lim.limit.Count() / lim.limit.ClusterNum()
        limit := LocalLimit {
            name: lim.limit.Name(),
            key: lim.limit.Key(),
            count: count,
            period: lim.limit.Period(),
            limitType: lim.limit.LimitType(),
        }

        lim.localLim = NewLimiter(&limit)
    })

    return lim.localLim.TryAcquire()
}

代码里回退的部分注释了,因为线上为了稳定,实习生的代码毕竟,所以先不跑。

本来原有的思路是直接用 lua 脚本在 redis 上保证原子操作,但是底层封装的库对于直接调 eval 跑的时候,会抛错,而且 source 是 go-redis 里面,赶 ddl 没有时间去 debug,所以只能用 incrBy + expire 分开来。

5. RedisTokenBucketLimiter

令牌桶的状态变量得放在一个 线程安全/一致 的地方,redis 是不二人选。但是令牌桶的算法核心是个延迟计算得到令牌数量,这个是一个很长的临界区,所以要么用分布式锁,要么直接利用 redis 的单线程以原子方式跑。一般业界是后者,即 lua 脚本维护令牌桶的状态变量、计算令牌。代码类似这种

local tokens_key = KEYS[1]
local timestamp_key = KEYS[2]
--redis.log(redis.LOG_WARNING, "tokens_key " .. tokens_key)

local rate = tonumber(ARGV[1])
local capacity = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])
local intval = tonumber(ARGV[5])

local fill_time = capacity/rate
local ttl = math.floor(fill_time*2) * intval

local last_tokens = tonumber(redis.call("get", tokens_key))
if last_tokens == nil then
  last_tokens = capacity
end

local last_refreshed = tonumber(redis.call("get", timestamp_key))
if last_refreshed == nil then
  last_refreshed = 0
end

local delta = math.max(0, now-last_refreshed)
local filled_tokens = math.min(capacity, last_tokens+(delta*rate))
local allowed = filled_tokens >= requested
local new_tokens = filled_tokens
if allowed then
  new_tokens = filled_tokens - requested
end

redis.call("setex", tokens_key, ttl, new_tokens)
redis.call("setex", timestamp_key, ttl, now)

return { allowed, new_tokens }

到此这篇关于go实现一个分布式限流器的方法步骤的文章就介绍到这了,更多相关go 分布式限流器内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Go 基于令牌桶的限流器实现

    目录 简介 原理概述 具体实现原理 限流器如何限流 简介 如果一般流量过大,下游系统反应不过来,这个时候就需要限流了,其实和上地铁是一样的,就是减慢上游访问下游的速度. 限制访问服务的频次或者频率,防止服务过载,被刷爆等. Golang 官方扩展包 time(golang.org/x/time/rate) 中,提供了一个基于令牌桶等限流器实现. 原理概述 令牌:每次拿到令牌,才可访问 桶 ,桶的最大容量是固定的,以固定的频率向桶内增加令牌,直至加满 每个请求消耗一个令牌. 限流器初始化的时候,令

  • Golang实现请求限流的几种办法(小结)

    在开发高并发系统时,有三把利器用来保护系统:缓存.降级和限流.那么何为限流呢?顾名思义,限流就是限制流量,就像你宽带包了1个G的流量,用完了就没了. 简单的并发控制 利用 channel 的缓冲设定,我们就可以来实现并发的限制.我们只要在执行并发的同时,往一个带有缓冲的 channel 里写入点东西(随便写啥,内容不重要).让并发的 goroutine在执行完成后把这个 channel 里的东西给读走.这样整个并发的数量就讲控制在这个 channel的缓冲区大小上. 比如我们可以用一个 bool

  • Go实现各类限流的方法

    前 言 在开发高并发系统时,我们可能会遇到接口访问频次过高,为了保证系统的高可用和稳定性,这时候就需要做流量限制,你可能是用的 Nginx 这种来控制请求,也可能是用了一些流行的类库实现.限流是高并发系统的一大杀器,在设计限流算法之前我们先来了解一下它们是什么. 限 流 限流的目的是通过对并发访问请求进行限速,或者对一个时间窗口内的请求进行限速来保护系统,一旦达到限制速率则可以拒绝服务.排队或等待.降级等处理.通过对并发(或者一定时间窗口内)请求进行限速来保护系统,一旦达到限制速率则拒绝服务(定

  • golang高并发限流操作 ping / telnet

    需求 当需要同时ping/telnet多个ip时,可以通过引入ping包/telnet包实现,也可以通过go调用cmd命令实现,不过后者调用效率较差,所以这里选择ping包和telnet包 还有就是高并发的问题,可以通过shell脚本或者go实现高并发,所以我选择的用go自带的协程实现,但是如果要同时处理1000+个ip,考虑到机器的性能,需要ratelimit控制开辟的go协程数量,这里主要写一下我的建议和淌过的坑 ping 参考链接: https://github.com/sparrc/go

  • golang接口IP限流,IP黑名单,IP白名单的实例

    增加中间件 可以选择普通模式和LUA脚本模式,建议选择普通模式,实际上不需要控制的那么精确. package Middlewares import ( "github.com/gin-gonic/gin" "strconv" "time" "voteapi/pkg/app/response" "voteapi/pkg/gredis" "voteapi/pkg/util" ) const

  • go实现一个分布式限流器的方法步骤

    目录 1. 接口定义 2. LocalCounterLimiter 3. LocalTokenBucketLimiter 4. RedisCounterLimiter 5. RedisTokenBucketLimiter 项目中需要对 api 的接口进行限流,但是麻烦的是,api 可能有多个节点,传统的本地限流无法处理这个问题.限流的算法有很多,比如计数器法,漏斗法,令牌桶法,等等.各有利弊,相关博文网上很多,这里不再赘述. 项目的要求主要有以下几点: 支持本地/分布式限流,接口统一 支持多种限

  • 从零开始在NPM上发布一个Vue组件的方法步骤

    TL;DR 本文细致讲解了在NPM上发布一个Vue组件的全过程,包括创建项目.编写组件.打包和发布四个环节. 创建项目 这里我们直接利用@vue/cli来生成项目.如果没有安装@vue/cli的话,可以使用以下方法进行安装: # 如果喜欢npm npm i -g @vue/cli # 如果喜欢yarn yarn global add @vue/cli 此外,如果安装了npx(高版本的nodejs发行版会自带这一工具)的话,还可以很方便地通过npx vue这一方式实现免安装使用. 接下来就可以创建

  • Redis和Lua实现分布式限流器的方法详解

    主要是依靠 redis + lua 来实现限流器, 使用 lua 的原因是将多条命令合并在一起作为一个原子操作, 无需过多考虑并发. 计数器模式 原理 计数器算法是指在一段窗口时间内允许通过的固定数量的请求, 比如10次/秒, 500次/30秒. 如果设置的时间粒度越细, 那么限流会更平滑. 实现 所使用的 Lua 脚本 -- 计数器限流 -- 此处支持的最小单位时间是秒, 若将 expire 改成 pexpire 则可支持毫秒粒度. -- KEYS[1] string 限流的key -- AR

  • 如何给element添加一个抽屉组件的方法步骤

    近来因为业务需要,对比iview和element库,发现element确实要比实习期间使用的iview强大点,尤其文档更为友好,但是iview的组件功能更多一点,比如分割线和抽屉组件 今天特意手写一个抽屉组件,方便自己使用element库,写好的组件我已经放在我的githup了, 点这里 一.实践 1.分析 一个抽屉组件的z-index必定是在当前页面之上的,在抽屉主体之外的区域还会有一层半透明的遮罩层,知道这些就很容易了 // drawer.vue <template> <div cl

  • 阿里云快速搭建一个静态网站的方法步骤

    前言: 作为一个初级程序员,都梦想着自己能搭建一个自己的个人网站,同时展示给其他人浏览.如果你刚开始接触可看一下,我建议先给自己的静态网站发布到服务器上去. 准备: 1.申请注册一个服务器 申请注册一个云服务器,可以阿里云.腾讯云等等.学生党使用服务器有优惠哈~ 2.配置ftp\ssh环境 ps:我知道的是阿里云已经把ftp和ssh配置好了,如果有可以跳过此步骤. 具体步骤: 为了方便你后期的操作和使用,你需要配置ftp和ssh环境.(ftp:文件传输协议,通俗说就是上传下载文件:ssh:安全外

  • 基于Docker的Etcd分布式部署的方法步骤

    一 环境准备 1.1 基础环境 ntp配置:略 #建议配置ntp服务,保证时间一致性 etcd版本:v3.3.9 防火墙及SELinux:关闭防火墙和SELinux 名称 地址 主机名 备注 etcd1 172.24.8.71 etcd1.example.com 用于保存相关IP信息 docker01 172.24.8.72 docker01.example.com   docker02 172.24.8.73 docker02.example.com   # hostnamectl set-h

  • Docker创建一个Nginx服务器的方法步骤

    运行环境: MAC Docker 版本: Docker version 17.12.0-ce, build c97c6d6 一.启动Nginx 服务器 启动Nginx 服务器,并进入模拟终端 docker run -p 8080:80 --name nginx_web -it nginx /bin/bash 二.了解Nginx 镜像的配置文件位置 日志文件位置:/var/log/nginx 配置文件位置: /etc/nginx 资源存放的位置: /usr/share/nginx/html 上面的

  • 使用Django简单编写一个XSS平台的方法步骤

    1) 简要描述 原理十分简单2333,代码呆萌,大牛勿喷 >_< 2) 基础知识 XSS攻击基本原理和利用方法 Django框架的使用 3) Let's start 0x01 工欲善其事必先利其器,首先我们需要准备编写代码的各种工具和环境,这里不细说.我这里的环境和工具如下: python 3.7.0 pycharm windows 10 mysql 8.0.15 Django 2.1.3 需要用到的第三方库: django pymysql requests 0x02 我们先看一下XSS脚本是

  • 如何使用webpack打包一个库library的方法步骤

    日常我们开发了一个库之后,如何打包之后提供给别人使用呢?如果你不清楚,就继续看吧!!! 初始化库 mkdir library cd library npm init -y 经过以上步骤后会生成一个library文件夹,里面包含一个package.json文件.然后简单修改为如下所示: { "name": "library", "version": "1.0.0", "description": "

  • 基于vue如何发布一个npm包的方法步骤

    前言:工作的时候总是使用别人的npm包,然而我有时心底会好奇自己如何发布一个npm包呢,什么时候自己的包能够被很多人喜欢并使用呢...今天我终于迈出了第一步. 前提:会使用 npm,有 vue 基础,了解一点 webpack Are you ready? Go! 一.编写自己的npm包 1. 新建一个空文件夹 2. 进入文件夹,终端(cmd)运行 npm init 完成后会在目录下生成一个 package.json 文件 我们可以根据自己的需要补充文件内容 这是我的: { "name"

随机推荐