redis实现延时队列的两种方式(小结)

背景

项目中的流程监控,有几种节点,需要监控每一个节点是否超时。按传统的做法,肯定是通过定时任务,去扫描然后判断,但是定时任务有缺点:1,数据量大会慢;2,时间不好控制,太短,怕一次处理不完,太长状态就会有延迟。所以就想到用延迟队列的方式去实现。

一,redis的过期key监控

1,开启过期key监听

在redis的配置里把这个注释去掉

notify-keyspace-events Ex

然后重启redis

2,使用redis过期监听实现延迟队列

继承KeyExpirationEventMessageListener类,实现父类的方法,就可以监听key过期时间了。当有key过期,就会执行这里。这里就把需要的key过滤出来,然后发送给kafka队列。

@Component
@Slf4j
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {

  @Autowired
  private KafkaProducerService kafkaProducerService;

  public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
    super(listenerContainer);
  }

  /**
   * 针对 redis 数据失效事件,进行数据处理
   * @param message
   * @param pattern
   */
  @Override
  public void onMessage(Message message, byte[] pattern){
    if(message == null || StringUtils.isEmpty(message.toString())){
      return;
    }
    String content = message.toString();
    //key的格式为  flag:时效类型:运单号 示例如下
    try {
      if(content.startsWith(AbnConstant.EMS)){
        kafkaProducerService.sendMessageSync(TopicConstant.EMS_WAYBILL_ABN_QUEUE,content);
      }else if(content.startsWith(AbnConstant.YUNDA)){
        kafkaProducerService.sendMessageSync(TopicConstant.YUNDA_WAYBILL_ABN_QUEUE,content);
      }
    } catch (Exception e) {
      log.error("监控过期key,发送kafka异常,",e);
    }
  }
}

可以看的出来,这种方式其实是很简单的,但是有几个问题需要注意,一是,这个尽量单机运行,因为多台机器都会执行,浪费cpu,增加数据库负担。二是,机器频繁部署的时候,如果有时间间隔,会出现数据的漏处理。

二,redis的zset实现延迟队列

1,生产者实现

可以看到生产者很简单,其实就是利用zset的特性,给一个zset添加元素而已,而时间就是它的score。

public void produce(Integer taskId, long exeTime) {
  System.out.println("加入任务, taskId: " + taskId + ", exeTime: " + exeTime + ", 当前时间:" + LocalDateTime.now());
  RedisOps.getJedis().zadd(RedisOps.key, exeTime, String.valueOf(taskId));
}

2,消费者实现

消费者的代码也不难,就是把已经过期的zset中的元素给删除掉,然后处理数据。

public void consumer() {
  Executors.newSingleThreadExecutor().submit(new Runnable() {
    @Override
    public void run() {
      while (true) {
        Set<String> taskIdSet = RedisOps.getJedis().zrangeByScore(RedisOps.key, 0, System.currentTimeMillis(), 0, 1);
        if (taskIdSet == null || taskIdSet.isEmpty()) {
          System.out.println("没有任务");

        } else {
          taskIdSet.forEach(id -> {
            long result = RedisOps.getJedis().zrem(RedisOps.key, id);
            if (result == 1L) {
              System.out.println("从延时队列中获取到任务,taskId:" + id + " , 当前时间:" + LocalDateTime.now());
            }
          });
        }
        try {
          TimeUnit.MILLISECONDS.sleep(100);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    }
  });
}

可以看到这种方式其实是比上个方式要好的。因为,他的那两个缺点都被克服掉了。多台机器也没事儿,也不用再担心部署时间间隔长的问题。

总结

两个方式都是不错的,都能解决问题。碰到问题,多思考,多总结。

到此这篇关于redis实现延时队列的两种方式(小结)的文章就介绍到这了,更多相关redis 延时队列内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • 使用Redis实现延时任务的解决方案

    最近在生产环境刚好遇到了延时任务的场景,调研了一下目前主流的方案,分析了一下优劣并且敲定了最终的方案.这篇文章记录了调研的过程,以及初步方案的实现. 候选方案对比 下面是想到的几种实现延时任务的方案,总结了一下相应的优势和劣势. 方案 优势 劣势 选用场景 JDK 内置的延迟队列 DelayQueue 实现简单 数据内存态,不可靠 一致性相对低的场景 调度框架和 MySQL 进行短间隔轮询 实现简单,可靠性高 存在明显的性能瓶颈 数据量较少实时性相对低的场景 RabbitMQ 的 DLX 和 T

  • golang实现redis的延时消息队列功能示例

    前言 在学习过程中发现redis的zset还可以用来实现轻量级的延时消息队列功能,虽然可靠性还有待提高,但是对于一些对数据可靠性要求不那么高的功能要求完全可以实现.本次主要采用了redis中zset中的zadd, zrangebyscore 和 zdel来实现一个小demo. 提前准备 安装redis, redis-go 因为用的是macOS, 直接 $ brew install redis $ go get github.com/garyburd/redigo/redis 又因为比较懒,生成任

  • 利用Redis实现延时处理的方法实例

    背景 在开发中,往往会遇到一些关于延时任务的需求.例如 •生成订单30分钟未支付,则自动取消 •生成订单60秒后,给用户发短信 对上述的任务,我们给一个专业的名字来形容,那就是延时任务. 最近需要做一个延时处理的功能,主要是从kafka中消费消息后根据消息中的某个延时字段来进行延时处理,在实际的实现过程中有一些需要注意的地方,记录如下. 实现过程 说到java中的定时功能,首先想到的Timer和ScheduledThreadPoolExecutor,但是相比之下Timer可以排除,主要原因有以下

  • redis实现延时队列的两种方式(小结)

    背景 项目中的流程监控,有几种节点,需要监控每一个节点是否超时.按传统的做法,肯定是通过定时任务,去扫描然后判断,但是定时任务有缺点:1,数据量大会慢:2,时间不好控制,太短,怕一次处理不完,太长状态就会有延迟.所以就想到用延迟队列的方式去实现. 一,redis的过期key监控 1,开启过期key监听 在redis的配置里把这个注释去掉 notify-keyspace-events Ex 然后重启redis 2,使用redis过期监听实现延迟队列 继承KeyExpirationEventMess

  • Springboot+rabbitmq实现延时队列的两种方式

    什么是延时队列,延时队列应用于什么场景 延时队列顾名思义,即放置在该队列里面的消息是不需要立即消费的,而是等待一段时间之后取出消费. 那么,为什么需要延迟消费呢?我们来看以下的场景 网上商城下订单后30分钟后没有完成支付,取消订单(如:淘宝.去哪儿网) 系统创建了预约之后,需要在预约时间到达前一小时提醒被预约的双方参会 系统中的业务失败之后,需要重试 这些场景都非常常见,我们可以思考,比如第二个需求,系统创建了预约之后,需要在预约时间到达前一小时提醒被预约的双方参会.那么一天之中肯定是会有很多个

  • java实现消息队列的两种方式(小结)

    实现消息队列的两种方式 Apache ActiveMQ官方实例发送消息 直接在Apache官网http://activemq.apache.org/download-archives.html下载ActiveMQ源码 下载解压后拿到java代码实例 然后倒入IDE 如下: 请认真阅读readme.md文件,大致意思就是把项目打成两个jar包,然后启动服务,然后同时运行打的两个jar包,然后就能看到具体的调用信息.打jar包时直接利用maven打就行了,不用修改代码. 启动服务: 利用Spring

  • RabbitMQ 实现延迟队列的两种方式详解

    目录 1. 用插件 1.1 安装插件 1.2 消息收发 2. DLX 实现延迟队列 2.1 延迟队列实现思路 2.2 案例 3. 小结 定时任务各种各样,常见的定时任务例如日志备份,我们可能在每天凌晨 3 点去备份,这种固定时间的定时任务我们一般采用 cron 表达式就能轻松的实现,还有一些比较特殊的定时任务,向大家看电影中的定时炸弹,3分钟后爆炸,这种定时任务就不太好用 cron 去描述,因为开始时间不确定,我们开发中有的时候也会遇到类似的需求,例如: 在电商项目中,当我们下单之后,一般需要

  • 基于Java数组实现循环队列的两种方法小结

    用java实现循环队列的方法: 1.添加一个属性size用来记录眼下的元素个数. 目的是当head=rear的时候.通过size=0还是size=数组长度.来区分队列为空,或者队列已满. 2.数组中仅仅存储数组大小-1个元素,保证rear转一圈之后不会和head相等.也就是队列满的时候.rear+1=head,中间刚好空一个元素. 当rear=head的时候.一定是队列空了. 队列(Queue)两端同意操作的类型不一样: 能够进行删除的一端称为队头,这样的操作也叫出队dequeue: 能够进行插

  • laravel实现上传图片的两种方式小结

    第一:是laravel里面自带的上传方式(写在接口里面的) function uploadAvatar(Request $request) { $user_id = Auth::id(); $avatar = $request->file('avatar')->store('/public/' . date('Y-m-d') . '/avatars'); //上传的头像字段avatar是文件类型 $avatar = Storage::url($avatar);//就是很简单的一个步骤 $res

  • vue data引入本地图片的两种方式小结

    我就废话不多说了,大家直接看吧! 第一种 <template> <img :src="imgsrc"> </template> <script> export default { data () { return { imgsrc: require('../../images/ICON-electronicbilling.png') } } } </script> 第二种 <template> <img :s

  • pytorch实现focal loss的两种方式小结

    我就废话不多说了,直接上代码吧! import torch import torch.nn.functional as F import numpy as np from torch.autograd import Variable ''' pytorch实现focal loss的两种方式(现在讨论的是基于分割任务) 在计算损失函数的过程中考虑到类别不平衡的问题,假设加上背景类别共有6个类别 ''' def compute_class_weights(histogram): classWeigh

  • Springboot之修改启动端口的两种方式(小结)

    Springboot启动的时候,端口的设定默认是8080,这肯定是不行的,我们需要自己定义端口,Springboot提供了两种方式,第一种,我们可以通过application.yml配置文件配置,第二种,可以通过代码里面指定,在开发中,建议使用修改application.yml的方式来修改端口. 代码地址 #通过yml配置文件的方式指定端口地址 https://gitee.com/yellowcong/springboot-demo/tree/master/springboot-demo2 #硬

  • Spring MVC获取HTTP请求头的两种方式小结

    1 前言 请求是任何Web服务要关注的对象,而请求头也是其中非常重要的信息.本文将通过代码讲解如何在Spring MVC项目中获取请求头的内容.主要通过两种方式获取: (1)通过注解@RequestHeader获取,需要在Controller中显式获取: (2)通过RequestContextHolder获取,可以任何地方获取. 接下来通过代码讲解. 2 通过注解@RequestHeader获取 需要在Controller中显示使用@RequestHeader. 2.1 获取某个请求头 只获取其

随机推荐