Redis优雅地实现延迟队列的方法分享

目录
  • 前言
  • 使用
    • 依赖配置
    • 配置文件
    • demo代码
    • 执行效果
  • 原理分析
    • 队列创建
    • 生产者
    • 消费者
    • 整个流程
  • 总结思考

前言

工作中常常会遇到这样的场景,如订单到期未支付取消,到期自动续费等,我们发现延迟队列非常适合在这样的场景中使用。常见的延迟队列的优秀实现有rabbitMQ的死信队列,RocketMQ的延迟队列等,但是了有时候项目没有特别的大,没有引入类似的消息中间件,但是了又遇到了特别适合使用延迟队列的场景,我们一般会利用已有的redis实现一个简陋的延迟队列。常见的实现方式有监听过期key,使用zset利用分值进行一个匹配,但是了这些实现或多或少有些问题,不够优雅。监听过期key是一种危险行为,一是如果过redis中key数量较大监听过期key可能导致服务负载异常,二是redis中key过期后key是惰性删除的,因此监听机制需要主动触发。利用zset分值实现呢,需要自己开发代码处理定时轮训以及key删除的逻辑,具有一定的工作量和复杂度。哪有没有一种优雅的redis延迟队列的实现呢?

Redisson是Redis服务器上的分布式可伸缩Java数据结构----驻内存数据网格(In-Memory Data Grid,IMDG)。底层使用netty框架,并提供了与java对象相对应的分布式对象、分布式集合、分布式锁和同步器、分布式服务等一系列的Redisson的分布式对象。为我们提供了许多开箱即用的功能。今天介绍Redisson实现的优雅的延迟队列。

使用

依赖配置

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.12.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.homeey</groupId>
    <artifactId>redis-delay-queue</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>redis-delay-queue</name>
    <description>redis-delay-queue</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.redisson/redisson -->
        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson-spring-boot-starter</artifactId>
            <version>3.19.3</version>
        </dependency>
        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson-spring-data-23</artifactId>
            <version>3.19.3</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

备注:处理redisson和springboot兼容性问题

配置文件

springboot整合redisson有三种方式

  • 第一种:通用的redis配置+redisson的自动配置[最简单]
  • 第二种:使用单独的redisson配置文件
  • 第三种:使用spring.redis.redisson这个配置key下进行配置

详细的整合查看 springboot整合redisson配置

spring:
  redis:
    database: 0
    host: localhost
    port: 6379
    timeout: 10000
    lettuce:
      pool:
        max-active: 8
        max-wait: -1
        min-idle: 0
        max-idle: 8

demo代码

package com.homeey.redisdelayqueue.delay;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * 明天的你会因今天到的努力而幸运
 *
 * @author jt4mrg@qq.com
 * 23:11 2023-02-19 2023
 **/
@Slf4j
@Component
@RequiredArgsConstructor
public class RedissonDelayQueue {

    private final RDelayedQueue<String> delayedQueue;
    private final RBlockingQueue<String> blockingQueue;

    @PostConstruct
    public void init() {
        ExecutorService executorService = Executors.newFixedThreadPool(1);
        executorService.submit(() -> {
            while (true) {
                try {
                    String task = blockingQueue.take();
                    log.info("rev delay task:{}", task);
                } catch (Exception e) {
                    log.error("occur error", e);
                }
            }
        });
    }

    public void offerTask(String task, long seconds) {
        log.info("add delay task:{},delay time:{}s", task, seconds);
        delayedQueue.offer(task, seconds, TimeUnit.SECONDS);
    }

    @Configuration
    static class RedissonDelayQueueConfigure {

        @Bean
        public RBlockingQueue<String> blockingQueue(RedissonClient redissonClient) {
            return redissonClient.getBlockingQueue("TOKEN-RENEWAL");
        }

        @Bean
        public RDelayedQueue<String> delayedQueue(RBlockingQueue<String> blockingQueue,
                                                  RedissonClient redissonClient) {
            return redissonClient.getDelayedQueue(blockingQueue);
        }
    }
}

执行效果

原理分析

RedissonDelayedQueue实现中我们看到有四个角色

  • redisson_delay_queue_timeout:xxx,sorted set数据类型,存放所有延迟任务,按照延迟任务的到期时间戳(提交任务时的时间戳 + 延迟时间)来排序的,所以列表的最前面的第一个元素就是整个延迟队列中最早要被执行的任务,这个概念很重要
  • redisson_delay_queue:xxx,list数据类型,暂时没发现什么用,只是在提交任务时会写入这里面,队列转移时又会删除里面的元素
  • xxx:list数据类型,被称为目标队列,这个里面存放的任务都是已经到了延迟时间的,可以被消费者获取的任务,所以上面demo中的RBlockingQueue的take方法是从这个目标队列中获取到任务的
  • redisson_delay_queue_channel:xxx,是一个channel,用来通知客户端开启一个延迟任务

队列创建

RedissonDelayedQueue延迟队列创建时,指定了队列转移服务,以及实现延迟队列的四个重要校色的key。核心代码是指定队列转移任务

 QueueTransferTask task = new QueueTransferTask(commandExecutor.getConnectionManager()) {

            @Override
            protected RFuture<Long> pushTaskAsync() {
                return commandExecutor.evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
                        "local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); "//拿到zset中过期的值列表
                      + "if #expiredValues > 0 then " //如果有
                          + "for i, v in ipairs(expiredValues) do "
                              + "local randomId, value = struct.unpack('dLc0', v);"//解构消息,在提交任务时打包的消息
                              + "redis.call('rpush', KEYS[1], value);" //放入无前缀的list 队头
                              + "redis.call('lrem', KEYS[3], 1, v);"//移除带前缀list 队尾元素
                          + "end; "
                          + "redis.call('zrem', KEYS[2], unpack(expiredValues));" //移除zset中本次读取的过期元素
                      + "end; "
                        // get startTime from scheduler queue head task
                      + "local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); "//取zset最小分值的元素
                      + "if v[1] ~= nil then "
                         + "return v[2]; " //返回分值,即过期时间
                      + "end "
                      + "return nil;",
                      Arrays.asList(getRawName(), timeoutSetName, queueName),
                      System.currentTimeMillis(), 100);
            }

            @Override
            protected RTopic getTopic() {
                return RedissonTopic.createRaw(LongCodec.INSTANCE, commandExecutor, channelName);
            }
        };

生产者

核心代码RedissonDelayedQueue#offerAsync

 return commandExecutor.evalWriteNoRetryAsync(getRawName(), codec, RedisCommands.EVAL_VOID,
                "local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);" //打包消息体:消息id,消息长度,消息值
              + "redis.call('zadd', KEYS[2], ARGV[1], value);"//zset中加入消息及其超时分值
              + "redis.call('rpush', KEYS[3], value);" //向带前缀的list中添加消息
              // if new object added to queue head when publish its startTime
              // to all scheduler workers
              + "local v = redis.call('zrange', KEYS[2], 0, 0); "//取出zset中第一个元素
              + "if v[1] == value then " //如果最快过期的元素就是这次发送的消息
                 + "redis.call('publish', KEYS[4], ARGV[1]); " //channel中发布一下超时时间
              + "end;",
              Arrays.asList(getRawName(), timeoutSetName, queueName, channelName),
              timeout, randomId, encode(e));

消费者

消费者最简单,直接从不带前缀的list中BLPOP读取就可以

整个流程

总结思考

Lua是redis的好朋友,我们可以看到Redisson实现延迟队列时,大量使用到lua脚本,因Redis会将整个脚本作为一个整体执行,中间不会被其他请求插入。因此在脚本运行过程中无需担心会出现竞态条件,无需使用事务。我们在平时开发时有多个redis命令操作的有简单的业务逻辑,不妨尝试一下lua脚本的方式,可以避免使用分布式锁来保障一致性。

Redisson的源码值得一读,有很多新东西值得学习,如果其用到的netty基于时间轮算法的定时任务调度,可以让我们基于此实现自己的任务调度框架,也让我有了去探究这种实现方式和基于ScheduledThreadPoolExecutor的定时调度的差异及各自优劣的欲望。

以上就是Redis优雅地实现延迟队列的方法分享的详细内容,更多关于Redis延迟队列的资料请关注我们其它相关文章!

(0)

相关推荐

  • Redis延迟队列和分布式延迟队列的简答实现

    最近,又重新学习了下Redis,Redis不仅能快还能慢,简直利器,今天就为大家介绍一下Redis延迟队列和分布式延迟队列的简单实现. 在我们的工作中,很多地方使用延迟队列,比如订单到期没有付款取消订单,制订一个提醒的任务等都需要延迟队列,那么我们需要实现延迟队列.我们本文的梗概如下,同学们可以选择性阅读. 1. 实现一个简单的延迟队列. 我们知道目前JAVA可以有DelayedQueue,我们首先开一个DelayQueue的结构类图.DelayQueue实现了Delay.BlockingQue

  • 基于Redis延迟队列的实现代码

    使用场景 工作中大家往往会遇到类似的场景: 1.对于红包场景,账户 A 对账户 B 发出红包通常在 1 天后会自动归还到原账户. 2.对于实时支付场景,如果账户 A 对商户 S 付款 100 元,5秒后没有收到支付方回调将自动取消订单. 解决方案分析 方案一: 采用通过定时任务采用数据库/非关系型数据库轮询方案. 优点: 1. 实现简单,对于项目前期这样是最容易的解决方案. 缺点: 1. DB 有效使用率低,需要将一部分的数据库的QPS分配给 JOB 的无效轮询. 2. 服务资源浪费,因为轮询需

  • 百行代码实现基于Redis的可靠延迟队列

    目录 原理详解 pending2ReadyScript ready2UnackScript unack2RetryScript ack consume 在之前探讨延时队列的文章中我们提到了 redisson delayqueue 使用 redis 的有序集合结构实现延时队列,遗憾的是 go 语言社区中并无类似的库.不过问题不大,没有轮子我们自己造

  • Redisson延迟队列执行流程源码解析

    目录 引言 demo示例 SUBSCRIBE指令 zrangebyscore和zrange指令 BLPOP指令 最后定时器源码解析 总结: 引言 在实际分布式项目中延迟任务一般不会使用JDK自带的延迟队列,因为它是基于JVM内存存储,没有持久化操作,所以当服务重启后就会丢失任务. 在项目中可以使用MQ死信队列或redisson延迟队列进行处理延迟任务,本篇文章将讲述redisson延迟队列的使用demo和其执行源码. demo示例 通过脚手架创建一个简易springboot项目,引入rediss

  • SpringBoot集成Redisson实现延迟队列的场景分析

    使用场景 1.下单成功,30分钟未支付.支付超时,自动取消订单 2.订单签收,签收后7天未进行评价.订单超时未评价,系统默认好评 3.下单成功,商家5分钟未接单,订单取消 4.配送超时,推送短信提醒 ...... 对于延时比较长的场景.实时性不高的场景,我们可以采用任务调度的方式定时轮询处理.如:xxl-job 今天我们采用一种比较简单.轻量级的方式,使用 Redis 的延迟队列来进行处理.当然有更好的解决方案,可根据公司的技术选型和业务体系选择最优方案.如:使用消息中间件Kafka.Rabbi

  • Redis优雅地实现延迟队列的方法分享

    目录 前言 使用 依赖配置 配置文件 demo代码 执行效果 原理分析 队列创建 生产者 消费者 整个流程 总结思考 前言 工作中常常会遇到这样的场景,如订单到期未支付取消,到期自动续费等,我们发现延迟队列非常适合在这样的场景中使用.常见的延迟队列的优秀实现有rabbitMQ的死信队列,RocketMQ的延迟队列等,但是了有时候项目没有特别的大,没有引入类似的消息中间件,但是了又遇到了特别适合使用延迟队列的场景,我们一般会利用已有的redis实现一个简陋的延迟队列.常见的实现方式有监听过期key

  • JAVA 实现延迟队列的方法

    延迟队列的需求各位应该在日常开发的场景中经常碰到.比如: 用户登录之后5分钟给用户做分类推送: 用户多少天未登录给用户做召回推送: 定期检查用户当前退款账单是否被商家处理等等场景. 一般这种场景和定时任务还是有很大的区别,定时任务是你知道任务多久该跑一次或者什么时候只跑一次,这个时间是确定的.延迟队列是当某个事件发生的时候需要延迟多久触发配套事件,引子事件发生的时间不是固定的. 业界目前也有很多实现方案,单机版的方案就不说了,现在也没有哪个公司还是单机版的服务,今天我们一一探讨各种方案的大致实现

  • 分布式利器redis及redisson的延迟队列实践

    目录 前言碎语 延迟队列多种实现方式 redisson中的延迟队列实现 文末结语 前言碎语 首先说明下需求,一个用户中心产品,用户在试用产品有三天的期限,三天到期后准时准点通知用户,试用产品到期了.这个需求如果不是准时通知,而是每天定点通知就简单了.如果需要准时通知就只能上延迟队列了.使用场景除了如上,典型的业务场景还有电商中的延时未支付订单失效等等. 延迟队列多种实现方式 1.如基于RabbitMQ的队列ttl+死信路由策略:通过设置一个队列的超时未消费时间,配合死信路由策略,到达时间未消费后

  • Redis实现唯一计数的3种方法分享

    唯一计数是网站系统中十分常见的一个功能特性,例如网站需要统计每天访问的人数 unique visitor (也就是 UV).计数问题很常见,但解决起来可能十分复杂:一是需要计数的量可能很大,比如大型的站点每天有数百万的人访问,数据量相当大:二是通常还希望扩展计数的维度,比如除了需要每天的 UV,还想知道每周或每月的 UV,这样导致计算十分复杂. 在关系数据库存储的系统里,实现唯一计数的方法就是 select count(distinct <item_id>),它十分简单,但是如果数据量很大,这

  • Go+Redis实现延迟队列实操

    目录 前言 简单的实现 定义消息 Push Consume 存在的问题 多消费者实现 定义消息 Push Consume 存在的问题 总结 前言 延迟队列是一种非常使用的数据结构,我们经常有需要延迟推送处理消息的场景,比如延迟60秒发送短信,延迟30分钟关闭订单,消息消费失败延迟重试等等. 一般我们实现延迟消息都需要依赖底层的有序结构,比如堆,而Redis刚好提供了zset这种数据类型,它的底层实现是哈希表+跳表,也是一种有序的结构,所以这篇文章主要是使用Go+Redis来实现延迟队列. 当然R

  • php实现redis数据库指定库号迁移的方法

    本文实例讲述了php实现redis数据库指定库号迁移的方法,分享给大家供大家参考.具体如下: redis普通的数据库迁移,只能整个redis save,或者利用主从,当然也可以安装一个redis-dump,不过比较麻烦,这里提供一种php的脚本,实现指定库号的迁移,其实也就是遍历根据存储类型,读出来,插入新库,效果是这样: 复制代码 代码如下: [root@localhost ~]# php 1.php 1/407 101/407 201/407 301/407 401/407 PHP实例代码如

  • Rabbitmq延迟队列实现定时任务的方法

    场景 开发中经常需要用到定时任务,对于商城来说,定时任务尤其多,比如优惠券定时过期.订单定时关闭.微信支付2小时未支付关闭订单等等,都需要用到定时任务,但是定时任务本身有一个问题,一般来说我们都是通过定时轮询查询数据库来判断是否有任务需要执行,也就是说不管怎么样,我们需要先查询数据库,而且有些任务对时间准确要求比较高的,需要每秒查询一次,对于系统小倒是无所谓,如果系统本身就大而且数据也多的情况下,这就不大现实了,所以需要其他方式的,当然实现的方式有多种多样的,比如Redis实现定时队列.基于优先

  • SpringBoot集成Redis实现消息队列的方法

    list 原理说明 Redis 的 list 是按照插入顺序排序的字符串链表. 如图所示,可以通过 lpush 和 rpop 或者 rpush 和 lpop 实现消息队列. 1 lpush 和 rpop 2 rpush 和 lpop 消息队列功能实现 引入 Redis 依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data

  • Golang实现基于Redis的可靠延迟队列

    目录 前言 原理详解 pending2ReadyScript ready2UnackScript unack2RetryScript ack consume 前言 在之前探讨延时队列的文章中我们提到了 redisson delayqueue 使用 redis 的有序集合结构实现延时队列,遗憾的是 go 语言社区中并无类似的库.不过问题不大,没有轮子我们自己造. 本文的完整代码实现在hdt3213/delayqueue,可以直接 go get 安装使用. 使用有序集合结构实现延时队列的方法已经广为

随机推荐