Redis分布式限流组件设计与使用实例

目录
  • 1.背景
  • 2.Redis计数器限流设计
    • 2.1Lua脚本
    • 2.2自定义注解
    • 2.3限流组件
    • 2.4限流切面实现
  • 3.测试一下
    • 3.1方法限流示例
    • 3.2动态入参限流示例
  • 4.其它扩展
  • 5.源码地址

本文主要讲解基于 自定义注解+Aop+反射+Redis+Lua表达式 实现的限流设计方案。实现的限流设计与实际使用。

1.背景

在互联网开发中经常遇到需要限流的场景一般分为两种

  • 业务场景需要(比如:5分钟内发送验证码不超过xxx次);
  • 对流量大的功能流量削峰;

一般我们衡量系统处理能力的指标是每秒的QPS或者TPS,假设系统每秒的流量阈值是2000,
理论上第2001个请求进来时,那么这个请求就需要被限流。

本文演示项目使用的是 SpringBoot 项目,项目构建以及其他配置,这里不做演示。文末附限流Demo源码

2.Redis计数器限流设计

本文演示项目使用的是 SpringBoot 项目,这里仅挑选了重点实现代码展示,
项目构建以及其他配置,这里不做演示,详细配置请参考源码demo工程。

2.1Lua脚本

Lua 是一种轻量小巧的脚本语言可以理解为就是一组命令。
使用Redis的计数器达到限流的效果,表面上Redis自带命令多个组合也可以支持了,那为什么还要用Lua呢?
因为要保证原子性,这也是使用redis+Lua表达式原因,一组命令要么全成功,要么全失败。
相比Redis事务,Lua脚本的优点:

  • 减少网络开销:多个请求通过脚本一次发送,减少网络延迟
  • 原子操作:将脚本作为一个整体执行,中间不会插入其他命令,无需使用事务
  • 复用:客户端发送的脚本永久存在redis中,其他客户端可以复用脚本
  • 可嵌入性:可嵌入JAVA,C#等多种编程语言,支持不同操作系统跨平台交互

实现限流Lua脚本示例

# 定义计数变量
local count
# 获取调用脚本时传入的第一个key值(用作限流的 key)
count = redis.call('get',KEYS[1])
# 限流最大值比较,若超过最大值,则直接返回
if count and tonumber(count) > tonumber(ARGV[1]) then
return count;
end
# incr 命令 执行计算器累加
count = redis.call('incr',KEYS[1])
# 从第一次调用开始限流,并设置失效时间
if tonumber(count) == 1 then
redis.call('expire',KEYS[1],ARGV[2])
end
return count;

参数说明

  • KEYS[1] - redis的Key
  • ARGV[1] - 限流次数
  • ARGV[2] - 失效时间

2.2自定义注解

支持范围:任意接口

/**
 * 描述: 限流注解
 *
 * @author 程序员小强
 **/
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface RateLimit {

    /**
     * 限流唯一标示 key
     * 若同时使用 keyFiled 则当前 key作为前缀
     */
    String key();

    /**
     * 限流时间-单位:秒数
     * 默认 60s
     */
    int time() default 60;

    /**
     * 限流次数
     * 失效时间段内最大放行次数
     */
    int count();

    /**
     * 可作为限流key-参数类中属性名,动态值
     * 示例:phone、userId 等
     */
    String keyField() default "";

    /**
     * 超过最大访问次数后的,提示内容
     */
    String msg() default "over the max request times please try again";

}

属性介绍

  • key - 必填,限流key唯一标识,redis存储key
  • time -过期时间,单位 秒,默认60s
  • count - 必填,失效时间段内最大放行次数
  • keyField - 动态限流key,比如参数是一个自定义的类,里面有属性userId 等。可以使用keyField=“userId”,

这样生成的key为参数中userId的值。一般与key属性组合使用。不支持java基本类型参数,
仅支持参数是一个对象的接口。

msg - 超过限流的提示内容

示例:

@RateLimit(key = "limit-phone-key", time = 300, count = 10, keyField = "phone", msg = "5分钟内,验证码最多发送10次")

含义 - 5分钟内根据手机号限流10次
RedisKey- limit-phone-key:后面拼接的是参数中phone的值。

2.3限流组件

这里用的是jedis客户端,配置就不列在这里的,详见源码,文末附源码地址

/**
 * Redis限流组件
 *
 * @author 程序员小强
 */
@Component
public class RedisRateLimitComponent {
    private static final Logger logger = LoggerFactory.getLogger(RedisRateLimitComponent.class);

    private JedisPool jedisPool;

    @Autowired
    public RedisRateLimitComponent(JedisPool jedisPool) {
        this.jedisPool = jedisPool;
    }

    /**
     * 限流方法
     * 1.执行 lua 表达式
     * 2.通过 lua 表达式实现-限流计数器
     *
     * @param redisKey
     * @param time           超时时间-秒数
     * @param rateLimitCount 限流次数
     */
    public Long rateLimit(String redisKey, Integer time, Integer rateLimitCount) {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            Object obj = jedis.evalsha(jedis.scriptLoad(this.buildLuaScript()), Collections.singletonList(redisKey),
                    Arrays.asList(String.valueOf(rateLimitCount), String.valueOf(time)));
            return Long.valueOf(obj.toString());
        } catch (JedisException ex) {
            logger.error("[ executeLua ] >> messages:{}", ex.getMessage(), ex);
            throw new RateLimitException("[ RedisRateLimitComponent ] >> jedis run lua script exception" + ex.getMessage());
        } finally {
            if (jedis != null) {
                if (jedis.isConnected()) {
                    jedis.close();
                }
            }
        }
    }

    /**
     * 构建lua 表达式
     * KEYS[1] -- 参数key
     * ARGV[1]-- 失效时间段内最大放行次数
     * ARGV[2]-- 失效时间|秒
     */
    private String buildLuaScript() {
        StringBuilder luaBuilder = new StringBuilder();
        //定义变量
        luaBuilder.append("local count");
        //获取调用脚本时传入的第一个key值(用作限流的 key)
        luaBuilder.append("\ncount = redis.call('get',KEYS[1])");
        // 获取调用脚本时传入的第一个参数值(限流大小)-- 调用不超过最大值,则直接返回
        luaBuilder.append("\nif count and tonumber(count) > tonumber(ARGV[1]) then");
        luaBuilder.append("\nreturn count;");
        luaBuilder.append("\nend");
        //执行计算器自增
        luaBuilder.append("\ncount = redis.call('incr',KEYS[1])");
        //从第一次调用开始限流
        luaBuilder.append("\nif tonumber(count) == 1 then");
        //设置过期时间
        luaBuilder.append("\nredis.call('expire',KEYS[1],ARGV[2])");
        luaBuilder.append("\nend");
        luaBuilder.append("\nreturn count;");
        return luaBuilder.toString();
    }
}

2.4限流切面实现

/**
 * 描述:限流切面实现
 *
 * @author 程序员小强
 **/
@Aspect
@Configuration
public class RateLimitAspect {
    private static final Logger logger = LoggerFactory.getLogger(RateLimitAspect.class);

    private RedisRateLimitComponent redisRateLimitComponent;

    @Autowired
    public RateLimitAspect(RedisRateLimitComponent redisRateLimitComponent) {
        this.redisRateLimitComponent = redisRateLimitComponent;
    }

    /**
     * 匹配所有使用以下注解的方法
     *
     * @see RateLimit
     */
    @Pointcut("@annotation(com.example.ratelimit.annotation.RateLimit)")
    public void pointCut() {
    }

    @Around("pointCut()&&@annotation(rateLimit)")
    public Object logAround(ProceedingJoinPoint joinPoint, RateLimit rateLimit) throws Throwable {
        MethodSignature signature = (MethodSignature) joinPoint.getSignature();
        String methodName = signature.getMethod().getName();

        //组装限流key
        String rateLimitKey = this.getRateLimitKey(joinPoint, rateLimit);

        //限流组件-通过计数方式限流
        Long count = redisRateLimitComponent.rateLimit(rateLimitKey, rateLimit.time(), rateLimit.count());
        logger.debug("[ RateLimit ] method={},rateLimitKey={},count={}", methodName, rateLimitKey, count);

        if (null != count && count.intValue() <= rateLimit.count()) {
            //未超过限流次数-执行业务方法
            return joinPoint.proceed();
        } else {
            //超过限流次数
            logger.info("[ RateLimit ] >> over the max request times method={},rateLimitKey={},currentCount={},rateLimitCount={}",
                    methodName, rateLimitKey, count, rateLimit.count());
            throw new RateLimitException(rateLimit.msg());
        }
    }

    /**
     * 获取限流key
     * 默认取 RateLimit > key 属性值
     * 若设置了 keyField 则从参数中获取该字段的值拼接到key中
     * 示例:user_phone_login_max_times:13235777777
     *
     * @param joinPoint
     * @param rateLimit
     */
    private String getRateLimitKey(ProceedingJoinPoint joinPoint, RateLimit rateLimit) {
        String fieldName = rateLimit.keyField();
        if ("".equals(fieldName)) {
            return rateLimit.key();
        }

        //处理自定义-参数名-动态属性key
        StringBuilder rateLimitKeyBuilder = new StringBuilder(rateLimit.key());
        for (Object obj : joinPoint.getArgs()) {
            if (null == obj) {
                continue;
            }
            //过滤基本类型参数
            if (ReflectionUtil.isBaseType(obj.getClass())) {
                continue;
            }
            //属性值
            Object fieldValue = ReflectionUtil.getFieldByClazz(fieldName, obj);
            if (null != fieldValue) {
                rateLimitKeyBuilder.append(":").append(fieldValue.toString());
                break;
            }
        }
        return rateLimitKeyBuilder.toString();
    }
}

由于演示项目中做了统一异常处理
在限流切面这里未做异常捕获,若超过最大限流次数会抛出自定义限流异常。可以根据业务自行处理。

/**
 * 反射工具
 *
 * @author 程序员小强
 */
public class ReflectionUtil {

    private static final Logger logger = LoggerFactory.getLogger(ReflectionUtil.class);

    /**
     * 根据属性名获取属性元素,
     * 包括各种安全范围和所有父类
     *
     * @param fieldName
     * @param object
     * @return
     */
    public static Object getFieldByClazz(String fieldName, Object object) {
        Field field = null;
        Class<?> clazz = object.getClass();
        try {
            for (; clazz != Object.class; clazz = clazz.getSuperclass()) {
                try {
                    //子类中查询不到属性-继续向父类查
                    field = clazz.getDeclaredField(fieldName);
                } catch (NoSuchFieldException ignored) {
                }
            }
            if (null == field) {
                return null;
            }
            field.setAccessible(true);
            return field.get(object);
        } catch (Exception e) {
            //通过反射获取 属性值失败
            logger.error("[ ReflectionUtil ] >> [getFieldByClazz] fieldName:{} ", fieldName, e);
        }
        return null;
    }

    /**
     * 判断对象属性是否是基本数据类型,包括是否包括string | BigDecimal
     *
     * @param clazz
     * @return
     */
    public static boolean isBaseType(Class clazz) {
        if (null == clazz) {
            return false;
        }
        //基本类型
        if (clazz.isPrimitive()) {
            return true;
        }
        //String
        if (clazz.equals(String.class)) {
            return true;
        }
        //Integer
        if (clazz.equals(Integer.class)) {
            return true;
        }
        //Boolean
        if (clazz.equals(Boolean.class)) {
            return true;
        }
        //BigDecimal
        if (clazz.equals(BigDecimal.class)) {
            return true;
        }
        //Byte
        if (clazz.equals(Byte.class)) {
            return true;
        }
        //Long
        if (clazz.equals(Long.class)) {
            return true;
        }
        //Double
        if (clazz.equals(Double.class)) {
            return true;
        }
        //Float
        if (clazz.equals(Float.class)) {
            return true;
        }
        //Character
        if (clazz.equals(Character.class)) {
            return true;
        }
        //Short
        return clazz.equals(Short.class);
    }
}

3.测试一下

基本属性已经配置好了,写个接口测试一下。

3.1方法限流示例

  /**
   * 计数器
   * 演示 demo 为了方便计数
   */
  private static final AtomicInteger COUNTER = new AtomicInteger();    

  /**
   * 普通限流
   * <p>
   * 30 秒中,可以访问10次
   */
  @RequestMapping("/limitTest")
  @RateLimit(key = "limit-test-key", time = 30, count = 10)
  public Response limitTest() {
      Map<String, Object> dataMap = new HashMap<>();
      dataMap.put("date", DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss.SSS"));
      dataMap.put("times", COUNTER.incrementAndGet());
      return Response.success(dataMap);
  }

3.2动态入参限流示例

3.2.1场景一:5分钟内,方法最多访问10次,根据入参手机号限流

入参类

public class UserPhoneCaptchaRateParam implements Serializable {

    private static final long serialVersionUID = -1L;

    private String phone;
    //省略 get/set
}
  private static final Map<String, AtomicInteger> COUNT_PHONE_MAP = new HashMap<>();

  /**
   * 根据手机号限流-限制验证码发送次数
   * <p>
   * 示例:5分钟内,验证码最多发送10次
   */
  @RequestMapping("/limitByPhone")
  @RateLimit(key = "limit-phone-key", time = 300, count = 10, keyField = "phone", msg = "5分钟内,验证码最多发送10次")
  public Response limitByPhone(UserPhoneCaptchaRateParam param) {
      Map<String, Object> dataMap = new HashMap<>();
      dataMap.put("date", DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss.SSS"));
      if (COUNT_PHONE_MAP.containsKey(param.getPhone())) {
          COUNT_PHONE_MAP.get(param.getPhone()).incrementAndGet();
      } else {
          COUNT_PHONE_MAP.put(param.getPhone(), new AtomicInteger(1));
      }
      dataMap.put("times", COUNT_PHONE_MAP.get(param.getPhone()).intValue());
      dataMap.put("reqParam", param);
      return Response.success(dataMap);
  }

3.2.2场景二:根据订单ID限流

入参类

@Data
public class OrderRateParam implements Serializable {

    private static final long serialVersionUID = -1L;

    private String orderId;
    //省略 get\set
}
  private static final Map<String, AtomicInteger> COUNT_ORDER_MAP = new HashMap<>();

  /**
   * 根据订单ID限流示例
   * <p>
   * 300 秒中,可以访问10次
   */
  @RequestMapping("/limitByOrderId")
  @RateLimit(key = "limit-order-key", time = 300, count = 10, keyField = "orderId", msg = "订单飞走了,请稍后再试!")
  public Response limitByOrderId(OrderRateParam param) {
      Map<String, Object> dataMap = new HashMap<>();
      dataMap.put("date", DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss.SSS"));
      if (COUNT_ORDER_MAP.containsKey(param.getOrderId())) {
          COUNT_ORDER_MAP.get(param.getOrderId()).incrementAndGet();
      } else {
          COUNT_ORDER_MAP.put(param.getOrderId(), new AtomicInteger(1));
      }
      dataMap.put("times", COUNT_ORDER_MAP.get(param.getOrderId()).intValue());
      dataMap.put("reqParam", param);
      return Response.success(dataMap);
  }

4.其它扩展

根据ip限流

在key中拼接IP即可;

5.源码地址

传送门

到此这篇关于Redis分布式限流组件设计与使用实例的文章就介绍到这了,更多相关Redis分布式限流内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • 基于Redis+Lua脚本实现分布式限流组件封装的方法

    创建限流组件项目 pom.xml文件中引入相关依赖 <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.springf

  • Redis和Lua实现分布式限流器的方法详解

    主要是依靠 redis + lua 来实现限流器, 使用 lua 的原因是将多条命令合并在一起作为一个原子操作, 无需过多考虑并发. 计数器模式 原理 计数器算法是指在一段窗口时间内允许通过的固定数量的请求, 比如10次/秒, 500次/30秒. 如果设置的时间粒度越细, 那么限流会更平滑. 实现 所使用的 Lua 脚本 -- 计数器限流 -- 此处支持的最小单位时间是秒, 若将 expire 改成 pexpire 则可支持毫秒粒度. -- KEYS[1] string 限流的key -- AR

  • springboot+redis 实现分布式限流令牌桶的示例代码

    1.前言 网上找了很多redis分布式限流方案,要不就是太大,需要引入第三方jar,而且还无法正常运行,要不就是定时任务定时往key中放入数据,使用的时候调用,严重影响性能,所以着手自定义实现redis令牌桶. 只用到了spring-boot-starter-data-redis包,并且就几行代码. 2.环境准备 a.idea新建springboot项目,引入spring-data-redis包 b.编写令牌桶实现方法RedisLimitExcutor c.测试功能,创建全局拦截器,测试功能 3

  • 基于Redis实现分布式应用限流的方法

    限流的目的是通过对并发访问/请求进行限速或者一个时间窗口内的的请求进行限速来保护系统,一旦达到限制速率则可以拒绝服务. 前几天在DD的公众号,看了一篇关于使用 瓜娃 实现单应用限流的方案 -->原文,参考<redis in action> 实现了一个jedis版本的,都属于业务层次限制. 实际场景中常用的限流策略: Nginx接入层限流 按照一定的规则如帐号.IP.系统调用逻辑等在Nginx层面做限流 业务应用系统限流 通过业务代码控制流量这个流量可以被称为信号量,可以理解成是一种锁,它

  • Redis分布式限流组件设计与使用实例

    目录 1.背景 2.Redis计数器限流设计 2.1Lua脚本 2.2自定义注解 2.3限流组件 2.4限流切面实现 3.测试一下 3.1方法限流示例 3.2动态入参限流示例 4.其它扩展 5.源码地址 本文主要讲解基于 自定义注解+Aop+反射+Redis+Lua表达式 实现的限流设计方案.实现的限流设计与实际使用. 1.背景 在互联网开发中经常遇到需要限流的场景一般分为两种 业务场景需要(比如:5分钟内发送验证码不超过xxx次); 对流量大的功能流量削峰; 一般我们衡量系统处理能力的指标是每

  • SpringBoot+Redis+Lua分布式限流的实现

    Redis支持LUA脚本的主要优势 LUA脚本的融合将使Redis数据库产生更多的使用场景,迸发更多新的优势: 高效性:减少网络开销及时延,多次redis服务器网络请求的操作,使用LUA脚本可以用一个请求完成 数据可靠性:Redis会将整个脚本作为一个整体执行,中间不会被其他命令插入. 复用性:LUA脚本执行后会永久存储在Redis服务器端,其他客户端可以直接复用 可嵌入性:可嵌入JAVA,C#等多种编程语言,支持不同操作系统跨平台交互 简单强大:小巧轻便,资源占用率低,支持过程化和对象化的编程

  • kubernetes实现分布式限流

    目录 一.概念 1.1 使用场景 1.2 维度 1.3 分布式限流 二.分布式限流常用方案 三.基于kubernetes的分布式限流 3.1 kubernetes中的副本数 3.2 rateLimiter的创建 3.3 rateLimiter的获取 3.4 filter里的判断 四.性能压测 无限流 使用redis限流 自研限流 五.其他问题 5.1 对于保证qps限频准确的时候,应该怎么解决呢? 5.2 服务从1个节点动态扩为4个节点,这个时候新节点识别为4,但其实有些并没有启动完,会不会造成

  • Redisson分布式限流的实现原理解析

    目录 正文 RRateLimiter使用 RRateLimiter的实现 RRateLimiter使用时注意事项 RRateLimiter是非公平限流器 Rate不要设置太大 限流的上限取决于Redis单实例的性能 分布式限流的本质 正文 我们目前在工作中遇到一个性能问题,我们有个定时任务需要处理大量的数据,为了提升吞吐量,所以部署了很多台机器,但这个任务在运行前需要从别的服务那拉取大量的数据,随着数据量的增大,如果同时多台机器并发拉取数据,会对下游服务产生非常大的压力.之前已经增加了单机限流,

  • 详解springboot+aop+Lua分布式限流的最佳实践

    一.什么是限流?为什么要限流? 不知道大家有没有做过帝都的地铁,就是进地铁站都要排队的那种,为什么要这样摆长龙转圈圈?答案就是为了限流!因为一趟地铁的运力是有限的,一下挤进去太多人会造成站台的拥挤.列车的超载,存在一定的安全隐患.同理,我们的程序也是一样,它处理请求的能力也是有限的,一旦请求多到超出它的处理极限就会崩溃.为了不出现最坏的崩溃情况,只能耽误一下大家进站的时间. 限流是保证系统高可用的重要手段!!! 由于互联网公司的流量巨大,系统上线会做一个流量峰值的评估,尤其是像各种秒杀促销活动,

  • 详解Redis实现限流的三种方式

    面对越来越多的高并发场景,限流显示的尤为重要. 当然,限流有许多种实现的方式,Redis具有很强大的功能,我用Redis实践了三种的实现方式,可以较为简单的实现其方式.Redis不仅仅是可以做限流,还可以做数据统计,附近的人等功能,这些可能会后续写到. 第一种:基于Redis的setnx的操作 我们在使用Redis的分布式锁的时候,大家都知道是依靠了setnx的指令,在CAS(Compare and swap)的操作的时候,同时给指定的key设置了过期实践(expire),我们在限流的主要目的就

  • redis lua限流算法实现示例

    目录 限流算法 计数器算法 场景分析 算法实现 漏铜算法 令牌桶算法: 算法实现 限流算法 常见的限流算法 计数器算法 漏桶算法 令牌桶算法 计数器算法   顾名思义,计数器算法是指在一定的时间窗口内允许的固定数量的请求.比如,2s内允许10个请求,30s内允许100个请求等等.如果设置的时间粒度越细,那么相对而言限流就会越平滑,控制的粒度就会更细. 场景分析 试想,如果设置的粒度比较粗会出现什么样的问题呢?如下图设置一个 1000/3s 的限流计数统计. 图中的限流策略为3s内允许的最大请求量

  • 使用nginx实现分布式限流的方法

    1.前言 一般对外暴露的系统,在促销或者黑客攻击时会涌来大量的请求,为了保护系统不被瞬间到来的高并发流量给打垮, 就需要限流 . 本文主要阐述如何用nginx 来实现限流. 听说 Hystrix 也可以, 各位有兴趣可以去研究哈 . 2.首先部署一个对外暴露接口的程序 我这里部署的是一个spring boot 项目 里面暴露了如下接口, 很简单 暴露了一个 get 请求返回 hello world 的restful 接口. 将此程序部署到 linux 服务器上. 部署步奏不再赘述, 自行百度 s

随机推荐