详解redis是如何实现队列消息的ack

前言

由于公司提供的队列实在太过于蛋疼而且还限制不能使用其他队列,但为了保证数据安全性需要一个可以有ack功能的队列。

原生的redis中通过L/R PUSH/POP方式来实现队列的功能,这个当然是没办法满足需求的(没有ack功能),所以需要自己对redis的list(队列)做个小小的调整。

大体思路为在POP时将pop出的数据放到备份的地方,当有ACK请求(确认消息被消耗)后将备份的信息删除掉;每次在pop前需要检查备份队列中有没有过期的数据没有ack的,如果有则PUSH到list中后再从list中POP出来。

以下脚本使用lua实现,只需要在执行前加载到redis中即可。

消息本身需要包含id属性

push没什么问题,原生即可(此处以LPUSH为例)

pop时脚本

local not_empty = function(x)
 return (type(x) == "table") and (not x.err) and (#x ~= 0)
end

local qName = ARGV[1] --队列名称
local currentTime = ARGV[2] --当前时间,这个需要从外部传入,不能使用redis自身时间,如果使用自身时间可能导致redis本身的backup在重放请求时出现不一致性
local considerAsFailMaxTimeSpan = ARGV[3] --超时时间设定,当消息超过一定时间还没有ack则认为此消息需要再次入队

local zsetName= qName ..'BACKUP'
local hashName= qName ..'CONTEXT'

local tmp = redis.call('ZRANGEBYSCORE',zsetName , '-INF', tonumber(currentTime) - tonumber(considerAsFailMaxTimeSpan), 'LIMIT', 0, 1)
if (not_empty(tmp)) then
 redis.call('ZREM', zsetName, tmp[1]) --此处拿出的为消息的唯一id
 redis.call('LPUSH', qName, redis.call('HGET', hashName, tmp[1]))
end
tmp = redis.call('RPOP', qName)
if (tmp) then
 local msg = cjson.decode(tmp)
 local id = msg['id']
 redis.call('ZADD', zsetName, tonumber(currentTime), id)
 redis.call('HSET',hashName , id, tmp)
end
return tmp

ack时候比较简单,只需要将指定id从set和hash中删除即可

 local key = ARGV[1]
 local qName=ARGV[2]
 redis.call('ZREM', qName..'BACKUP', key)
 redis.call('HDEL', qName..'CONTEXT', key)

在程序中使用前需要显示load这两个脚本,后面直接调用这两个脚本的sha值即可执行。

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作能带来一定的帮助,如果有疑问大家可以留言交流,谢谢大家对我们的支持。

(0)

相关推荐

  • 详解Redis用链表实现消息队列

    前言 Redis链表经常会被用于消息队列的服务,以完成多程序之间的消息交换.个人认为redis消息队列有一个好处,就是可以实现分布式和共享,就和memcache作为mysql的缓存和mysql自带的缓存一样. 链表实现消息队列 Redis链表支持前后插入以及前后取出,所以如果往尾部插入元素,往头部取出元素,这就是一种消息队列,也可以说是消费者/生产者模型.可以利用lpush和rpop来实现.但是有一个问题,如果链表中没有数据,那么消费者将要在while循环中调用rpop,这样以来就浪费cpu资源

  • PHP使用php-resque库配合Redis实现MQ消息队列的教程

    消息队列处理后台任务带来的问题 项目中经常会有后台运行任务的需求,比如发送邮件时,因为要连接邮件服务器,往往需要5-10秒甚至更长时间,如果能先给用户一个成功的提示信息,然后在后台慢慢处理发送邮件的操作,显然会有更好的用户体验. 为了实现类似的需求,Web项目中一般的实现方法是使用消息队列(Message Queue),比如MemcacheQ,RabbitMQ等等,都是很著名的产品. 消息队列说白了就是一个最简单的先进先出队列,队列的一个成员就是一段文本.正是因为消息队列实在太简单了,当拿着消息

  • 详解redis是如何实现队列消息的ack

    前言 由于公司提供的队列实在太过于蛋疼而且还限制不能使用其他队列,但为了保证数据安全性需要一个可以有ack功能的队列. 原生的redis中通过L/R PUSH/POP方式来实现队列的功能,这个当然是没办法满足需求的(没有ack功能),所以需要自己对redis的list(队列)做个小小的调整. 大体思路为在POP时将pop出的数据放到备份的地方,当有ACK请求(确认消息被消耗)后将备份的信息删除掉:每次在pop前需要检查备份队列中有没有过期的数据没有ack的,如果有则PUSH到list中后再从li

  • 详解Redis中的List类型

    本系列将和大家分享Redis分布式缓存,本章主要简单介绍下Redis中的List类型,以及如何使用Redis解决博客数据分页.生产者消费者模型和发布订阅等问题. Redis List的实现为一个双向链表,即可以支持反向查找和遍历,更方便操作,不过带来了部分额外的内存开销,Redis内部的很多实现,包括发送缓冲队列等也都是用这个数据结构. List类型主要用于队列和栈,先进先出,后进先出等. 存储形式:key--LinkList<value> 首先先给大家Show一波Redis中与List类型相

  • 详解Java中的延时队列 DelayQueue

    当用户超时未支付时,给用户发提醒消息.另一种场景是,超时未付款,订单自动取消.通常,订单创建的时候可以向延迟队列种插入一条消息,到时间自动执行.其实,也可以用临时表,把这些未支付的订单放到一个临时表中,或者Redis,然后定时任务去扫描.这里我们用延时队列来做.RocketMQ有延时队列,RibbitMQ也可以实现,Java自带的也有延时队列,接下来就回顾一下各种队列. Queue 队列是一种集合.除了基本的集合操作以外,队列还提供了额外的插入.提取和检查操作.队列的每个方法都以两种形式存在:一

  • 详解Redis基本命令与使用场景

    Redis和Memcached对比 其中有一个比较重要的区别是关于其提供的数据结构区别 Memcached 在其数据结构中仅使用字符串和整数.因此,您保存的所有内容都可以是字符串或整数.它很复杂,因为对于整数,您可以做的唯一数据操作是添加或减去它们.如果需要保存数组或对象,则必须先将它们序列化然后保存.要阅读它们,您需要取消序列化. Redis 具有更强大的数据结构,它不仅可以处理字符串整数,还可以处理二进制安全字符串,二进制安全字符串列表,二进制安全字符串集和有序集. 关于Redis的数据结构

  • 详解玩转直播系列之消息模块演进

    目录 一.背景 二.直播消息业务 2.1.主播与用户 2.2.房间号 2.3.消息类型划分 2.4.消息优先级 三.消息技术点 3.1.消息架构模型 3.2.短轮询 VS 长链接 3.2.1.短轮询 3.2.2.长连接 3.2.3.直播间IM消息分发 3.3.消息丢弃 四.写在最后 一.背景 即时消息(IM)系统是直播系统重要的组成部分,一个稳定的,有容错的,灵活的,支持高并发的消息模块是影响直播系统用户体验的重要因素.IM长连接服务在直播系统有发挥着举足轻重的作用. 在目前大部分主流的直播业务

  • 详解Redis数据类型实现原理

    目录 1. 对象的类型与编码 ① type属性 ② encoding 属性和 *prt 指针 2. 字符串对象 ① 编码 ② 编码的转换 3. 列表对象 ① 编码 ② 编码转换 4. 哈希对象 ① 编码 ② 编码转换 5. 集合对象 ① 编码 ② 编码转换 6. 有序集合对象 ① 编码 ② 编码转换 7. 五大数据类型的应用场景 1. 对象的类型与编码 Redis使用前面说的五大数据类型来表示键和值,每次在Redis数据库中创建一个键值对时,至少会创建两个对象,一个是键对象,一个是值对象,而Re

  • 一文详解Redis中的持久化

    目录 1. 前言 2. RDB 2.1 手动触发 2.2 自动触发 3. bgsave大致流程 4. RDB持久化方式的优缺点 5. AOF 6. AOF的使用方式 7. AOF流程剖析 7.1 命令写入 7.2 文件同步 7.3 重写机制 7.4 重启加载 8. 问题定位与优化 8.1 关于fork操作 8.2 关于子进程开销 8.3 关于AOF追加阻塞 1. 前言 为什么要进行持久化?:持久化功能有效地避免因进程退出造成的数据丢失问题,当下次重启时利用之前持久化的文件即可实现数据恢复. 持久

  • 详解Redis的慢查询日志

    Redis慢查询日志帮助开发和运维人员定位系统存在的慢操作.慢查询日志就是系统在命令执行前后计算每条命令的执行时间,当超过预设阀值,就将这条命令的相关信息(慢查询ID,发生时间戳,耗时,命令的详细信息)记录下来. Redis客户端一条命令分为如下四部分执行: 需要注意的是,慢查询日志只是统计步骤3)执行命令的时间,所以慢查询并不代表客户端没有超时问题.需要注意的是,慢查询日志只是统计步骤3)执行命令的时间,所以慢查询并不代表客户端没有超时问题. 一.慢查询的配置参数: 慢查询的预设阀值 slow

  • 详解redis中的锁以及使用场景

    分布式锁 什么是分布式锁? 分布式锁是控制分布式系统之间同步访问共享资源的一种方式. 为什么要使用分布式锁? ​ 为了保证共享资源的数据一致性. 什么场景下使用分布式锁? ​ 数据重要且要保证一致性 如何实现分布式锁? 主要介绍使用redis来实现分布式锁 redis事务 redis事务介绍: ​ 1.redis事务可以一次执行多个命令,本质是一组命令的集合. ​ 2.一个事务中的所有命令都会序列化,按顺序串行化的执行而不会被其他命令插入 ​ **作用:**一个队列中,一次性.顺序性.排他性的执

  • 详解Redis中key的命名规范和值的命名规范

    数据库中得热点数据key命名惯例 表名:主键名:主键值:字段名 例如 user:id:0001:name 例如 user:id:0002:name 例如 order:id:s2002:price 上面的key对应的值则可以是 存放的方式 key value 优点 单独的key:value形式 order:id:s2002:price 2000 方便简单的操作,例如incr自增或自减 json格式 user:id:0001 {id:0001,name:"张三"} 方便一次性存和取数据,但

随机推荐