redisson分布式限流RRateLimiter源码解析

目录
  • 分布式限流-单位时间多实例多线程访问次数限制
    • 1.简单使用
    • 2. 实现限流redisson使用了哪些redis数据结构
    • 3. 超过10s,我再次获取一个令牌,数据结构发生的变化
    • 4. 源码浅析

分布式限流-单位时间多实例多线程访问次数限制

接前面聊一聊redisson及优雅实现 和 说一说spring boot优雅集成redisson,简单以源码的方式给大家介绍了redisson的:可重入性、阻塞、续约、红锁、联锁、加锁解锁流程和集成spring boot注意点和优雅实现方式。

接下来在讲一讲平时用的比较多的限流模块--RRateLimiter

1.简单使用

 public static void main(String[] args) throws InterruptedException {
        RRateLimiter rateLimiter = createLimiter();
        int allThreadNum = 20;
        CountDownLatch latch = new CountDownLatch(allThreadNum);
        long startTime = System.currentTimeMillis();
        for (int i = 0; i < allThreadNum; i++) {
//            new Thread(() -> {
            if(i % 3 == 0) Thread.sleep(1000);
            boolean pass = rateLimiter.tryAcquire();
            if(pass) {
                log.info("get ");
            } else {
                log.info("no");
            }
//          latch.countDown();
//            }).start();
        }
//        latch.await();
        System.out.println("Elapsed " + (System.currentTimeMillis() - startTime));
    }
    public static RRateLimiter createLimiter() {
        Config config = new Config();
        config.useSingleServer()
                .setTimeout(1000000)
                .setPassword("123456")
                .setAddress("redis://xxxx:6379");
        RedissonClient redisson = Redisson.create(config);
        RRateLimiter rateLimiter = redisson.getRateLimiter("myRateLimiter3");
        // 初始化:PER_CLIENT 单实例执行,OVERALL 全实例执行
        // 最大流速 = 每10秒钟产生3个令牌
        rateLimiter.trySetRate(RateType.OVERALL, 3, 10, RateIntervalUnit.SECONDS);
        return rateLimiter;
    }

实际结果:

[2022-10-29 14:32:46.261][INFO ][main][][] RedisTest - get 
[2022-10-29 14:32:46.312][INFO ][main][][] RedisTest - get 
[2022-10-29 14:32:46.358][INFO ][main][][] RedisTest - get 
[2022-10-29 14:32:47.416][INFO ][main][][] RedisTest - no
[2022-10-29 14:32:47.469][INFO ][main][][] RedisTest - no
[2022-10-29 14:32:47.517][INFO ][main][][] RedisTest - no
[2022-10-29 14:32:48.577][INFO ][main][][] RedisTest - no
[2022-10-29 14:32:48.623][INFO ][main][][] RedisTest - no

2. 实现限流redisson使用了哪些redis数据结构

  • Hash结构 -- 限流器结构:

参数rate代表速率

参数interval代表多少时间内产生的令牌

参数type代表单机还是集群

  • ZSET结构 -- 记录获取令牌的时间戳,用于时间对比。

1667025166312 --> 2022-10-29 14:32:46

1667025166262 --> 2022-10-29 14:32:46

1667025166215 --> 2022-10-29 14:32:46

  • String结构 --记录的是当前令牌桶中的令牌数【很明显被我用完了现在是0】

3. 超过10s,我再次获取一个令牌,数据结构发生的变化

  • ZSET结构。-- 新生成一个ZSET结构,存放获取令牌的时间戳

  • String 结构 --当前令牌桶还有2个令牌

4. 源码浅析

RRateLimiter rateLimiter = redisson.getRateLimiter("myRateLimiter3");
// 初始化
// 最大流速 = 每10秒钟产生3个令牌
rateLimiter.trySetRate(RateType.PER_CLIENT, 3, 10, RateIntervalUnit.SECONDS);

初始化定义没有什么好讲的,就是创建HASH结构

主要还是讲讲: rateLimiter.tryAcquire()

private <T> RFuture<T> tryAcquireAsync(RedisCommand<T> command, Long value) {
    return this.commandExecutor.evalWriteAsync(
        this.getName(), LongCodec.INSTANCE, command,
        "local rate = redis.call('hget', KEYS[1], 'rate');
        local interval = redis.call('hget', KEYS[1], 'interval');
        local type = redis.call('hget', KEYS[1], 'type');
        assert(
            rate ~= false and interval ~= false and type ~= false,  'RateLimiter is not initialized'
            )
            local valueName = KEYS[2];
            local permitsName = KEYS[4];
            if type == '1' then valueName = KEYS[3];
            permitsName = KEYS[5];
            end;
            local currentValue = redis.call('get', valueName);
            if currentValue ~= false
            then
            local expiredValues = redis.call(
                'zrangebyscore', permitsName, 0, tonumber(ARGV[2]
                )
                - interval
                );
                local released = 0;
                for i, v in ipairs(expiredValues)
                do local random, permits = struct.unpack('fI', v);
                released = released + permits;end;
                if released > 0
                then
                redis.call('zrem', permitsName, unpack(expiredValues));
                currentValue = tonumber(currentValue) + released;
                redis.call('set', valueName, currentValue);
                end;
                if tonumber(currentValue) < tonumber(ARGV[1])
                then
                local nearest = redis.call(
                    'zrangebyscore', permitsName, '(' .. (tonumber(ARGV[2])
                    - interval), tonumber(ARGV[2]), 'withscores', 'limit', 0, 1);
                    local random, permits = struct.unpack('fI', nearest[1]);
                    return tonumber(nearest[2])
                    - (tonumber(ARGV[2])
                    - interval);
                    else
                    redis.call('zadd', permitsName, ARGV[2], struct.pack('fI', ARGV[3], ARGV[1]));
                    redis.call('decrby', valueName, ARGV[1]);
                    return nil;
                    end;
                    else
                    assert(tonumber(rate) >= tonumber(ARGV[1]), 'Requested permits amount could not exceed defined rate');
                    redis.call('set', valueName, rate);
                    redis.call('zadd', permitsName, ARGV[2], struct.pack('fI', ARGV[3], ARGV[1]));
                    redis.call('decrby', valueName, ARGV[1]);
                    return nil;
                    end;", Arrays.asList(this.getName(),
                    this.getValueName(),
                    this.getClientValueName(),
                    this.getPermitsName(),
                    this.getClientPermitsName()),
                    new Object[]{value,
                    System.currentTimeMillis(),
                    ThreadLocalRandom.current().nextLong()
                    }
                );
}

主要就是这段lua代码,下面我详细过一下

作者目前用的3.16.3版本,刚好遇见redisson的bug,见3197,请大家用最新版本,以下为修复后解析。

-- 获取hash结构的速率
local rate = redis.call("hget", KEYS[1], "rate")
-- 获取hash结构的时间区间(ms)
local interval = redis.call("hget", KEYS[1], "interval")
-- 获取hash结构的时间类型
local type = redis.call("hget", KEYS[1], "type")
-- 判断是否初始化限流结构
assert(rate ~= false and interval ~= false and type ~= false, "RateLimiter is not initialized")
-- {name}:value string结构,这个key记录的是当前令牌桶中的令牌数
local valueName = KEYS[2]
-- {name}:permits zset结构,记录了请求的令牌数,score则为请求的时间戳
local permitsName = KEYS[4]
-- 单机限流才会用到,集群模式不用关注
if type == "1" then
    valueName = KEYS[3]
    permitsName = KEYS[5]
end
-- 生产速率rate必须比请求的令牌数大
assert(tonumber(rate) >= tonumber(ARGV[1]), "Requested permits amount could not exceed defined rate")
-- 初始化RateLimiter并不会初始化stirng结构,因此第一次获取这里currentValue是null
local currentValue = redis.call("get", valueName)
if currentValue ~= false then
    -- 第二次获取令牌执行
    -------------------------- 获取zset结构:统计之前的请求令牌数
    -- 范围是0 ~ (第二次请求时间戳 - 令牌生产的时间)
    local expiredValues = redis.call("zrangebyscore", permitsName, 0, tonumber(ARGV[2]) - interval)
    local released = 0
    -- lua迭代器,遍历expiredValues,如果有值,那么released等于之前所有请求的令牌数之和,表示应该释放多少令牌
    for i, v in ipairs(expiredValues) do
        -- 获取请求数permits
        local random, permits = struct.unpack("fI", v)
        released = released + permits
    end
    -- 之前的请求令牌数 > 0, 例如10s产生3个令牌,现在超过10s了,重置周期并计算剩余令牌数
    if released > 0 then
        -- 移除zset中所有元素【要求是同一个限流器permitsName,不然就移除不了,尴尬】
        redis.call("zrem", permitsName, unpack(expiredValues))
        currentValue = tonumber(currentValue) + released
        ------------------------- 更新string结构:=剩下令牌数+释放令牌数
        redis.call("set", valueName, currentValue)
    end
    -- 如果当前令牌数 < 请求的令牌数
    if tonumber(currentValue) < tonumber(ARGV[1]) then
        -- 从zset中找到距离当前时间最近的那个请求,也就是上一次放进去的请求信息
        local nearest = redis.call('zrangebyscore', permitsName, '(' .. (tonumber(ARGV[2]) - interval), tonumber(ARGV[2]), 'withscores', 'limit', 0, 1);
        local random, permits = struct.unpack("fI", nearest[1])
        -- 返回 上一次请求的时间戳 - (当前时间戳 - 令牌生成的时间间隔) 这个值表示还需要多久才能生产出足够的令牌
        return tonumber(nearest[2]) - (tonumber(ARGV[2]) - interval)
    else
        -- 如果当前令牌数 ≥ 请求的令牌数,表示令牌够多,更新zset
        ------------------------- 更新zset结构
        redis.call("zadd", permitsName, ARGV[2], struct.pack("fI", ARGV[3], ARGV[1]))
        ------------------------- 更新Stringt结构,减少一个剩下的令牌数
        redis.call("decrby", valueName, ARGV[1])
        return nil
    end
else
    --------汀雨笔记----------------- 初始化Stringt结构,当前限流器的令牌数
    redis.call("set", valueName, rate)
    --------汀雨笔记----------------- 初始化zset结构
    redis.call("zadd", permitsName, ARGV[2], struct.pack("fI", ARGV[3], ARGV[1]))
    -- struct.pack第一个参数表示格式字符串,f是浮点数、I是长整数。所以这个格式字符串表示的是把一个浮点数和长整数拼起来的结构体,
    -- ARGV[2]就是请求时间戳,ARGV[1]是请求的令牌数,统计会用到,ARGV[3]是当前时间戳为种子的随机数,具体用处还不知道,知道的网友可以留言
    ------------------------- 更新Stringt结构,因为这是获取令牌操作,减掉一个令牌
    -------------------------【本文作者认为,这里可以直接初始化string结构,值为rate - 1】
    redis.call("decrby", valueName, ARGV[1])
    return nil
end

这段lua代码也并不复杂,令牌桶的数量主要是通过时间窗口来控制,判断上一个请求是否超过了令牌生产周期。

留下一个疑问?

-- 移除zset中所有元素【要求是同一个限流器permitsName,不然就移除不了,尴尬】
redis.call("zrem", permitsName, unpack(expiredValues))

我自己在本地测试,只要超过10s,permitsName就不一样,这就导致了这部分数据是不能移除的,就产生了冗余数据,从前面的截图也可以看出,是新生成了一个zset数据结构。

相当于直接走到了这一步:

------------------------- 更新zset结构
redis.call("zadd", permitsName, ARGV[2], struct.pack("fI", ARGV[3], ARGV[1]))

至于为什么会产生这样的结果,会的小伙伴可以留言,或者过段时间我提个issue。

以上就是redisson分布式限流RRateLimiter源码解析的详细内容,更多关于redisson分布式限流RRateLimiter的资料请关注我们其它相关文章!

(0)

相关推荐

  • 基于Redis实现分布式单号及分布式ID(自定义规则生成)

    目录 背景 Redis实现方式 代码实例 单号生成枚举 单号生成工具类 单号生成接口 单号生成接口实现 使用测试 总结 背景 一些业务背景下,业务要求单号需要有区分不同的前缀,那么在分布式的架构下如何自定义单号而且还能保证唯一呢? 注:分布式ID也可以此方式 Redis实现方式 Redis的所有命令操作都是单线程的,本身提供像 incr 和 increby 这样的自增原子命令,所以能保证生成的 ID 肯定是唯一有序的. 优点:不依赖于数据库,灵活方便,且性能优于数据库:数字ID天然排序,对分页或

  • Redis分布式锁解决秒杀超卖问题

    目录 分布式锁应用场景 单体锁的分类 分布式锁核心逻辑 分布式锁实现的问题——死锁和解决 Redis解决删除别人锁的问题 分布式锁应用场景 秒杀环境下:订单服务从库存中心拿到库存数,如果库存总数大于0,则进行库存扣减,并创建订单订单服务负责创建订单库存服务负责扣减库存 模拟用户访问库存 多线程并发访问,出现超卖问题,线程不安全.没有保证原子性 单体锁的分类 单体应用锁指的是只能在 一个JVM 进程内有效的锁.我们把这种锁叫做单体应用锁 synchronized锁ReentrantLock锁一个

  • redis分布式ID解决方案示例详解

    目录 常用的分布式ID解决方案 UUID Snowflake Snowflake算法的Java代码: Leaf Leaf算法的Java代码: 基于数据库自增ID生成 基于UUID生成 基于Redis生成 基于ZooKeeper生成 常用的分布式ID解决方案 在分布式系统中,生成全局唯一ID是非常重要的,因为在分布式系统中,多个节点同时生成ID可能会导致ID冲突. 下面介绍几种常用的分布式ID解决方案. UUID UUID(通用唯一标识符)是由128位数字组成的标识符,它可以保证在全球范围内的唯一

  • Java实现redis分布式锁的三种方式

    目录 一.引入原因 二.分布式锁实现过程中的问题 问题一:异常导致锁没有释放 问题二:获取锁与设置过期时间操作不是原子性的 问题三:锁过期之后被别的线程重新获取与释放 问题四:锁的释放不是原子性的 问题五:其他的问题? 三.具体实现 1. RedisTemplate 2. RedisLockRegistry 3. 使用redisson实现分布式锁 一.引入原因 在分布式服务中,常常有如定时任务.库存更新这样的场景. 在定时任务中,如果不使用quartz这样的分布式定时工具,只是简单的使用定时器来

  • Spring Boot 集成Redisson实现分布式锁详细案例

    目录 前言 分布式锁实现 引入jar包 Redisson的配置 application.yml中引入redisson.yml配置 redisson.yml配置 封装Redisson工具类 模拟秒杀扣减库存 测试代码 总结 前言 Spring Boot集成Redis实现单机分布式锁针对单机分布式锁还是存在锁定续期.可重入的问题,本文将采用Spring Boot 集成Ression实现分布式锁进行详细讲解. 分布式锁实现 引入jar包 <dependency> <groupId>org

  • Redisson分布式限流的实现原理解析

    目录 正文 RRateLimiter使用 RRateLimiter的实现 RRateLimiter使用时注意事项 RRateLimiter是非公平限流器 Rate不要设置太大 限流的上限取决于Redis单实例的性能 分布式限流的本质 正文 我们目前在工作中遇到一个性能问题,我们有个定时任务需要处理大量的数据,为了提升吞吐量,所以部署了很多台机器,但这个任务在运行前需要从别的服务那拉取大量的数据,随着数据量的增大,如果同时多台机器并发拉取数据,会对下游服务产生非常大的压力.之前已经增加了单机限流,

  • Netty分布式NioEventLoop优化selector源码解析

    目录 优化selector selector的创建过程 代码剖析 这里一步创建了这个优化后的数据结构 最后返回优化后的selector 优化selector selector的创建过程 在剖析selector轮询之前, 我们先讲解一下selector的创建过程 回顾之前的小节, 在创建NioEventLoop中初始化了唯一绑定的selector: NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider

  • Netty分布式ByteBuf缓冲区分配器源码解析

    目录 缓冲区分配器 以其中的分配ByteBuf的方法为例, 对其做简单的介绍 跟到directBuffer()方法中 我们回到缓冲区分配的方法 然后通过validate方法进行参数验证 缓冲区分配器 顾明思议就是分配缓冲区的工具, 在netty中, 缓冲区分配器的顶级抽象是接口ByteBufAllocator, 里面定义了有关缓冲区分配的相关api 抽象类AbstractByteBufAllocator实现了ByteBufAllocator接口, 并且实现了其大部分功能 和AbstractByt

  • Netty分布式行解码器逻辑源码解析

    目录 行解码器LineBasedFrameDecoder 首先看其参数 我们跟到重载的decode方法中 我们看findEndOfLine(buffer)方法 这一小节了解下行解码器LineBasedFrameDecoder, 行解码器的功能是一个字节流, 以\r\n或者直接以\n结尾进行解码, 也就是以换行符为分隔进行解析 同样, 这个解码器也继承了ByteToMessageDecoder 行解码器LineBasedFrameDecoder 首先看其参数 //数据包的最大长度, 超过该长度会进

  • Redis分布式限流组件设计与使用实例

    目录 1.背景 2.Redis计数器限流设计 2.1Lua脚本 2.2自定义注解 2.3限流组件 2.4限流切面实现 3.测试一下 3.1方法限流示例 3.2动态入参限流示例 4.其它扩展 5.源码地址 本文主要讲解基于 自定义注解+Aop+反射+Redis+Lua表达式 实现的限流设计方案.实现的限流设计与实际使用. 1.背景 在互联网开发中经常遇到需要限流的场景一般分为两种 业务场景需要(比如:5分钟内发送验证码不超过xxx次); 对流量大的功能流量削峰; 一般我们衡量系统处理能力的指标是每

  • Nett分布式分隔符解码器逻辑源码剖析

    目录 分隔符解码器 我们看其中的一个构造方法 我们跟到重载decode方法中 我们看初始化该属性的构造方法 章节总结 前文传送门:Netty分布式行解码器逻辑源码解析 分隔符解码器 基于分隔符解码器DelimiterBasedFrameDecoder, 是按照指定分隔符进行解码的解码器, 通过分隔符, 可以将二进制流拆分成完整的数据包 同样继承了ByteToMessageDecoder并重写了decode方法 我们看其中的一个构造方法 public DelimiterBasedFrameDeco

  • Netty分布式客户端处理接入事件handle源码解析

    目录 处理接入事件创建handle 我们看其RecvByteBufAllocator接口 跟进newHandle()方法中 继续回到read()方法 我们跟进reset中 前文传送门 :客户端接入流程初始化源码分析 上一小节我们剖析完成了与channel绑定的ChannelConfig初始化相关的流程, 这一小节继续剖析客户端连接事件的处理 处理接入事件创建handle 回到上一章NioEventLoop的processSelectedKey ()方法 private void processS

  • Netty分布式ByteBuf的分类方式源码解析

    目录 ByteBuf根据不同的分类方式 会有不同的分类结果 1.Pooled和Unpooled 2.基于直接内存的ByteBuf和基于堆内存的ByteBuf 3.safe和unsafe 上一小节简单介绍了AbstractByteBuf这个抽象类, 这一小节对其子类的分类做一个简单的介绍 ByteBuf根据不同的分类方式 会有不同的分类结果 我们首先看第一种分类方式 1.Pooled和Unpooled pooled是从一块内存里去取一段连续内存封装成byteBuf 具体标志是类名以Pooled开头

  • Netty分布式ByteBuf使用的底层实现方式源码解析

    目录 概述 AbstractByteBuf属性和构造方法 首先看这个类的属性和构造方法 我们看几个最简单的方法 我们重点关注第二个校验方法ensureWritable(length) 我们跟到扩容的方法里面去 最后将写指针后移length个字节 概述 熟悉Nio的小伙伴应该对jdk底层byteBuffer不会陌生, 也就是字节缓冲区, 主要用于对网络底层io进行读写, 当channel中有数据时, 将channel中的数据读取到字节缓冲区, 当要往对方写数据的时候, 将字节缓冲区的数据写到cha

  • 使用nginx实现分布式限流的方法

    1.前言 一般对外暴露的系统,在促销或者黑客攻击时会涌来大量的请求,为了保护系统不被瞬间到来的高并发流量给打垮, 就需要限流 . 本文主要阐述如何用nginx 来实现限流. 听说 Hystrix 也可以, 各位有兴趣可以去研究哈 . 2.首先部署一个对外暴露接口的程序 我这里部署的是一个spring boot 项目 里面暴露了如下接口, 很简单 暴露了一个 get 请求返回 hello world 的restful 接口. 将此程序部署到 linux 服务器上. 部署步奏不再赘述, 自行百度 s

随机推荐