生产redisson延时队列不消费问题排查解决

目录
  • 问题描述
  • 初步排查
  • 排查过程
  • 解决方案
  • redisson 延时队列原理
  • 流程总结

问题描述

项目使用redisson延时队列功能,实现直播的开播提醒,突然有一天业务爆出问题,未触发开播提醒。

初步排查

首先通过查询生产日志,发送端日志存在,没有消费日志,猜测消费端没有消费到延时消息,,在dba的协助下查询redis队列,消息也确实存在,但已经过了过期时间,由此证明redisson消费者出现问题。通过服务日志发现在最后一次设置自定义推送任务是在一次服务发布之前,服务发布后,之前设置的自定义推送消息均没有被客户端消费,由此猜想是由发布服务导致消费端失效。

排查过程

发送端代码

public <T> void produce(String delayQueue, T t, long delay, TimeUnit timeUnit) {
    try {
        log.info("delay msg,delayQueue:{},key:{},delay:{}", delayQueue, t, delay);
        if (delay < 0) {
            delay = 0;
        }
        RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(delayQueue);
        RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);
        delayedQueue.offer(t, delay, timeUnit);
    }catch (Exception e){
        log.error("添加延时任务队列失败",e);
    }
}

消费端代码

public class DelayTaskHandler implements Runnable {
    @Override
    public void run() {
        RBlockingQueue<T> blockingFairQueue = redissonClient.getBlockingQueue(delayQueue);
        while (true) {
            try {
                T value = blockingFairQueue.take();
                log.info("delay queue {},延时任务开始执行,value - {} , timeStamp - {} , threadName - {}", delayQueue, value, System.currentTimeMillis(), Thread.currentThread().getName());
                consumer.accept(value);
            } catch (Exception e) {
                log.error("延时任务执行失败,", e);
            }
        }
    }
}

因为redisson 延时队列是基于redis实现的,所以从redis执行命令开始入手排查

1.打开redis监控,启动服务,发现redis首先执行了blpop命令,阻塞等待{cl-live-admin:notice_delay_queue} 队列消息

2.提交一个延时任务后,观察redis命令

此时发现redis首先执行了一个SUBSCRIBE命令,订阅了一个队列,然后执行了一段lua脚本,主要包括以下命令:

  • zrangebyscore:获取zset中score在0至当前时间戳范围内的前一百条数据 如果获取到数据则循环执行rpush,lrem,zrem命令
  • zrange:取zset中第一条数据
  • zadd:向zset中添加一条数据,score为时间戳
  • rpush:向list右边push一条数据
  • publish:如果添加的消息在顶部,则发布一条订阅消息

3.消费一条消息

同样消费的时候也是提交了一条lua脚本,主要执行了以下命令 可以看到和发送端命令相似

  • zrangebyscore:获取zset中score在0至当前时间戳范围内的前一百条数据
  • rpush:向list右边push一条数据
  • lrem:删除一条数据
  • zrem:删除zeset中的数据
  • zrange:获取第一条数据
  • BLPOP:阻塞等待队列消息

通过以上redis命令的执行可以发现一个命令SUBCRIBE用于订阅redis的一个队列,而这个命令只在发送消息的时候执行了,在消费的时候没有执行。从而验证了当服务重启后如果没有新的消息发送,那么客户端就不会发送SUBCRIBE命令,订阅延时队列,这就导致在服务重启前发送的消息到时间后无法消费。

解决方案

在消费端启动的时候添加一行代码用于订阅延时队列

 //订阅redis队列
 redissonClient.getDelayedQueue(blockingFairQueue);

那么为什么没有订阅就消费不到消息了呢?带着疑问继续深入理解redisson的实现

redisson 延时队列原理

首先回到消费端代码

在我们没有发送订阅命令的时候,客户端只是在阻塞等待一个指定队列的消息,那么这个队列的消息是谁放进去的呢? 带着疑问我们再看发送端代码

直接进入 delayedQueue.offer()方法内部

可以看到发送端是提交了一个lua脚本主要执行了zadd,rpush,publish命令,这里我们需要注意publish命令,在redis中pub/sub是对应的,当有publish的时候,那么subcribe端会收到该订阅消息。

那么是谁收到了订阅的消息,收到消息后又做了什么呢,回到redissonClient.getDelayedQueue(blockingFairQueue)代码中

继续进入 new RedissonDelayedQueue()

可以看到这里创建了一个QueueTransferTask,实现了pushTaskAsync()方法,具体内容是一个lua脚本,首先执行zrangebyscore 获取过期的前一百条数据,循环调用rpush,lrem,zrem,注意这里rpush的队列为我们指定的延时队列,也就是consumer端take的队列。至此明白了消费端的消息是方法pushTaskAsync()执行后放入的。那么什么时候执行这个方法呢。

进入 queueTransferService.schedule(queueName, task)方法

这里会执行start方法,继续跟进

这里可以看到添加了两个listener,onSubcribe,onMessage,当订阅到消息时执行onSubcribe中的pushTash,当redis有新的消息通知,就会触发scheduleTask(...)方法,startTime为上述中publish通知的元素过期时间

继续进入pushTask方法

这里可以看到一个熟悉的方法pushTaskAsync(),也就是前边的一段lua脚本,用于将过期的消息放入阻塞队列,并返回排在第一个的消息执行scheduleTask()

继续进入scheduleTask()方法

如果时间差小于10毫秒则执行pushTask方法,如果大于10毫秒则启动一个延时任务,到时间后执行pushTask方法。pushTask与scheduleTask互相调用循环往复

流程总结

至此源码分析完毕,整个流程总结如下:

发送端只是往zset,list,添加数据,并且发布一条订阅消息

消费端收到订阅消息后会查询zset中的过期消息,并放入阻塞队列供消费端take消息,并且获取zset第一个消息,启动一个延时任务,到期后继续从zset中获取过期消息如此循环。

此时就回答了上边的问题 那么为什么没有订阅就消费不到消息了呢?

如果没有订阅的话消费端就收不到订阅消息,也就不会去获取过期时间放入阻塞队列进行循环。

以上就是生产redisson延时队列不消费问题排查解决的详细内容,更多关于排查redisson延时队列不消费的资料请关注我们其它相关文章!

(0)

相关推荐

  • Redisson 主从一致性问题详解

    目录 Redisson 主从一致性 Java 实现 mutilLock RedissonConfig.java TestRedisson.java Redisson 主从一致性 我们先来说一下 Redis 的主从模式,Redis Master(主节点)中处理所有发向 Redis 的写操作(增删改),Redis Slave (从节点)只负责处理读操作,主节点会不断将自己的数据同步给从节点,确保主从之间的数据一致性,但是数据同步会存在一定的延时,主从一致性问题就是因为延时而导致的 比如我们通过 se

  • Quarkus集成redis操作Redisson实现数据互通

    目录 前言 集成redis 复制Redisson序列化 使用 前言 博主所在公司大量使用了redis缓存,redis客户端用的Redisson.在Quarkus集成redis时,博主尝试使用Redisson客户端直接集成,发现,在jvm模式下运行quarkus没点问题,但是在打native image时,就报错了,尝试了很多方式都是莫名其妙的异常.最后决定采用quarkus官方的redis客户端,但是Redisson客户端数据序列化方式是特有的,不是简单的String,所以quarkus中的re

  • spring boot集成redisson的最佳实践示例

    目录 前言 集成jedis实例,xml方式 集成前引用的jar springbean配置xml 集成redisson实例,javabean的方式 集成前引入的jar javabean配置如下 提供实例化javabean application.properties添加如下配置 前言 本文假使你了解spring boot并实践过,非spring boot用户可跳过也可借此研究一下. redisson是redis的java客户端程序,国内外很多公司都有在用,如下, 和spring的集成中官方给出的实

  • 分布式利器redis及redisson的延迟队列实践

    目录 前言碎语 延迟队列多种实现方式 redisson中的延迟队列实现 文末结语 前言碎语 首先说明下需求,一个用户中心产品,用户在试用产品有三天的期限,三天到期后准时准点通知用户,试用产品到期了.这个需求如果不是准时通知,而是每天定点通知就简单了.如果需要准时通知就只能上延迟队列了.使用场景除了如上,典型的业务场景还有电商中的延时未支付订单失效等等. 延迟队列多种实现方式 1.如基于RabbitMQ的队列ttl+死信路由策略:通过设置一个队列的超时未消费时间,配合死信路由策略,到达时间未消费后

  • Redis中Redisson红锁(Redlock)使用原理

    目录 简介 为什么使用Redis的红锁 解决方案:使用红锁 Redisson红锁实例 Redisson红锁原理 参考文章 简介 说明 本文介绍为什么要使用Redis的红锁(Redlock).什么是Redis的红锁以及Redis红锁的原理. 本文用Redisson来介绍Redis红锁的用法. Redisson 高版本会根据redisClient的模式来决定getLock返回的锁类型,如果集群模式,满足红锁的条件,则会直接返回红锁. 官网 REDIS distlock -- Redis中国用户组(C

  • Redisson如何解决Redis分布式锁提前释放问题

    目录 前言: 一.问题描述: 二.原因分析: 三.解决方案: 1.思考: 2.Redisson简单配置: 3.使用样例: 四.源码分析 1.lock加锁操作 2.unlock解锁操作 总结: 相关参考: 前言: 在分布式场景下,相信你或多或少需要使用分布式锁来访问临界资源,或者控制耗时操作的并发性. 当然,实现分布式锁的方案也比较多,比如数据库.redis.zk 等等.本文主要结合一个线上案例,讲解 redis 分布式锁的相关实现. 一.问题描述: 某天线上出现了数据重复处理问题,经排查后发现,

  • 生产redisson延时队列不消费问题排查解决

    目录 问题描述 初步排查 排查过程 解决方案 redisson 延时队列原理 流程总结 问题描述 项目使用redisson延时队列功能,实现直播的开播提醒,突然有一天业务爆出问题,未触发开播提醒. 初步排查 首先通过查询生产日志,发送端日志存在,没有消费日志,猜测消费端没有消费到延时消息,,在dba的协助下查询redis队列,消息也确实存在,但已经过了过期时间,由此证明redisson消费者出现问题.通过服务日志发现在最后一次设置自定义推送任务是在一次服务发布之前,服务发布后,之前设置的自定义推

  • Redisson 分布式延时队列 RedissonDelayedQueue 运行流程

    目录 前言 基本使用 内部数据结构介绍 基本流程 发送延时消息 获取延时消息 初始化延时队列 总结 前言 因为工作中需要用到分布式的延时队列,调研了一段时间,选择使用 RedissonDelayedQueue,为了搞清楚内部运行流程,特记录下来. 总体流程大概是图中的这个样子,初看一眼有点不知从何下手,接下来我会通过以下几点来分析流程,相信看完本文你能了解整个运行流程. 基本使用 内部数据结构介绍 基本流程 发送延时消息 获取延时消息 初始化延时队列 基本使用 发送延迟消息代码如下,发送了一条延

  • 基于golang的简单分布式延时队列服务的实现

    一.引言 背景 我们在做系统时,很多时候是处理实时的任务,请求来了马上就处理,然后立刻给用户以反馈.但有时也会遇到非实时的任务,比如确定的时间点发布重要公告.或者需要在用户做了一件事情的X分钟/Y小时后,EG: "PM:我们需要在这个用户通话开始10分钟后给予提醒给他们发送奖励" 对其特定动作,比如通知.发券等等.一般我接触到的解决方法中在比较小的服务里都会自己维护一个backend,但是随着这种backend和server增多,这种方法很大程度和本身业务耦合在一起,所以这时需要一个延

  • Springboot+rabbitmq实现延时队列的两种方式

    什么是延时队列,延时队列应用于什么场景 延时队列顾名思义,即放置在该队列里面的消息是不需要立即消费的,而是等待一段时间之后取出消费. 那么,为什么需要延迟消费呢?我们来看以下的场景 网上商城下订单后30分钟后没有完成支付,取消订单(如:淘宝.去哪儿网) 系统创建了预约之后,需要在预约时间到达前一小时提醒被预约的双方参会 系统中的业务失败之后,需要重试 这些场景都非常常见,我们可以思考,比如第二个需求,系统创建了预约之后,需要在预约时间到达前一小时提醒被预约的双方参会.那么一天之中肯定是会有很多个

  • SpringBoot使用RabbitMQ延时队列(小白必备)

    1.什么是MQ MQ,是一种跨进程的通信机制,用于上下游传递消息. 在互联网架构中,MQ是一种非常常见的上下游"逻辑解耦+物理解耦"的消息通信服务. 使用了MQ之后,消息发送上游只需要依赖MQ,不用依赖其他服务. 为什么会产生消息列队? 不同进程(process)之间传递消息时,两个进程之间耦合程度过高,改动一个进程,引发必须修改另一个进程,为了隔离这两个进程,在两进程间抽离出一层(一个模块),所有两进程之间传递的消息,都必须通过消息队列来传递,单独修改某一个进程,不会影响另一个: 不

  • 一口气说出Java 6种延时队列的实现方法(面试官也得服)

    五一期间原计划是写两篇文章,看一本技术类书籍,结果这五天由于自律性过于差,禁不住各种诱惑,我连电脑都没打开过,计划完美宣告失败.所以在这能看出和大佬之间的差距,人家没白没夜的更文,比你优秀的人比你更努力,难以望其项背,真是让我自愧不如. 知耻而后勇,这不逼着自己又学起来了,个人比较喜欢一些实践类的东西,既学习到知识又能让技术落地,能搞出个demo最好,本来不知道该分享什么主题,好在最近项目紧急招人中,而我有幸做了回面试官,就给大家整理分享一道面试题:"如何实现延时队列?". 下边会介绍

  • 运用.NetCore实例讲解RabbitMQ死信队列,延时队列

    目录 一.死信队列 二.延时队列 三.延时消息设置不同过期时间 四.延时消息用延时插件的方式实现 一.死信队列 描述:Q1队列绑定了x-dead-letter-exchange(死信交换机)为X2,x-dead-letter-routing-key(死信路由key)指向Q2(队列2) P(生产者)发送消息经X1(交换机1)路由到Q1(队列1),Q1的消息触发特定情况,自动把消息经X2(交换机2)路由到Q2(队列2),C(消费者)直接消息Q2的消息. 特定情况有哪些呢: 1.消息被拒(basic.

  • 基于Redis实现延时队列的优化方案小结

    目录 一.延时队列的应用 二.延时队列的实现 三.总结 一.延时队列的应用 近期在开发部门的新项目,其中有个关键功能就是智能推送,即根据用户行为在特定的时间点向用户推送相应的提醒消息,比如以下业务场景: 在用户点击充值项后,半小时内未充值,向用户推送充值未完成提醒. 在用户最近一次阅读行为2小时后,向用户推送继续阅读提醒. 在用户新注册或退出应用N分钟后,向用户推送合适的推荐消息. … 上述场景的共同特征就是在某事件触发后延迟一定时间后再执行特定任务,若事件触发时间点可知,则上述逻辑也可等价于在

  • 详解Java中的延时队列 DelayQueue

    当用户超时未支付时,给用户发提醒消息.另一种场景是,超时未付款,订单自动取消.通常,订单创建的时候可以向延迟队列种插入一条消息,到时间自动执行.其实,也可以用临时表,把这些未支付的订单放到一个临时表中,或者Redis,然后定时任务去扫描.这里我们用延时队列来做.RocketMQ有延时队列,RibbitMQ也可以实现,Java自带的也有延时队列,接下来就回顾一下各种队列. Queue 队列是一种集合.除了基本的集合操作以外,队列还提供了额外的插入.提取和检查操作.队列的每个方法都以两种形式存在:一

  • redis实现延时队列的两种方式(小结)

    背景 项目中的流程监控,有几种节点,需要监控每一个节点是否超时.按传统的做法,肯定是通过定时任务,去扫描然后判断,但是定时任务有缺点:1,数据量大会慢:2,时间不好控制,太短,怕一次处理不完,太长状态就会有延迟.所以就想到用延迟队列的方式去实现. 一,redis的过期key监控 1,开启过期key监听 在redis的配置里把这个注释去掉 notify-keyspace-events Ex 然后重启redis 2,使用redis过期监听实现延迟队列 继承KeyExpirationEventMess

随机推荐