Redis 定长队列探索及实践

目录
  • 一、业务背景
  • 二、技术选型
  • 三、技术原理
    • 3.1 Lua 脚本
    • 3.2 List 对象
    • 3.3 Set 对象
  • 四、技术应用
    • 4.1 生产消息
    • 4.2 消费消息
    • 4.3 注意事项
  • 五、线上效果
  • 六、适用场景
  • 七、总结

一、业务背景

从技术的角度来说,技术方案的选型都是受限于实际的业务场景,都以解决实际业务场景为目标。

在我们的实际业务场景中,需要以游戏的维度收集和上报行为数据,考虑数据的量级,执行尽最大努力交付且允许数据的部分丢弃。

数据上报支持游戏的维度的批量上报,支持同一款游戏128个行为进行批量上报。

数据上报需要时效控制,上报的数据必须是上报时刻的前3分钟的数据。

整体数据的业务形态如下图所示:

二、技术选型

从业务的角度来说包含数据的收集和数据的上报,我们把数据的收集比作生产者,数据的上报比作消费者,是一个典型的生产消费模型。

生产消费模型在JVM进程内部通过队列+锁或者无锁的Disruptor来实现,在跨进程场景下通过MQ(RocketMQ/kafka)进行处理解耦。

但是细化到具体业务场景来看,消息的消费有诸多限制,包括:游戏维度的批量行为上报,行为上报的时效限制,细化到各个技术方案选型进行对比。

方案一

使用RocketMQ 或者Kafaka等消息队列来存储上报的消息,但是消费侧需要考虑在业务进程中按照游戏维度进行聚合,其中技术细节涉及按照游戏维度进行拆分,在满足消息时效性和批量性的前提下触发上报。在这种方案下消息中间件扮演的角色本质上消息的中转站,没有解决任何业务场景中提及的游戏维度拆分、批量性和时效性。

方案二

在方案一的基础上,寻求一种技术方案来解决游戏维度的消息分组、批量消费 、时效性。通过Redis的list结构来实现队列(进一步要求实现定长队列)来解决游戏维度的消息分组;通过Redis的list支持的Lrange来实现批量消费;通过业务侧的多线程来解决时效问题,针对高频的游戏使用单独的线程池进行处理,上述两个手段能够保证消费速度大于生产速度。

方案对比:

对比两种方案后决定使用Redis的实现了一个伪消息中间件:

通过List对象实现定长队列来保存游戏维度的行为消息(以游戏作为key的List对象来保存用户行为);

通过List来保存所有的存在行为数据的游戏列表;

通过Set来进行去重判断来保证2中的List对象的唯一性。

整体的技术方案如下图所示:

生产过程

步骤一:游戏维度的某行为数据PUSH到游戏维度的队列当中。

步骤二:判断游戏是否在游戏的集合Set中,如果在就直接返回,如果不在进行步骤三。

步骤三:往游戏列表中PUSH游戏。

消费过程

步骤一:从游戏对象的列表中循环取出一款游戏。

步骤二:通过步骤一获取的游戏对象去该游戏对象的行为数据队列中批量获取数据处理。

三、技术原理

在Redis的支持命令中,在List和Set的基础命令,结合Lua脚本来实现整个技术方案。

消息数据层面,通过单独的List循环维护待消费的游戏维度的数据,每个游戏维度使用定长的List来保存消息。

消息生产过程中,通过结合List的llen+lpop+rpush来实现游戏维度的定长队列,保证队列的长度可控。

消息消费过程中,通过结合List的lrange+ltrim来实现游戏维度的消息的批量消费。

在整个执行的复杂度层面,需要保证时间复杂度在0(N)常量维度,保证时间可控。

3.1 Lua 脚本

EVAL script numkeys key [key ...] arg [arg ...]
时间复杂度:取决于脚本本身的执行的时间复杂度。

> eval "return {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}" 2 key1 key2 first second
1) "key1"
2) "key2"
3) "first"
4) "second"

Redis uses the same Lua interpreter to run all the commands.
Also Redis guarantees that a script is executed in an atomic way:
no other script or Redis command will be executed while a script is being executed.
This semantic is similar to the one of MULTI / EXEC.
From the point of view of all the other clients the effects of a script are either still not visible or

Redis采用相同的Lua解释器去运行所有命令,我们可以保证,脚本的执行是原子性的。作用就类似于加了MULTI/EXEC。

  • Lua 脚本内多个命令以原子性的方式执行,保证了命令执行的线程安全。
  • Lua 脚本结合List命令实现定长队列,实现批量消费。
  • Lua 脚本仅支持单个key的操作,不支持多key的操作。

3.2 List 对象

LLEN key
计算List的长度
时间复杂度:O(1)。

LPOP key [count]
从List的左侧移除元素
时间复杂度:O(N),N为移除元素的个数。

RPUSH key element [element ...]
从List的右侧保存元素
时间复杂度:O(N),N为保存元素的个数。

  • List的基础命令包括计算List的长度,移除数据,添加数据,整体命令的复杂度都在O(N)的常量时间。
  • 整合上述三个命令,我们能保证实现固定长度的队列,通过判断队列长度是否达到定长结合新增队列元素和移除队列元素来完成。

LRANGE key start end
时间复杂度:O(S+N), S为偏移量start, N为指定区间内元素的数量。

下标(index)参数 start 和 stop 都以 0 为底,也就是说,以 0 表示列表的第一个元素,以 1 表示列表的第二个元素,以此类推。
你也可以使用负数下标,以 -1 表示列表的最后一个元素, -2 表示列表的倒数第二个元素,以此类推。

LTRIM key start stop
时间复杂度:O(N) where N is the number of elements to be removed by

  • List的基础命令包括批量返回数据和裁剪数据,整体命令的复杂度都在O(N)的常量时间。
  • 整合上述两个命令,我们能够批量消费数据并移除队列数据,通过LRANGE批量返回数据并通过LTRIM保留剩余数据。

3.3 Set 对象

SADD key member [member ...]
往Set集合添加数据。
时间复杂度:O(1)。

SISMEMBER key member
判断Set集合是否存在元素。
时间复杂度:O(1)。

四、技术应用

4.1 生产消息

/*定义LUA脚本*/
CACHE_NPPA_EVENT_LUA =
"local retVal = 0 " +
"local key = KEYS[1] " +
"local num = tonumber(ARGV[1]) " +
"local val = ARGV[2] " +
"local expire = tonumber(ARGV[3]) " +
"if (redis.call('llen', key) < num) then redis.call('rpush', key, val) " +
"else redis.call('lpop', key) redis.call('rpush', key, val) retVal = 1 end " +
"redis.call('expire', key, expire) return retVal";

/*执行LUA脚本*/
String data = JSON.toJSONString(nppaBehavior);
Long retVal = (Long)jedisClusterTemplate.eval(CACHE_NPPA_EVENT_LUA, 1, NPPA_PREFIX + nppaBehavior.getGamePackage(), String.valueOf(MAX_GAME_EVENT_PER_GAME), data, String.valueOf(NPPA_TTL_MINUTE * 60));

/*执行效果
实现固长队列的数据存储并设置过期时间
  • 通过整合llen+rpush+lpop三个命令实现定长队列。
  • 通过lua脚本保证上述命令的原子性执行。

  • 整体的执行流程如上图所示,核心理念通过lua脚本的原子性保证了队列长度计算(llen)、队列数据移除(lpop)、队列数据保存(rpush)的原子性执行。

4.2 消费消息

/*定义LUA脚本*/
QUERY_NPPA_EVENT_LUA =
"local data = {} " +
"local key = KEYS[1] " +
"local num = tonumber(ARGV[1]) " +
"data = redis.call('lrange', key, 0, num) redis.call('ltrim', key, num+1, -1) return data";

/*执行LUA脚本*/
Integer batchSize = NppaConfigUtils.getInteger("nppa.report.batch.size", 1);
Object result = jedisClusterTemplate.eval(QUERY_NPPA_EVENT_LUA, 1,NPPA_PREFIX + gamePackage, String.valueOf(batchSize));

/*执行效果
取固定数量的对象,然后保留队列的剩余的消息对象。
  • 通过整合lrange+ltrim两个命令实现消息的批量消费。
  • 通过lua脚本保证上述命令的原子性执行。

  • 整体的执行流程如上图所示,核心理念通过lua脚本的原子性保证了数据获取(Lrange)和数据裁剪(Ltrim)的原子性执行。
  • 整体的消费流程选择pull模式,通过多线程循环轮询可消费的队列进行消费。与借助于redis的pub/sub的通知机制实现消费流程的push模式相比,pull模式成本更低效果更佳。

4.3 注意事项

  • Redis集群模式下,执行Lua脚本建议传单key,多key会报重定向错误。
  • 在不同的Redis版本下,Lua脚本针对null的返回值处理不同,参考官方文档。
  • 消费者的消费过程中通过循环遍历游戏列表,然后根据游戏去获取对应的消息对象,但是不同的游戏对应的热度不同,所以在消费端我们通过配置的方式为热门游戏单独开启消费线程进行消费,相当于针对不同游戏配置不同优先级的消费者。

五、线上效果

  • 生产和消费的QPS约为1w qps左右,整体上报QPS通过批量上报后会远低于生产的消息生产和消费的QPS。
  • 整体数据的使用游戏包名作为key进行存储,性能上不存在热点的问题。

六、适用场景

在描述完方案的原理和实现细节之后,进一步对适用的业务场景进行下总结。整体方案是基于redis的基本数据结构构建一个伪消息队列,用以解决消息的单个生产批量消费的场景,通过多key形式实现消息队列的多Topic模式,重要的是能够借助于redis的原生能力在O(N)的时间复杂度完成批量消费。另外该方案也可以降级作为实现先进先出定长的日志队列。

七、总结

本文主要探索在特定业务场景下通过Redis的原生命令实现类MQ的功能,创新式的通过Lua脚本组合Redis的List的基础命令,实现了消息的分组,消息的定长队列,消息的批量消费功能;整体解决方案在线上环境落地并平稳运行,为特定场景提供了一种通用的解决方案。

到此这篇关于Redis 定长队列探索及实践的文章就介绍到这了,更多相关Redis 定长队列 内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • redis用list做消息队列的实现示例

    目录 生产消息服务 消费消息服务,定时任务 日志 测试 leftPush消息入队,rightPop对应,消息出队. rightPop(RedisConstant.MQ_LIST, 0L, TimeUnit.SECONDS)阻塞出队,0表示永久阻塞 生产消息服务 @Service public class RedisService {     @Autowired     private RedisTemplate<String, String> redisTemplate;     publi

  • Java redisTemplate阻塞式处理消息队列

    目录 Redis 消息队列 redis五种数据结构 队列生产者 队列消费者 测试类 并发情况下使用increment递增 补充 Redis 消息队列 redis五种数据结构 队列生产者 package cn.stylefeng.guns.knowledge.modular.knowledge.schedule; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.core.RedisTemplate; i

  • Redis 使用 List 实现消息队列的优缺点

    目录 什么是消息队列 消息队列满足哪些特性 消息有序性 重复消息处理 可靠性 List 实现消息队列 LPUSH RPOP 实时消费问题 重复消费 消息可靠性 需要注意的是 Redission 实战 添加依赖 Java 代码实战 单元测试 总结 需要注意的是,我们要避免生产者过快,消费者过慢导致的消息堆积占用 Redis 的内存. 分布式系统中必备的一个中间件就是消息队列,通过消息队列我们能对服务间进行异步解耦.流量消峰.实现最终一致性. 目前市面上已经有 RabbitMQ.RochetMQ.A

  • Java实现Redis延时消息队列

    目录 什么是延时任务 延时任务的特点 实现思路: 代码实现 1.消息模型 2.RedisMq 消息队列实现类 3.消息生产者 4.消息消费者 5. 消息执接口 6. 任务类型的实现类:可以根据自己的情况去实现对应的队列需求 什么是延时任务 延时任务,顾名思义,就是延迟一段时间后才执行的任务.举个例子,假设我们有个发布资讯的功能,运营需要在每天早上7点准时发布资讯,但是早上7点大家都还没上班,这个时候就可以使用延时任务来实现资讯的延时发布了.只要在前一天下班前指定第二天要发送资讯的时间,到了第二天

  • 分布式利器redis及redisson的延迟队列实践

    目录 前言碎语 延迟队列多种实现方式 redisson中的延迟队列实现 文末结语 前言碎语 首先说明下需求,一个用户中心产品,用户在试用产品有三天的期限,三天到期后准时准点通知用户,试用产品到期了.这个需求如果不是准时通知,而是每天定点通知就简单了.如果需要准时通知就只能上延迟队列了.使用场景除了如上,典型的业务场景还有电商中的延时未支付订单失效等等. 延迟队列多种实现方式 1.如基于RabbitMQ的队列ttl+死信路由策略:通过设置一个队列的超时未消费时间,配合死信路由策略,到达时间未消费后

  • springboot整合redis之消息队列

    目录 一.项目准备 二.配置类 三.redis中list数据类型 定时器监听队列 运行即监控队列 四.发布/订阅模式 五.ZSet实现延迟队列 一.项目准备 依赖 <!-- RedisTemplate --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> &

  • Golang实现基于Redis的可靠延迟队列

    目录 前言 原理详解 pending2ReadyScript ready2UnackScript unack2RetryScript ack consume 前言 在之前探讨延时队列的文章中我们提到了 redisson delayqueue 使用 redis 的有序集合结构实现延时队列,遗憾的是 go 语言社区中并无类似的库.不过问题不大,没有轮子我们自己造. 本文的完整代码实现在hdt3213/delayqueue,可以直接 go get 安装使用. 使用有序集合结构实现延时队列的方法已经广为

  • SpringBoot集成Redisson实现延迟队列的场景分析

    使用场景 1.下单成功,30分钟未支付.支付超时,自动取消订单 2.订单签收,签收后7天未进行评价.订单超时未评价,系统默认好评 3.下单成功,商家5分钟未接单,订单取消 4.配送超时,推送短信提醒 ...... 对于延时比较长的场景.实时性不高的场景,我们可以采用任务调度的方式定时轮询处理.如:xxl-job 今天我们采用一种比较简单.轻量级的方式,使用 Redis 的延迟队列来进行处理.当然有更好的解决方案,可根据公司的技术选型和业务体系选择最优方案.如:使用消息中间件Kafka.Rabbi

  • Redis 定长队列探索及实践

    目录 一.业务背景 二.技术选型 三.技术原理 3.1 Lua 脚本 3.2 List 对象 3.3 Set 对象 四.技术应用 4.1 生产消息 4.2 消费消息 4.3 注意事项 五.线上效果 六.适用场景 七.总结 一.业务背景 从技术的角度来说,技术方案的选型都是受限于实际的业务场景,都以解决实际业务场景为目标. 在我们的实际业务场景中,需要以游戏的维度收集和上报行为数据,考虑数据的量级,执行尽最大努力交付且允许数据的部分丢弃. 数据上报支持游戏的维度的批量上报,支持同一款游戏128个行

  • 基于Redis实现延时队列的优化方案小结

    目录 一.延时队列的应用 二.延时队列的实现 三.总结 一.延时队列的应用 近期在开发部门的新项目,其中有个关键功能就是智能推送,即根据用户行为在特定的时间点向用户推送相应的提醒消息,比如以下业务场景: 在用户点击充值项后,半小时内未充值,向用户推送充值未完成提醒. 在用户最近一次阅读行为2小时后,向用户推送继续阅读提醒. 在用户新注册或退出应用N分钟后,向用户推送合适的推荐消息. … 上述场景的共同特征就是在某事件触发后延迟一定时间后再执行特定任务,若事件触发时间点可知,则上述逻辑也可等价于在

  • PHP实现基于Redis的MessageQueue队列封装操作示例

    本文实例讲述了PHP实现基于Redis的MessageQueue队列封装操作.分享给大家供大家参考,具体如下: Redis的链表List可以用来做链表,高并发的特性非常适合做分布式的并行消息传递. 项目地址:https://github.com/huyanping/Zebra-PHP-Framework 左进右出 $redis->lPush($key, $value); $redis->rPop($key); 以下程序已在生产环境中正式使用. 基于Redis的PHP消息队列封装 <?ph

  • SpringBoot集成Redis实现消息队列的方法

    list 原理说明 Redis 的 list 是按照插入顺序排序的字符串链表. 如图所示,可以通过 lpush 和 rpop 或者 rpush 和 lpop 实现消息队列. 1 lpush 和 rpop 2 rpush 和 lpop 消息队列功能实现 引入 Redis 依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data

  • redis实现延时队列的两种方式(小结)

    背景 项目中的流程监控,有几种节点,需要监控每一个节点是否超时.按传统的做法,肯定是通过定时任务,去扫描然后判断,但是定时任务有缺点:1,数据量大会慢:2,时间不好控制,太短,怕一次处理不完,太长状态就会有延迟.所以就想到用延迟队列的方式去实现. 一,redis的过期key监控 1,开启过期key监听 在redis的配置里把这个注释去掉 notify-keyspace-events Ex 然后重启redis 2,使用redis过期监听实现延迟队列 继承KeyExpirationEventMess

  • 线程池之newFixedThreadPool定长线程池的实例

    newFixedThreadPool定长线程池的实例 newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待.newFixedThreadPool固定线程池, 使用完毕必须手动关闭线程池, 否则会一直在内存中存在. 示例代码: public class ThreadPoolFixed { public static void main(String[] args) { //设置线程池大小为3 ExecutorService fixedThread

  • Python技巧之变长和定长序列拆分

    目录 1.元组拆分 2.字符串拆分 3.拆分时丢弃值 4.嵌套序列拆分 5.从任意长度的可迭代对象中拆分 Python中的任何序列(可迭代的对象)都可以通过赋值操作进行拆分,包括但不限于元组.列表.字符串.文件.迭代器.生成器等. 1.元组拆分 元组拆分是最为常见的一种拆分,示例如下: p = (4, 5) x, y = p print(x, y) # 4 5 如果写成 x, y, z = p 那么就会抛出ValueError异常: "not enough values to unpack (e

  • Redis实现延迟队列的全流程详解

    目录 1.前言 1.1.什么是延迟队列 1.2.应用场景 1.3.为什么要使用延迟队列 2.Redis sorted set 3.Redis 过期键监听回调 4.Quartz定时任务 5.DelayQueue 延迟队列 6.RabbitMQ 延时队列 7.时间轮 1.前言 1.1.什么是延迟队列 延时队列相比于普通队列最大的区别就体现在其延时的属性上,普通队列的元素是先进先出,按入队顺序进行处理,而延时队列中的元素在入队时会指定一个延迟时间,表示其希望能够在经过该指定时间后处理.从某种意义上来讲

  • Java利用Redis实现消息队列的示例代码

    本文介绍了Java利用Redis实现消息队列的示例代码,分享给大家,具体如下: 应用场景 为什么要用redis? 二进制存储.java序列化传输.IO连接数高.连接频繁 一.序列化 这里编写了一个java序列化的工具,主要是将对象转化为byte数组,和根据byte数组反序列化成java对象; 主要是用到了ByteArrayOutputStream和ByteArrayInputStream; 注意:每个需要序列化的对象都要实现Serializable接口; 其代码如下: package Utils

  • SpringBoot利用redis集成消息队列的方法

    一.pom文件依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> 二.创建消息接收者 变量.方法及构造函数进行标注,完成自动装配的工作. 通过 @Autowired的使用来消除 set ,get方法. @Autowired pub

随机推荐