利用Redis实现延时处理的方法实例

背景

在开发中,往往会遇到一些关于延时任务的需求。例如

•生成订单30分钟未支付,则自动取消

•生成订单60秒后,给用户发短信

对上述的任务,我们给一个专业的名字来形容,那就是延时任务。

最近需要做一个延时处理的功能,主要是从kafka中消费消息后根据消息中的某个延时字段来进行延时处理,在实际的实现过程中有一些需要注意的地方,记录如下。

实现过程

说到java中的定时功能,首先想到的Timer和ScheduledThreadPoolExecutor,但是相比之下Timer可以排除,主要原因有以下几点:

  • Timer使用的是绝对时间,系统时间的改变会对Timer产生一定的影响;而ScheduledThreadPoolExecutor使用的是相对时间,所以不会有这个问题。
  • Timer使用单线程来处理任务,长时间运行的任务会导致其他任务的延时处理,而ScheduledThreadPoolExecutor可以自定义线程数量。
  • Timer没有对运行时异常进行处理,一旦某个任务触发运行时异常,会导致整个Timer崩溃,而ScheduledThreadPoolExecutor对运行时异常做了捕获(可以在 afterExecute() 回调方法中进行处理),所以更加安全。

1、ScheduledThreadPoolExecutor决定了用ScheduledThreadPoolExecutor来进行实现,接下来就是代码编写啦(大体流程代码)。

主要的延时实现如下:

ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(10, new NamedThreadFactory("scheduleThreadPool"), new
ThreadPoolExecutor.AbortPolicy());
//从消息中取出延迟时间及相关信息的代码略
int delayTime = 0;
executorService.scheduleWithFixedDelay(new Runnable() {
  @Override
  public void run() {
   //具体操作逻辑
  }},0,delayTime, TimeUnit.SECONDS);

其中NamedThreadFactory是我自定义的一个线程工厂,主要给线程池定义名称及相关日志打印便于后续的问题分析,这里就不多做介绍了。拒绝策略也是采用默认的拒绝策略。

然后测试了一下,满足目标需求的功能,可以做到延迟指定时间后执行,至此似乎功能就被完成了。

大家可能疑问,这也太简单了有什么好说的,但是这种方式实现简单是简单但是存在一个潜在的问题,问题在哪呢,让我们看一下ScheduledThreadPoolExecutor的源码:

public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory) {
 super(corePoolSize, Integer.MAX_VALUE, 0,
 TimeUnit.NANOSECONDS,new DelayedWorkQueue(), threadFactory);}

ScheduledThreadPoolExecutor由于它自身的延时和周期的特性,默认使用了DelayWorkQueue,而并不像我们平时使用的SingleThreadExecutor等构造是可以使用自己定义的LinkedBlockingQueue并且设置队列大小,问题就出在这里。

DelayWrokQueue是一个无界队列,而我们的目标数据源是kafka,也就是一个高并发高吞吐的消息队列,很大可能在某一时间段有大量的消息过来从而导致OOM,在使用多线程时我们是肯定要考虑到OOM的可能性的,因为OOM带来的后果往往比较严重,系统OOM临时的解决办法一般只能是重启,可能会导致用户数据丢失等不可能挽回的问题,所以从编码设计阶段要采用尽可能稳妥的手段来避免这些问题。

2、采用redis和线程结合

这一次换了思路,采用redis来帮助我们做缓冲,从而避免消息过多OOM的问题。

相关redis zset api:

//添加元素
ZADD key score member [[score member] [score member] …]
//根据分值及限制数量查询
ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count]
//从zset中删除指定成员
ZREM key member [member …]

我们采用redis基础数据结构的zset结构,采用score来存储我们目标发送时间的数值,整体处理流程如下:

  • 第一步数据存储:9:10分从kafka接收了一条a的订单消息,要求30分钟后进行发货通知,那我们就将当前时间加上30分钟然后转为时间戳作为a的score,key为a的订单号存入redis中。代码如下:
public void onMessage(String topic, String message) {
  String orderId;
		int delayTime = 0;
  try {
   Map<String, String> msgMap = gson.fromJson(message, new TypeToken<Map<String, String>>() {
   }.getType());
   if (msgMap.isEmpty()) {
    return;
   }
   LOGGER.info("onMessage kafka content:{}", msgMap.toString());
	 orderId = msgMap.get("orderId");
   if(StringUtils.isNotEmpty(orderId)){
    delayTime = Integer.parseInt(msgMap.get("delayTime"));
    Calendar calendar = Calendar.getInstance();
    //计算出预计发送时间
    calendar.add(Calendar.MINUTE, delayTime);
    long sendTime = calendar.getTimeInMillis();
    RedisUtils.getInstance().zetAdd(Constant.DELAY, sendTime, orderId);
    LOGGER.info("orderId:{}---放入redis中等待发送---sendTime:{}", ---orderId:{}, sendTime);
   }
  } catch (Exception e) {
   LOGGER.info("onMessage 延时发送异常:{}", e);
  }
 }
  • 第二步数据处理:另起一个线程具体调度时间根据业务需求来定,我这里3分钟执行一次,内部逻辑:从redis中取出一定量的zset数据,如何取呢,使用zset的zrangeByScore方法,根据数据的score进行排序,当然可以带上时间段,这里从0到现在,来进行消费,需要注意的一点是,在取出数据后我们需要用zrem方法将取出的数据从zset中删除,防止其他线程重复消费数据。在此之后进行接下来的发货通知等相关逻辑。代码如下:
public void run(){
  //获取批量大小
  int orderNum = Integer.parseInt(PropertyUtil.get(Constant.ORDER_NUM,"100"));
  try {
   //批量获取离发送时间最近的orderNum条数据
	 Calendar calendar = Calendar.getInstance();
	 long now = calendar.getTimeInMillis();
	 //获取无限早到现在的事件key(防止上次批量数量小于放入数量,存在历史数据未消费情况)
	 Set<String> orderIds = RedisUtils.getInstance().zrangeByScore(Constant.DELAY, 0, now, 0, orderNum);
	 LOGGER.info("task.getOrderFromRedis---size:{}---orderIds:{}", orderIds.size(), gson.toJson(orderIds));
   if (CollectionUtils.isNotEmpty(orders)){
    //删除key 防止重复发送
    for (String orderId : orderIds) {
     RedisUtils.getInstance().zrem(Constant.DELAY, orderId);
    }
	  //接下来执行发送等业务逻辑
   }
  } catch (Exception e) {
   LOGGER.warn("task.run exception:{}", e);
  }
 }

至此完成了依赖redis和线程完成了延时发送的功能。

结语

那么对上面两种不同的实现方式进行一下优缺点比较:

  • 第一种方式实现简单,不依赖外部组件,能够快速的实现目标功能,但缺点也很明显,需要在特定的场景下使用,如果是我这种消息量大的情况下使用很可能是有问题,当然在数据源消息不多的情况下不失为好的选择。
  • 第二种方式实现稍微复杂一点,但是能够适应消息量大的场景,采用redis的zset作为了“中间件”的效果,并且帮助我们进行延时的功能实现能够较好的适应高并发场景,缺点在于在编写的过程中需要考虑实际的因素较多,例如线程的执行周期时间,发送可能会有一定时间的延迟,批量数据大小的设置等等。

综上是本人这次延时功能的实现过程的两种实现方式的总结,具体采用哪种方式还需大家根据实际情况选择,希望能给大家带来帮助。ps:由于本人的技术能力有限,文章中可能出现技术描述不准确或者错误的情况恳请各位大佬指出,我立马进行改正,避免误导大家,谢谢!

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对我们的支持。

(0)

相关推荐

  • 64位Windows下安装Redis教程

    Redis对于Linux是官方支持的,安装和使用没有什么好说的,普通使用按照官方指导,5分钟以内就能搞定.详情请参考:http://redis.io/download 但有时候又想在windows下折腾下Redis,可以从redis下载页面看到如下提示: 复制代码 代码如下: Win64 Unofficial The Redis project does not directly support Windows,  however the Microsoft Open Tech group de

  • 超强、超详细Redis数据库入门教程

    [本教程目录] 1.redis是什么 2.redis的作者何许人也 3.谁在使用redis 4.学会安装redis 5.学会启动redis 6.使用redis客户端 7.redis数据结构 – 简介 8.redis数据结构 – strings 9.redis数据结构 – lists 10.redis数据结构 – 集合 11.redis数据结构 – 有序集合 12.redis数据结构 – 哈希 13.聊聊redis持久化 – 两种方式 14.聊聊redis持久化 – RDB 15.聊聊redis持

  • Redis操作命令总结

    一.key pattern 查询相应的key (1)redis允许模糊查询key 有3个通配符  *.?.[] (2)randomkey:返回随机key (3)type key:返回key存储的类型 (4)exists key:判断某个key是否存在 (5)del key:删除key (6)rename key newkey:改名 (7)renamenx key newkey:如果newkey不存在则修改成功 (8)move key 1:将key移动到1数据库 (9)ttl key:查询key的

  • redis常用命令小结

    1.redis-benchmark redis基准信息,redis服务器性能检测 redis-benchmark -h localhost -p 6379 -c 100 -n 100000 100个并发连接,100000个请求,检测host为localhost 端口为6379的redis服务器性能 [root@Architect redis-1.2.6]# redis-benchmark -h localhost -p 6379 -c 100 -n 100000 ====== PING ====

  • Redis中5种数据结构的使用场景介绍

    一.redis 数据结构使用场景 原来看过 redisbook 这本书,对 redis 的基本功能都已经熟悉了,从上周开始看 redis 的源码.目前目标是吃透 redis 的数据结构.我们都知道,在 redis 中一共有5种数据结构,那每种数据结构的使用场景都是什么呢? String--字符串 Hash--字典 List--列表 Set--集合 Sorted Set--有序集合 下面我们就来简单说明一下它们各自的使用场景: 1. String--字符串 String 数据结构是简单的 key-

  • Linux下Redis的安装和部署

    一.Redis介绍 Redis是当前比较热门的NOSQL系统之一,它是一个key-value存储系统.和Memcache类似,但很大程度补偿了Memcache的不足,它支持存储的value类型相对更多,包括string.list.set.zset和hash.这些数据类型都支持push/pop.add/remove及取交集并集和差集及更丰富的操作.在此基础上,Redis支持各种不同方式的排序. 和Memcache一样,Redis数据都是缓存在计算机内存中,不同的是,Memcache只能将数据缓存到

  • 利用Redis实现延时处理的方法实例

    背景 在开发中,往往会遇到一些关于延时任务的需求.例如 •生成订单30分钟未支付,则自动取消 •生成订单60秒后,给用户发短信 对上述的任务,我们给一个专业的名字来形容,那就是延时任务. 最近需要做一个延时处理的功能,主要是从kafka中消费消息后根据消息中的某个延时字段来进行延时处理,在实际的实现过程中有一些需要注意的地方,记录如下. 实现过程 说到java中的定时功能,首先想到的Timer和ScheduledThreadPoolExecutor,但是相比之下Timer可以排除,主要原因有以下

  • 利用Redis实现SQL伸缩的方法

    这篇文章主要介绍了利用Redis实现SQL伸缩的方法,包括讲到了锁和时间序列等方面来提升传统数据库的性能,需要的朋友可以参考下. 缓解行竞争 我们在Sentry开发的早起采用的是sentry.buffers. 这是一个简单的系统,它允许我们以简单的Last Write Wins策略来实现非常有效的缓冲计数器. 重要的是,我们借助它完全消除了任何形式的耐久性 (这是Sentry工作的一个非常可接受的方式). 操作非常简单,每当一个更新进来我们就做如下几步: 创建一个绑定到传入实体的哈希键(hash

  • SpringBoot利用redis集成消息队列的方法

    一.pom文件依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> 二.创建消息接收者 变量.方法及构造函数进行标注,完成自动装配的工作. 通过 @Autowired的使用来消除 set ,get方法. @Autowired pub

  • 利用Redis实现SQL伸缩的方法简介

    缓解行竞争 我们在Sentry开发的早起采用的是sentry.buffers. 这是一个简单的系统,它允许我们以简单的Last Write Wins策略来实现非常有效的缓冲计数器. 重要的是,我们借助它完全消除了任何形式的耐久性 (这是Sentry工作的一个非常可接受的方式). 操作非常简单,每当一个更新进来我们就做如下几步: 创建一个绑定到传入实体的哈希键(hash key) 使用HINCRBY使计数器值增加 HSET所有的LWW数据(比如 "最后一次见到的") 用当前时间戳ZADD

  • Spring Boot利用JSR303实现参数验证的方法实例

    简介 JSR-303 是 JAVA EE 6 中的一项子规范,叫做 Bean Validation. 在任何时候,当你要处理一个应用程序的业务逻辑,数据校验是你必须要考虑和面对的事情.应用程序必须通过某种手段来确保输入进来的数据从语义上来讲是正确的.在通常的情况下,应用程序是分层的,不同的层由不同的开发人员来完成.很多时候同样的数据验证逻辑会出现在不同的层,这样就会导致代码冗余和一些管理的问题,比如说语义的一致性等.为了避免这样的情况发生,最好是将验证逻辑与相应的域模型进行绑定. Bean Va

  • JavaScript利用fetch实现异步请求的方法实例

    前言 相信大家应该都有所了解,在这个AJAX时代,如果想进行 API 等网络请求都是通过 XMLHttpRequest 或者封装后的框架进行网络请求. 现在产生的 fetch 框架简直就是为了提供更加强大.高效的网络请求而生,虽然在目前会有一点浏览器兼容的问题,但是当我们进行一些异步请求时,都可以使用 fetch 进行完美的网络请求.下面话不多说,来一起看看详细的介绍吧. 先来看看各个浏览器对fetch的原生支持情况,可以看到支持性并不是很高,safari在10.1 之后才支持,ios更是10.

  • Java利用反射实现框架类的方法实例

    框架类的简单实现 实现步骤: 1. 加载配置文件 2. 获取配置文件中定义的数据 3. 加载该类进内存 主要讲解第一步:加载配置文件 的相关知识. //1.加载配置文件 //1.1创建Properties对象 Properties pro = new Properties(); //1.2加载配置文件,转换为一个集合 //1.2.1获取class目录下的配置文件 ClassLoader classLoader = ReflectTest.class.getClassLoader(); Input

  • python3利用pathlib替代os.path的方法实例

    目录 前言 pathlib 库 pathlib 获取文件路径 Path.cwd 获取当前文件夹路径 获取当前文件路径 获取 Path 对象绝对路径 一些常用的获取文件属性 获取上层,上上层目录 获取用户home目录 判断文件,文件夹 is_file()判断是不是文件 is_dir() 判断是否是文件夹 exists() 判断文件 或文件夹是否存在 is_absolute() 判断是否是绝对路径 joinpath 拼接目录 iterdir()遍历文件目录 glob() 和 rglob() 模式匹配

  • node.js利用redis数据库缓存数据的方法

    一.运行redis Redis服务器默认使用6379端口 redis-server 自定义端口 redis-server –port 6390 客户端 redis-cli 指定ip和端口连接 redis-cli -h 127.0.0.1 -p 6390 测试客户端和服务器是否连通 ping 二.Nodejs连接redis 通过redis.createClient(port,host,options)来连接redis服务器 var redis = require("redis") var

  • Python3利用openpyxl读写Excel文件的方法实例

    前言 Python中常用的操作Excel的三方包有xlrd,xlwt和openpyxl等,xlrd支持读取.xls和.xlsx格式的Excel文件,只支持读取,不支持写入.xlwt只支持写入.xls格式的文件,不支持读取. openpyxl不支持.xls格式,但是支持.xlsx格式的读取写入,并且支持写入公式等. 原始数据文件apis.xlsx内容: name method url data json result get接口 get https://httpbin.org/get?a=1&b=

随机推荐