详解Java分布式IP限流和防止恶意IP攻击方案

前言

限流是分布式系统设计中经常提到的概念,在某些要求不严格的场景下,使用Guava RateLimiter就可以满足。但是Guava RateLimiter只能应用于单进程,多进程间协同控制便无能为力。本文介绍一种简单的处理方式,用于分布式环境下接口调用频次管控。

如何防止恶意IP攻击某些暴露的接口呢(比如某些场景下短信验证码服务)?本文介绍一种本地缓存和分布式缓存集成方式判断远程IP是否为恶意调用接口的IP。

分布式IP限流

思路是使用redis incr命令,完成一段时间内接口请求次数的统计,以此来完成限流相关逻辑。

private static final String LIMIT_LUA =
  "local my_limit = redis.call('incr', KEYS[1])\n" +
      " if tonumber(my_limit) == 1 then\n" +
      "  redis.call('expire', KEYS[1], ARGV[1])\n" +
      "  return 1\n" +
      " elseif tonumber(my_limit) > tonumber(ARGV[2]) then\n" +
      "  return 0\n" +
      " else\n" +
      "  return 1\n" +
      " end\n";

这里为啥时候用lua脚本来实现呢?因为要保证incr命令和expire命令的原子性操作。KEYS[1]代表自增key值, ARGV[1]代表过期时间,ARGV[2]代表最大频次,明白了这些参数的含义,整个lua脚本逻辑也就不言而喻了。

/**
 * @param limitKey 限制Key值
 * @param maxRate 最大速率
 * @param expire  Key过期时间
 */
public boolean access(String limitKey, int maxRate, int expire) {
  if (StringUtils.isBlank(limitKey)) {
    return true;
  }

  String cacheKey = LIMIT_KEY_PREFIX + limitKey;

  return REDIS_SUCCESS_STATUS.equals(
      this.cacheService.eval(
          LIMIT_LUA
          , Arrays.asList(cacheKey)
          , Arrays.asList(String.valueOf(expire), String.valueOf(maxRate))
      ).toString()
  );
}

public void unlimit(String limitKey) {
  if (StringUtils.isBlank(limitKey)) {
    return;
  }
  String cacheKey = LIMIT_KEY_PREFIX + limitKey;
  this.cacheService.decr(cacheKey);
}

access方法用来判断 limitKey 是否超过了最大访问频次。缓存服务对象(cacheService)的eval方法参数分别是lua脚本、key list、value list。

unlimit方法其实就是执行redis decr操作,在某些业务场景可以回退访问频次统计。

防止恶意IP攻击

由于某些对外暴露的接口很容易被恶意用户攻击,必须做好防范措施。最近我就遇到了这么一种情况,我们一个快应用产品,短信验证码服务被恶意调用了。通过后台的日志发现,IP固定,接口调用时间间隔固定,明显是被人利用了。虽然我们针对每个手机号每天发送短信验证码的次数限制在5次以内。但是短信验证码服务每天这样被重复调用,会打扰用户并产生投诉。针对这种现象,简单的做了一个方案,可以自动识别恶意攻击的IP并加入黑名单。

思路是这样的,针对某些业务场景,约定在一段时间内同一个IP访问最大频次,如果超过了这个最大频次,那么就认为是非法IP。识别了非法IP后,把IP同时放入本地缓存和分布式缓存中。非法IP再次访问的时候,拦截器发现本地缓存(没有则去分布式缓存)有记录这个IP,直接返回异常状态,不会继续执行正常业务逻辑。

Guava本地缓存集成Redis分布式缓存

public abstract class AbstractCombineCache<K, V> {
  private static Logger LOGGER = LoggerFactory.getLogger(AbstractCombineCache.class);

  protected Cache<K, V> localCache;

  protected ICacheService cacheService;

  public AbstractCombineCache(Cache<K, V> localCache, ICacheService cacheService) {
    this.localCache = localCache;
    this.cacheService = cacheService;
  }

  public Cache<K, V> getLocalCache() {
    return localCache;
  }

  public ICacheService getCacheService() {
    return cacheService;
  }

  public V get(K key) {
    //只有LoadingCache对象才有get方法,如果本地缓存不存在key值, 会执行CacheLoader的load方法,从分布式缓存中加载。
    if (localCache instanceof LoadingCache) {
      try {
        return ((LoadingCache<K, V>) localCache).get(key);
      } catch (ExecutionException e) {
        LOGGER.error(String.format("cache key=%s loading error...", key), e);
        return null;
      } catch (CacheLoader.InvalidCacheLoadException e) {
        //分布式缓存中不存在这个key
        LOGGER.error(String.format("cache key=%s loading fail...", key));
        return null;
      }
    } else {
      return localCache.getIfPresent(key);
    }
  }

  public void put(K key, V value, int expire) {
    this.localCache.put(key, value);
    String cacheKey = key instanceof String ? (String) key : key.toString();
    if (value instanceof String) {
      this.cacheService.setex(cacheKey, (String) value, expire);
    } else {
      this.cacheService.setexObject(cacheKey, value, expire);
    }
  }
}

AbstractCombineCache这个抽象类封装了guava本地缓存和redis分布式缓存操作,可以降低分布式缓存压力。

防止恶意IP攻击缓存服务

public class IPBlackCache extends AbstractCombineCache<String, Object> {
  private static Logger LOGGER = LoggerFactory.getLogger(IPBlackCache.class);

  private static final String IP_BLACK_KEY_PREFIX = "wmhipblack_";

  private static final String REDIS_SUCCESS_STATUS = "1";

  private static final String IP_RATE_LUA =
      "local ip_rate = redis.call('incr', KEYS[1])\n" +
          " if tonumber(ip_rate) == 1 then\n" +
          "  redis.call('expire', KEYS[1], ARGV[1])\n" +
          "  return 1\n" +
          " elseif tonumber(ip_rate) > tonumber(ARGV[2]) then\n" +
          "  return 0\n" +
          " else\n" +
          "  return 1\n" +
          " end\n";

  public IPBlackCache(Cache<String, Object> localCache, ICacheService cacheService) {
    super(localCache, cacheService);
  }

  /**
   * @param ipKey  IP
   * @param maxRate 最大速率
   * @param expire 过期时间
   */
  public boolean ipAccess(String ipKey, int maxRate, int expire) {
    if (StringUtils.isBlank(ipKey)) {
      return true;
    }

    String cacheKey = IP_BLACK_KEY_PREFIX + ipKey;

    return REDIS_SUCCESS_STATUS.equals(
        this.cacheService.eval(
            IP_RATE_LUA
            , Arrays.asList(cacheKey)
            , Arrays.asList(String.valueOf(expire), String.valueOf(maxRate))
        ).toString()
    );
  }

  /**
   * @param ipKey IP
   */
  public void removeIpAccess(String ipKey) {
    if (StringUtils.isBlank(ipKey)) {
      return;
    }
    String cacheKey = IP_BLACK_KEY_PREFIX + ipKey;
    try {
      this.cacheService.del(cacheKey);
    } catch (Exception e) {
      LOGGER.error(String.format("%s, ip access remove error...", ipKey), e);
    }
  }
}

没有错,IP_RATE_LUA 这个lua脚本和上面说的限流方案对应的lua脚本是一样的。

IPBlackCache继承了AbstractCombineCache,构造函数需要guava的本地Cache对象和redis分布式缓存服务ICacheService 对象。

ipAccess方法用来判断当前ip访问次数是否在一定时间内已经达到了最大访问频次。

removeIpAccess方法是直接移除当前ip访问频次统计的key值。

防止恶意IP攻击缓存配置类

@Configuration
public class IPBlackCacheConfig {
  private static final String IPBLACK_LOCAL_CACHE_NAME = "ip-black-cache";
  private static Logger LOGGER = LoggerFactory.getLogger(IPBlackCacheConfig.class);

  @Autowired
  private LimitConstants limitConstants;

  @Bean
  public IPBlackCache ipBlackCache(@Autowired ICacheService cacheService) {
    GuavaCacheBuilder cacheBuilder = new GuavaCacheBuilder<String, Object>(IPBLACK_LOCAL_CACHE_NAME);
    cacheBuilder.setCacheBuilder(
        CacheBuilder.newBuilder()
            .initialCapacity(100)
            .maximumSize(10000)
            .concurrencyLevel(10)
            .expireAfterWrite(limitConstants.getIpBlackExpire(), TimeUnit.SECONDS)
            .removalListener((RemovalListener<String, Object>) notification -> {
              String curTime = LocalDateTime.now().toString();
              LOGGER.info(notification.getKey() + " 本地缓存移除时间:" + curTime);
              try {
                cacheService.del(notification.getKey());
                LOGGER.info(notification.getKey() + " 分布式缓存移除时间:" + curTime);
              } catch (Exception e) {
                LOGGER.error(notification.getKey() + " 分布式缓存移除异常...", e);
              }
            })
    );
    cacheBuilder.setCacheLoader(new CacheLoader<String, Object>() {
      @Override
      public Object load(String key) {
        try {
          Object obj = cacheService.getString(key);
          LOGGER.info(String.format("从分布式缓存中加载key=%s, value=%s", key, obj));
          return obj;
        } catch (Exception e) {
          LOGGER.error(key + " 从分布式缓存加载异常...", e);
          return null;
        }
      }
    });

    Cache<String, Object> localCache = cacheBuilder.build();
    IPBlackCache ipBlackCache = new IPBlackCache(localCache, cacheService);
    return ipBlackCache;
  }
}

注入redis分布式缓存服务ICacheService对象。

通过GuavaCacheBuilder构建guava本地Cache对象,指定初始容量(initialCapacity)、最大容量(maximumSize)、并发级别、key过期时间、key移除监听器。最终要的是CacheLoader这个参数,是干什么用的呢?如果GuavaCacheBuilder指定了CacheLoader对象,那么最终创建的guava本地Cache对象是LoadingCache类型(参考AbstractCombineCache类的get方法),LoadingCache对象的get方法首先从内存中获取key对应的value,如果内存中不存在这个key则调用CacheLoader对象的load方法加载key对应的value值,加载成功后放入内存中。

最后通过ICacheService对象和guava本地Cache对象创建IPBlackCache(防止恶意IP攻击缓存服务)对象。

拦截器里恶意IP校验

定义一个注解,标注在指定方法上,拦截器里会识别这个注解。

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface IPBlackLimit {
  //统计时间内最大速率
  int maxRate();

  //频次统计时间
  int duration();

  //方法名称
  String method() default StringUtils.EMPTY;
}

拦截器里加入ipAccess方法,校验远程IP是否为恶意攻击的IP。

/**
* @param method 需要校验的方法
* @param remoteAddr 远程IP
*/
private boolean ipAccess(Method method, String remoteAddr) {
  if (StringUtils.isBlank(remoteAddr) || !AnnotatedElementUtils.isAnnotated(method, IPBlackLimit.class)) {
    return true;
  }
  IPBlackLimit ipBlackLimit = AnnotatedElementUtils.getMergedAnnotation(method, IPBlackLimit.class);
  try {
    String ip = remoteAddr.split(",")[0].trim();
    String cacheKey = "cipb_" + (StringUtils.isBlank(ipBlackLimit.method()) ? ip : String.format("%s_%s", ip, ipBlackLimit.method()));

    String beginAccessTime = (String) ipBlackCache.get(cacheKey);
    if (StringUtils.isNotBlank(beginAccessTime)) {
      LocalDateTime beginTime = LocalDateTime.parse(beginAccessTime, DateTimeFormatter.ISO_LOCAL_DATE_TIME), endTime = LocalDateTime.now();
      Duration duration = Duration.between(beginTime, endTime);
      if (duration.getSeconds() >= limitConstants.getIpBlackExpire()) {
        ipBlackCache.getLocalCache().invalidate(cacheKey);
        return true;
      } else {
        return false;
      }
    }

    boolean access = ipBlackCache.ipAccess(cacheKey, ipBlackLimit.maxRate(), ipBlackLimit.duration());
    if (!access) {
      ipBlackCache.removeIpAccess(cacheKey);
      String curTime = LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME);
      ipBlackCache.put(cacheKey, curTime, limitConstants.getIpBlackExpire());
    }
    return access;
  } catch (Exception e) {
    LOGGER.error(String.format("method=%sï¼remoteAddr=%s, ip access check error.", method.getName(), remoteAddr), e);
    return true;
  }
}

remoteAddr取的是X-Forwarded-For对应的值。利用 remoteAddr 构造 cacheKey 参数,通过IPBlackCache判断 cacheKey 是否存在。

如果是 cacheKey 存在的请求,判断黑名单IP限制是否已经到达有效期,如果已经超过有效期则清除本地缓存和分布式缓存的 cacheKey ,请求合法;如果没有超过有效期则请求非法。

否则是 cacheKey 不存在的请求,使用IPBlackCache对象的ipAccess方法统计一定时间内的访问频次,如果频次超过最大限制,表明是非法请求IP,需要往IPBlackCache对象写入“ cacheKey =当前时间”。

总结

本文的两种方案都使用redis incr命令,如果不是特殊业务场景,redis的key要指定过期时间,严格来讲需要保证incr和expire两个命令的原子性,所以使用lua脚本方式。如果没有那么严格,完全可以先setex(设置key,value,过期时间),然后再incr(注: incr不会更新key的有效期 )。本文的设计方案仅供参考,并不能应用于所有的业务场景。

到此这篇关于详解Java分布式IP限流和防止恶意IP攻击方案的文章就介绍到这了,更多相关Java 分布式IP限流和防止恶意IP内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Java RPC框架如何实现客户端限流配置

    关键资源 关键资源总是有限的,也就意味着处理能力也有限,所以当面对大量业务时,为了保障自己能够有序的提供服务最经济的做法就是限制同一时间处理的事务数.比如银行的工作人员,一个工作人员同时只能为一个客户服务,来多了根本处理不了,不光是一种浪费而且有可以造成混乱的局面导致工作人员无法工作. 网络请求漏斗 越上层的服务器处理的事务越轻,应付请求的能力也越强,也就意味着同一请求越上层处理时间越短.为了有效的保护下层服务器,就需要对发送给下层的请求量做限流,在下层服务器可接受的范围内.否则就可能会出现下层

  • Javaweb应用使用限流处理大量的并发请求详解

    在web应用中,同一时间有大量的客户端请求同时发送到服务器,例如抢购.秒杀等.这个时候如何避免将大量的请求同时发送到业务系统. 第一种方法:在容器中配置最大请求数,如果大于改请求数,则客户端阻塞.该方法有效的阻止了大量的请求同时访问业务系统,但对用户不友好. 第二种方法:使用过滤器,保证一定数量的请求能够正常访问系统,多余的请求先跳转到排队页面,由排队页面定时发起请求.过滤器实现如下: public class ServiceFilter implements Filter { private

  • 详解Java分布式IP限流和防止恶意IP攻击方案

    前言 限流是分布式系统设计中经常提到的概念,在某些要求不严格的场景下,使用Guava RateLimiter就可以满足.但是Guava RateLimiter只能应用于单进程,多进程间协同控制便无能为力.本文介绍一种简单的处理方式,用于分布式环境下接口调用频次管控. 如何防止恶意IP攻击某些暴露的接口呢(比如某些场景下短信验证码服务)?本文介绍一种本地缓存和分布式缓存集成方式判断远程IP是否为恶意调用接口的IP. 分布式IP限流 思路是使用redis incr命令,完成一段时间内接口请求次数的统

  • 详解Java分布式缓存系统中必须解决的四大问题

    目录 缓存穿透 缓存击穿 缓存雪崩 缓存一致性 分布式缓存系统是三高架构中不可或缺的部分,极大地提高了整个项目的并发量.响应速度,但它也带来了新的需要解决的问题,分别是: 缓存穿透.缓存击穿.缓存雪崩和缓存一致性问题. 缓存穿透 第一个比较大的问题就是缓存穿透.这个概念比较好理解,和命中率有关.如果命中率很低,那么压力就会集中在数据库持久层. 假如能找到相关数据,我们就可以把它缓存起来.但问题是,本次请求,在缓存和持久层都没有命中,这种情况就叫缓存的穿透. 举个例子,如上图,在一个登录系统中,有

  • 详解Java分布式Session共享解决方案

    分布式Session一致性? 说白了就是服务器集群Session共享的问题 Session的作用? Session 是客户端与服务器通讯会话跟踪技术,服务器与客户端保持整个通讯的会话基本信息. 客户端在第一次访问服务端的时候,服务端会响应一个sessionId并且将它存入到本地cookie中,在之后的访问会将cookie中的sessionId放入到请求头中去访问服务器,如果通过这个sessionid没有找到对应的数据那么服务器会创建一个新的sessionid并且响应给客户端. 分布式Sessio

  • 详解Golang实现请求限流的几种办法

    简单的并发控制 利用 channel 的缓冲设定,我们就可以来实现并发的限制.我们只要在执行并发的同时,往一个带有缓冲的 channel 里写入点东西(随便写啥,内容不重要).让并发的 goroutine在执行完成后把这个 channel 里的东西给读走.这样整个并发的数量就讲控制在这个 channel的缓冲区大小上. 比如我们可以用一个 bool 类型的带缓冲 channel 作为并发限制的计数器. chLimit := make(chan bool, 1) 然后在并发执行的地方,每创建一个新

  • 详解JAVA 字节流和字符流

    1.InputStream 和 Reader InputStream 和 Reader 是所有输入流的抽象基类,本身并不能创建实例来执行输入,但它们将成为所有输入流的模板,所以它们的方法是所有输入流都可使用的方法. 在 InputStream 里包含如下三个方法. int read():从输入流中读取单个字节,返回所读取的字节数据(字节数据可直接转换为int类型). int read(byte[] b):从输入流中最多读取 b.length 个字节的数据,并将其存储在字节数组 b 中,返回实际读

  • 详解Java分布式事务的 6 种解决方案

    介绍 在分布式系统.微服务架构大行其道的今天,服务间互相调用出现失败已经成为常态.如何处理异常,如何保证数据一致性,成为微服务设计过程中,绕不开的一个难题. 在不同的业务场景下,解决方案会有所差异,常见的方式有: 阻塞式重试: 2PC.3PC 传统事务: 使用队列,后台异步处理: TCC 补偿事务: 本地消息表(异步确保): MQ 事务. 本文侧重于其他几项,关于 2PC.3PC 传统事务,网上资料已经非常多了,这里不多做重复. 阻塞式重试 在微服务架构中,阻塞式重试是比较常见的一种方式.伪代码

  • 详解Spring Cloud Gateway 限流操作

    开发高并发系统时有三把利器用来保护系统:缓存.降级和限流. API网关作为所有请求的入口,请求量大,我们可以通过对并发访问的请求进行限速来保护系统的可用性. 常用的限流算法比如有令牌桶算法,漏桶算法,计数器算法等. 在Zuul中我们可以自己去实现限流的功能 (Zuul中如何限流在我的书 <Spring Cloud微服务-全栈技术与案例解析>  中有详细讲解) ,Spring Cloud Gateway的出现本身就是用来替代Zuul的. 要想替代那肯定得有强大的功能,除了性能上的优势之外,Spr

  • 详解5种Java中常见限流算法

    目录 01固定窗口 02滑动窗口 03漏桶算法 04令牌桶 05滑动日志 06分布式限流 07总结 1.瞬时流量过高,服务被压垮? 2.恶意用户高频光顾,导致服务器宕机? 3.消息消费过快,导致数据库压力过大,性能下降甚至崩溃? ...... 在高并发系统中,出于系统保护角度考虑,通常会对流量进行限流:不但在工作中要频繁使用,而且也是面试中的高频考点. 今天我们将图文并茂地对常见的限流算法分别进行介绍,通过各个算法的特点,给出限流算法选型的一些建议,并给出Java语言实现的代码示例. 01固定窗

  • 详解Java Web如何限制访问的IP的两种方法

    前一阵子因为在做项目时碰到了这个功能,现在好好总结一下,至于为什么要限制IP访问,我就不多说了.然后百度了一下,现在主要有两种方式去限制IP访问,第一种是最简单的方便的,第二种是通过过滤器来限制访问.下面我简单介绍一下第一种方式,着重介绍第二种. 第一种方式(Tomcat配置项配置允许或限制IP访问) 这种是最简单的快捷的,主要就涉及Tomcat的server.xml配置. 第一步:找到server.xml文件在哪,在Tomcat的目录下的conf文件夹下. 第二步:打开server.xml文件

  • 详解JAVA Stream流

    摘要 Stream 是对集合对象功能的增强,它专注于对集合对象进行各种非常便利.高效的聚合操作,或者大批量数据操作.通常我们需要多行代码才能完成的操作,借助于Stream流式处理可以很简单的实现. Stream 不是集合元素,它不是数据结构并不保存数据,它是有关算法和计算的. 创建Steam流 调用Collection.stream()函数创建一个Stream对象 Stream 接口的静态方法 of 可以获取数组对应的流 List<String> list = new ArrayList<

随机推荐