Go+Redis实现延迟队列实操

目录
  • 前言
  • 简单的实现
    • 定义消息
    • Push
    • Consume
    • 存在的问题
  • 多消费者实现
    • 定义消息
    • Push
    • Consume
    • 存在的问题
  • 总结

前言

延迟队列是一种非常使用的数据结构,我们经常有需要延迟推送处理消息的场景,比如延迟60秒发送短信,延迟30分钟关闭订单,消息消费失败延迟重试等等。

一般我们实现延迟消息都需要依赖底层的有序结构,比如堆,而Redis刚好提供了zset这种数据类型,它的底层实现是哈希表+跳表,也是一种有序的结构,所以这篇文章主要是使用Go+Redis来实现延迟队列。

当然Redis本身并不支持延迟队列,所以我们只是实现一个比较简单的延迟队列,而且Redis不太适合大量消息堆积,所以只适合比较简单的场景,如果需要更加强大稳定的消息队列,可以使用RocketMQ等自带延迟消息的消息队列。

我们这里先定一下我们要实现的几个目标:

  • 消息必须至少被消费一次
  • 多个生产者
  • 多个消费者

然后我们定义一个简单的接口:

  • Push(msg) error:添加消息到队列
  • Consume(topic, batchSize, func(msg) error):消费消息

简单的实现

  • 每个主题最多可以被一个消费者消费,因为不会对主题进行分区
  • 但是可以多个生产者同时进行生产,因为Push操作是原子的
  • 同时需要消费操作返回值error为nil才删除消息,保证消息至少被消费一次

定义消息

这个消息参考了Kafka的消息结构:

  • Topic可以是某个队列的名字
  • Key是消息的唯一标识,在一个队列里面不可以重复
  • Body是消息的内容
  • Delay是消息的延迟时间
  • ReadyTime是消息准备好执行的时间
// Msg 消息
type Msg struct {
   Topic     string        // 消息的主题
   Key       string        // 消息的Key
   Body      []byte        // 消息的Body
   Delay     time.Duration // 延迟时间(秒)
   ReadyTime time.Time     // 消息准备好执行的时间(now + delay)
}

Push

由于我们需要把消息的Body存储到Hash,把消息的ReadyTime存储到ZSet,所以我们需要一个简单的Lua脚本来保证这两个操作是原子的

同时我们不会覆盖已经存在的相同Key的消息。

const delayQueuePushRedisScript = `
-- KEYS[1]: topicZSet
-- KEYS[2]: topicHash
-- ARGV[1]: 消息的Key
-- ARGV[2]: 消息的Body
-- ARGV[3]: 消息准备好执行的时间

local topicZSet = KEYS[1]
local topicHash = KEYS[2]
local key = ARGV[1]
local body = ARGV[2]
local readyTime = tonumber(ARGV[3])

-- 添加readyTime到zset
local count = redis.call("zadd", topicZSet, readyTime, key)
-- 消息已经存在
if count == 0 then
   return 0
end
-- 添加body到hash
redis.call("hsetnx", topicHash, key, body)
return 1
`
func (q *SimpleRedisDelayQueue) Push(ctx context.Context, msg *Msg) error {
   // 如果设置了ReadyTime,就使用RedisTime
   var readyTime int64
   if !msg.ReadyTime.IsZero() {
      readyTime = msg.ReadyTime.Unix()
   } else {
      // 否则使用Delay
      readyTime = time.Now().Add(msg.Delay).Unix()
   }
   success, err := q.pushScript.Run(ctx, q.client, []string{q.topicZSet(msg.Topic), q.topicHash(msg.Topic)},
      msg.Key, msg.Body, readyTime).Bool()
   if err != nil {
      return err
   }
   if !success {
      return ErrDuplicateMessage
   }
   return nil
}

Consume

其中第二个参数batchSize表示用于批量获取已经准备好执行的消息,减少网络请求。

fn是对消息进行处理的函数,它有一个返回值error,如果是nil才表示消息消费成功,然后调用删除脚本把成功消费的消息给删除(需要原子的删除ZSet和Hash里面的内容)。

const delayQueueDelRedisScript = `
-- KEYS[1]: topicZSet
-- KEYS[2]: topicHash
-- ARGV[1]: 消息的Key

local topicZSet = KEYS[1]
local topicHash = KEYS[2]
local key = ARGV[1]

-- 删除zset和hash关于这条消息的内容
redis.call("zrem", topicZSet, key)
redis.call("hdel", topicHash, key)
return 1
`
func (q *SimpleRedisDelayQueue) Consume(topic string, batchSize int, fn func(msg *Msg) error) {
   for {
      // 批量获取已经准备好执行的消息
      now := time.Now().Unix()
      zs, err := q.client.ZRangeByScoreWithScores(context.Background(), q.topicZSet(topic), &redis.ZRangeBy{
         Min:   "-inf",
         Max:   strconv.Itoa(int(now)),
         Count: int64(batchSize),
      }).Result()
      // 如果获取出错或者获取不到消息,则休眠一秒
      if err != nil || len(zs) == 0 {
         time.Sleep(time.Second)
         continue
      }
      // 遍历每个消息
      for _, z := range zs {
         key := z.Member.(string)
         // 获取消息的body
         body, err := q.client.HGet(context.Background(), q.topicHash(topic), key).Bytes()
         if err != nil {
            continue
         }

         // 处理消息
         err = fn(&Msg{
            Topic:     topic,
            Key:       key,
            Body:      body,
            ReadyTime: time.Unix(int64(z.Score), 0),
         })
         if err != nil {
            continue
         }

         // 如果消息处理成功,删除消息
         q.delScript.Run(context.Background(), q.client, []string{q.topicZSet(topic), q.topicHash(topic)}, key)
      }
   }
}

存在的问题

如果多个线程同时调用Consume函数,那么多个线程会拉取相同的可执行的消息,造成消息重复的被消费。

多消费者实现

  • 每个主题最多可以被分区个数个消费者消费,会对主题进行分区

定义消息

  • 我们添加了一个Partition字段表示消息的分区号
// Msg 消息
type Msg struct {
   Topic     string        // 消息的主题
   Key       string        // 消息的Key
   Body      []byte        // 消息的Body
   Partition int           // 分区号
   Delay     time.Duration // 延迟时间(秒)
   ReadyTime time.Time     // 消息准备好执行的时间
}

Push

代码与SimpleRedisDelayQueue的Push相同,只是我们会使用Msg里面的Partition字段对主题进行分区。

func (q *PartitionRedisDelayQueue) topicZSet(topic string, partition int) string {
   return fmt.Sprintf("%s:%d:z", topic, partition)
}

func (q *PartitionRedisDelayQueue) topicHash(topic string, partition int) string {
   return fmt.Sprintf("%s:%d:h", topic, partition)
}

Consume

代码与SimpleRedisDelayQueue的Consume相同,我们也只是对Consume多加了一个partition参数用于指定消费的分区。

func (q *PartitionRedisDelayQueue) Consume(topic string, batchSize, partition int, fn func(msg *Msg) error) {
    // ...
}

存在的问题

一个比较大的问题就是我们需要手动指定分区而不是自动分配分区,这个问题对于Push操作解决起来比较容易,可以通过哈希算法对Key进行哈希取模进行分区,比如murmur3。但是对于Consume就比较复杂,因为我们必须记录哪个分区已经被消费者消费了。如果真的需要更加复杂的场景还是建议使用RocketMQKafka等消息队列进行实现。

总结

  • 使用Redis的ZSet可以很容易的实现一个高性能消息队列
  • 但是Redis的ZSet实现的消息队列不适合大量消息堆积的场景,同时如果需要实现自动分区消费功能会比较复杂
  • 适合消息量不是很大,且不是很复杂的场景
  • 如果需要大量堆积消息和稳定的多消费者功能,可以使用自带延迟消息的RocketMQ

到此这篇关于Go+Redis实现延迟队列实操的文章就介绍到这了,更多相关Go 延迟队列内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Redis延迟队列和分布式延迟队列的简答实现

    最近,又重新学习了下Redis,Redis不仅能快还能慢,简直利器,今天就为大家介绍一下Redis延迟队列和分布式延迟队列的简单实现. 在我们的工作中,很多地方使用延迟队列,比如订单到期没有付款取消订单,制订一个提醒的任务等都需要延迟队列,那么我们需要实现延迟队列.我们本文的梗概如下,同学们可以选择性阅读. 1. 实现一个简单的延迟队列. 我们知道目前JAVA可以有DelayedQueue,我们首先开一个DelayQueue的结构类图.DelayQueue实现了Delay.BlockingQue

  • 基于Redis延迟队列的实现代码

    使用场景 工作中大家往往会遇到类似的场景: 1.对于红包场景,账户 A 对账户 B 发出红包通常在 1 天后会自动归还到原账户. 2.对于实时支付场景,如果账户 A 对商户 S 付款 100 元,5秒后没有收到支付方回调将自动取消订单. 解决方案分析 方案一: 采用通过定时任务采用数据库/非关系型数据库轮询方案. 优点: 1. 实现简单,对于项目前期这样是最容易的解决方案. 缺点: 1. DB 有效使用率低,需要将一部分的数据库的QPS分配给 JOB 的无效轮询. 2. 服务资源浪费,因为轮询需

  • 百行代码实现基于Redis的可靠延迟队列

    目录 原理详解 pending2ReadyScript ready2UnackScript unack2RetryScript ack consume 在之前探讨延时队列的文章中我们提到了 redisson delayqueue 使用 redis 的有序集合结构实现延时队列,遗憾的是 go 语言社区中并无类似的库.不过问题不大,没有轮子我们自己造

  • Golang实现基于Redis的可靠延迟队列

    目录 前言 原理详解 pending2ReadyScript ready2UnackScript unack2RetryScript ack consume 前言 在之前探讨延时队列的文章中我们提到了 redisson delayqueue 使用 redis 的有序集合结构实现延时队列,遗憾的是 go 语言社区中并无类似的库.不过问题不大,没有轮子我们自己造. 本文的完整代码实现在hdt3213/delayqueue,可以直接 go get 安装使用. 使用有序集合结构实现延时队列的方法已经广为

  • SpringBoot集成Redisson实现延迟队列的场景分析

    使用场景 1.下单成功,30分钟未支付.支付超时,自动取消订单 2.订单签收,签收后7天未进行评价.订单超时未评价,系统默认好评 3.下单成功,商家5分钟未接单,订单取消 4.配送超时,推送短信提醒 ...... 对于延时比较长的场景.实时性不高的场景,我们可以采用任务调度的方式定时轮询处理.如:xxl-job 今天我们采用一种比较简单.轻量级的方式,使用 Redis 的延迟队列来进行处理.当然有更好的解决方案,可根据公司的技术选型和业务体系选择最优方案.如:使用消息中间件Kafka.Rabbi

  • 分布式利器redis及redisson的延迟队列实践

    目录 前言碎语 延迟队列多种实现方式 redisson中的延迟队列实现 文末结语 前言碎语 首先说明下需求,一个用户中心产品,用户在试用产品有三天的期限,三天到期后准时准点通知用户,试用产品到期了.这个需求如果不是准时通知,而是每天定点通知就简单了.如果需要准时通知就只能上延迟队列了.使用场景除了如上,典型的业务场景还有电商中的延时未支付订单失效等等. 延迟队列多种实现方式 1.如基于RabbitMQ的队列ttl+死信路由策略:通过设置一个队列的超时未消费时间,配合死信路由策略,到达时间未消费后

  • php使用redis的有序集合zset实现延迟队列应用示例

    本文实例讲述了php使用redis的有序集合zset实现延迟队列.分享给大家供大家参考,具体如下: 延迟队列就是个带延迟功能的消息队列,相对于普通队列,它可以在指定时间消费掉消息. 延迟队列的应用场景: 1.新用户注册,10分钟后发送邮件或站内信. 2.用户下单后,30分钟未支付,订单自动作废. 我们通过redis的有序集合zset来实现简单的延迟队列,将消息数据序列化,作为zset的value,把消息处理时间作为score,每次通过zRangeByScore获取一条消息进行处理. <?php

  • Go+Redis实现延迟队列实操

    目录 前言 简单的实现 定义消息 Push Consume 存在的问题 多消费者实现 定义消息 Push Consume 存在的问题 总结 前言 延迟队列是一种非常使用的数据结构,我们经常有需要延迟推送处理消息的场景,比如延迟60秒发送短信,延迟30分钟关闭订单,消息消费失败延迟重试等等. 一般我们实现延迟消息都需要依赖底层的有序结构,比如堆,而Redis刚好提供了zset这种数据类型,它的底层实现是哈希表+跳表,也是一种有序的结构,所以这篇文章主要是使用Go+Redis来实现延迟队列. 当然R

  • Redis实现延迟队列的全流程详解

    目录 1.前言 1.1.什么是延迟队列 1.2.应用场景 1.3.为什么要使用延迟队列 2.Redis sorted set 3.Redis 过期键监听回调 4.Quartz定时任务 5.DelayQueue 延迟队列 6.RabbitMQ 延时队列 7.时间轮 1.前言 1.1.什么是延迟队列 延时队列相比于普通队列最大的区别就体现在其延时的属性上,普通队列的元素是先进先出,按入队顺序进行处理,而延时队列中的元素在入队时会指定一个延迟时间,表示其希望能够在经过该指定时间后处理.从某种意义上来讲

  • Redis优雅地实现延迟队列的方法分享

    目录 前言 使用 依赖配置 配置文件 demo代码 执行效果 原理分析 队列创建 生产者 消费者 整个流程 总结思考 前言 工作中常常会遇到这样的场景,如订单到期未支付取消,到期自动续费等,我们发现延迟队列非常适合在这样的场景中使用.常见的延迟队列的优秀实现有rabbitMQ的死信队列,RocketMQ的延迟队列等,但是了有时候项目没有特别的大,没有引入类似的消息中间件,但是了又遇到了特别适合使用延迟队列的场景,我们一般会利用已有的redis实现一个简陋的延迟队列.常见的实现方式有监听过期key

  • JAVA 实现延迟队列的方法

    延迟队列的需求各位应该在日常开发的场景中经常碰到.比如: 用户登录之后5分钟给用户做分类推送: 用户多少天未登录给用户做召回推送: 定期检查用户当前退款账单是否被商家处理等等场景. 一般这种场景和定时任务还是有很大的区别,定时任务是你知道任务多久该跑一次或者什么时候只跑一次,这个时间是确定的.延迟队列是当某个事件发生的时候需要延迟多久触发配套事件,引子事件发生的时间不是固定的. 业界目前也有很多实现方案,单机版的方案就不说了,现在也没有哪个公司还是单机版的服务,今天我们一一探讨各种方案的大致实现

随机推荐