如何通过RabbitMq实现动态定时任务详解

目录
  • 一、需求背景
  • 二、方案思考
    • (1)需求大致分析
    • (2)可尝试的方案
  • 三、通过RabbitMQ实现延时任务并间接实现动态定时任务。
    • (1)通过死信的方式实现延时信息消费
    • (2)通过MQ延时插件实现延时任务(重点)
  • 四、MQ延时任务插件实现动态定时任务
    • (1)安装延时插件
    • (2)编码实现
    • (3)流程图
  • 总结

一、需求背景

定时任务的需求所谓是数不胜数,其中实现方式也是百花齐放,用得最多的大概率为Springboot中的 @Scheduled(cron = “0 0 1 1 * ?”) 注解,或者是定时任务XXL-JOB框架,这两者我接触的比较多,除此之外还有,Quartz 、elastic-job、但这两个在分布式领域而言,和XXL-JOBB比较,XXL-JOB更为受欢迎。无论是这些框架或者是springboot自带的定时任务组件,基本上都能满足固定定时任务的需求。而我们今天讨论的是动态定时任务的实现。

动态定时任务的需求其实在现实生活中随处可见,如花费到期多久之后发送信息提醒用户?时间间隔是多少。又或者客户下单之后多久提醒商家发货,提醒的频率又是多少…。这样的需求还有很多。今天我们针对此类需求进行探讨。

二、方案思考

(1)需求大致分析

对于此类需求相比于传统的定时任务无非多了可控性, 其可控性包括了定时任务开始和结束时间的可控性,周期性可控性,只要解决了这两个问题,实际上此类的需求也就迎刃而解了。

(2)可尝试的方案

前面提供的方案只做文字探索性描述。

2.1、 采用重写Springboot 的定时框架,从数据库中读取cron表达式来实现可控性周期。

其本质是通过如下线程进行动态定时任务的创建,从而实现对应的周期可控性。

ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();

其具体的细节不再说,其存在的痛点包括了

1 . 需要另外逻辑去实现可控性开始时间和结束时间

2. 此任务开启的入参是corn表达式,需要另外的逻辑将其进行转化,太过于猥琐

2.2、采用时间线程池

时间线程池我忘记叫什么,他是可以指定开始时间,周期时间的,相对而言,比第一种方案来得更为直观,其我考虑到的痛点如下。其实上面那种方案也是有这个痛点的。

  1. 多节点,多服务的服务部署情况下,无法实现高可用特性
  2. 需要编写过多的逻辑来管理任务线程,如果不够谨慎,有可能造成内存浪费。

2.3、采用延时操作

简单言之,实际上只要实现了延时操作 便是实现了动态的开始时间以及周期性运行,可以利用其递归的概念实现所谓的动态周期。

redis 队列来实现延时

redis的体量本身定位就不高,在数据量(任务量)过大时,对redis的压力也很大,redis不一定扛得住。但其实通过redis来实现延时消息这样的成功案列还是有很多的。在这里就不细说了。

RabbitMq实现延时消息。

通过MQ实现延时消息是本文的重点,在标题三会细说。

三、通过RabbitMQ实现延时任务并间接实现动态定时任务。

(1)通过死信的方式实现延时信息消费

通过创建死信队列来实现延时任务,然后再通过递归思想实现对应的逻辑,就可以实现对应的动态延时任务,但是这个会存在以下下几个痛点。

队列顺序消费

通过死信,我们确实可以动态的控制消息的消费时间,但是消息在队列里面,如果队列里面存在多个信息任务,前一个未到消费时间,后一个已经到了消费时间,这就好导致了,即使后面任务信息消费时间到了,却没法被消费的问题。解决方法,对队列进行排序逻辑,但如果这样做的话,就有点猥琐了。

开销过大。

对于通过死信来实现延时消息,网上有挺多优秀的博客介绍,在此就不做说明了。

(2)通过MQ延时插件实现延时任务(重点)

使用延时插件需要MQ在3.6以上(实际上我在尝试下载的时候并未发现git上有对应3.6的插件,所以还是选择较高的版本比较好)。

四、MQ延时任务插件实现动态定时任务

(1)安装延时插件

这里不做过多说明,重点在于编码的实现,主要步骤如下。

去官网下载对应版本的插件,地址为下载地址

插件名字为rabbitmq_delayed_message_exchange

将插件放到MQ插件目录下,然后cmd命令解压网(网上有命令),然后重启mq服务。大概就这样的一个过程。

(2)编码实现

创建队列

这里只弄了对应的核心代码,大致就是创建延时交换机,延时队列,以及绑定器,对应的key,value如下

    public static final String DELAY_EXCHANGE = "delay.exchange";

    public static final String DELAY_ROUTE_KEY = "delay.routeKey";

    public static final String DELAY_QUEUE = "delay.queue";

    /**
     * 延时交换机
     * @return 延时交换机
     */
    @Bean
    public CustomExchange delayExchange() {
        Map<String, Object> arguments = new HashMap<>(1);
        arguments.put("x-delayed-type", "direct");
        return new CustomExchange(DELAY_EXCHANGE,"x-delayed-message",true,false,arguments);
    }

    /**
     * mq已经安装了延时插件使用,否则得使用延时插件
     * @return 延时发送队列。
     */
    @Bean
    public Queue delayQueue() {
        return new Queue(DELAY_QUEUE,true,false,false);
    }

    /**
     * 延时绑定区
     * @return 延时绑定区
     */
    @Bean
    public Binding delayBind() {
        return BindingBuilder.bind(this.delayQueue()).to(this.delayExchange()).with(DELAY_ROUTE_KEY).noargs();
    }

生产者

这里写得比较随意,也直接使用了lombok,还直接用了 @Service ,有点草率,主要为了让读者看得清晰。还用了hutool工具类的JSONUtil。

可以清晰的看到主方法里面需要传一个Integer类型的入参,这个时间我将其转换成了秒,其MQ实际入参为毫秒,所以读者不要被误导。入参time通俗的讲就是这个消息多久之后被消费。不需要在乎顺序。

package com.linkyoyo.bill.mq.impl;

import cn.hutool.core.util.ObjectUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.linkyoyo.bill.bo.WorkOrderDelaySenMailActionBO;
import com.linkyoyo.bill.config.RabbitMQConfig;
import com.linkyoyo.bill.mq.DelaySenderService;
import lombok.AllArgsConstructor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

/**
 * 延时发送
 * @author 邹 [295006967@qq.com]
 * @date 2022/1/4 20:33
 */
@Slf4j
@RequiredArgsConstructor
@AllArgsConstructor
@Service
public class DelaySenderServiceImpl implements DelaySenderService {

    private final RabbitTemplate rabbitTemplate;

    @Override
    @Async
    public void sendMessageByDelay(JSONObject message, Integer time) {
        if(ObjectUtil.isNull(message) || ObjectUtil.isNull(time)) {
            return;
        }
        rabbitTemplate.convertAndSend(RabbitMQConfig.DELAY_EXCHANGE, RabbitMQConfig.DELAY_ROUTE_KEY, message, msg -> {
            msg.getMessageProperties().setHeader("x-delay", time * 1000);
            return msg;
        });
        log.info("延时发送成功:延时周期时间{}毫秒,消息内容为{}", time * 1000, message);
    }

    @Override
    public void sendMessageByDelay(WorkOrderDelaySenMailActionBO actionBO) {
        Integer afterSecond = actionBO.getAfterSecond();
        if(ObjectUtil.isNull(afterSecond)) {
            afterSecond = 0;
        }
        this.sendMessageByDelay(JSONUtil.parseObj(actionBO), afterSecond);
    }
}

消费者

消费者的demo不太好写,只是做了一个简单的伪代码。 以定时任务发邮箱为例

1- 消费者线程开始,先执行发邮箱任务

2- 发送完邮箱之后判断是否还需要发邮箱,如果需要,就再通过生产者发送延时邮箱 此时可以指定下一次消费的时间,以此流程走下去便是一套动态任务的流程实现。可以参考后续的流程图。

这样就实现一个简易的定时任务发送邮箱的逻辑

	private final DelaySenderService delaySenderService;

    @RabbitHandler
    @RabbitListener(queues = RabbitMQConfig.DELAY_QUEUE)
    public void delayConsumer(Message message) {
        //业务逻辑
        this.sendMail(workOrderDelaySenMailActionBO);
        // 判断是否需要递归执行定时任务(实际上就是使用生产者再发一次延时消息,确认下一次消费)
        if(需要进行定时任务) {
             this.sendDelayMessageToMq(workOrderDelaySenMailActionBO);
        }
        log.info("信息为:{}", message.getBody());
    }

大致流程就这么多了,以下是整套步骤流闭环程图

(3)流程图

总结

到此这篇关于如何通过RabbitMq实现动态定时任务的文章就介绍到这了,更多相关RabbitMq实现动态定时任务内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Rabbitmq延迟队列实现定时任务的方法

    场景 开发中经常需要用到定时任务,对于商城来说,定时任务尤其多,比如优惠券定时过期.订单定时关闭.微信支付2小时未支付关闭订单等等,都需要用到定时任务,但是定时任务本身有一个问题,一般来说我们都是通过定时轮询查询数据库来判断是否有任务需要执行,也就是说不管怎么样,我们需要先查询数据库,而且有些任务对时间准确要求比较高的,需要每秒查询一次,对于系统小倒是无所谓,如果系统本身就大而且数据也多的情况下,这就不大现实了,所以需要其他方式的,当然实现的方式有多种多样的,比如Redis实现定时队列.基于优先

  • SpringBoot下RabbitMq实现定时任务

    本文实例为大家分享了SpringBoot下RabbitMq实现定时任务,供大家参考,具体内容如下 定时任务场景:订单下单15分钟未付款自动关闭 延迟任务实现原理图如下: 根据上图看出我们需要两个队列(一是死信队列,消息在里面度过TLL时间,二是处理队列,消息度过TLL时间后进入该队列),两个交换机和路由(一是用来将消息送入死信队列,二是将消息从死信队列送到处理队列),但是交换机其实可以用同一个,也就是一个交换机搭配两个路由的方式. 以下为代码实现过程: //首先rabbitAdmin的配置 @B

  • 详解Spring Cloud Stream使用延迟消息实现定时任务(RabbitMQ)

    我们在使用一些开源调度系统(比如:elastic-job等)的时候,对于任务的执行时间通常都是有规律性的,可能是每隔半小时执行一次,或者每天凌晨一点执行一次.然而实际业务中还存在另外一种定时任务,它可能需要一些触发条件才开始定时,比如:编写博文时候,设置2小时之后发送.对于这些开始时间不确定的定时任务,我们也可以通过Spring Cloud Stream来很好的处理. 为了实现开始时间不确定的定时任务触发,我们将引入延迟消息的使用.RabbitMQ中提供了关于延迟消息的插件,所以本文就来具体介绍

  • 如何通过RabbitMq实现动态定时任务详解

    目录 一.需求背景 二.方案思考 (1)需求大致分析 (2)可尝试的方案 三.通过RabbitMQ实现延时任务并间接实现动态定时任务. (1)通过死信的方式实现延时信息消费 (2)通过MQ延时插件实现延时任务(重点) 四.MQ延时任务插件实现动态定时任务 (1)安装延时插件 (2)编码实现 (3)流程图 总结 一.需求背景 定时任务的需求所谓是数不胜数,其中实现方式也是百花齐放,用得最多的大概率为Springboot中的 @Scheduled(cron = “0 0 1 1 * ?”) 注解,或

  • Java下SpringBoot创建定时任务详解

    序言 使用SpringBoot创建定时任务非常简单,目前主要有以下三种创建方式: 一.基于注解(@Scheduled) 二.基于接口(SchedulingConfigurer) 前者相信大家都很熟悉,但是实际使用中我们往往想从数据库中读取指定时间来动态执行定时任务,这时候基于接口的定时任务就派上用场了. 三.基于注解设定多线程定时任务 一.静态:基于注解 基于注解@Scheduled默认为单线程,开启多个任务时,任务的执行时机会受上一个任务执行时间的影响. 1.创建定时器 使用SpringBoo

  • springboot定时任务详解

    在我们开发项目过程中,经常需要定时任务来帮助我们来做一些内容, Spring Boot 默认已经帮我们实行了,只需要添加相应的注解就可以实现 一.基于注解(静态) 1.pom 包配置 pom 包里面只需要引入 Spring Boot Starter 包即可 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactI

  • vue系列之动态路由详解【原创】

    开题 最近用vue来构建了一个小项目,由于项目是以iframe的形式嵌套在别的项目中的,所以对于登录的验证就比较的麻烦,索性后端大佬们基于现在的问题提出了解决的方案,在看到他们的解决方案之前,我先画了一个比较标准的单系统的解决方案. 本文目录: 一: 设想 二: 讨论 三:实现 四:总结 一: 设想 简单解释下上图就是: 首先前端从cookie获取token,如果没有token就跳转到登录页面登录,登录验证之后生成token存在数据库中并返回给前端:前端将这个token保存下来,为了让在浏览器新

  • IOS 静态方法与动态方法详解

    IOS 静态方法与动态方法详解 1.问题提出 iOS中有静态方法与动态方法,那么两种方法的异同是什么? 2.问题分析 因为每个对象都由相应的数据结构与方法相构成,一个程序可能有多个属于同一个类的对象,而每个对象的数据结构应该是不一的,但方法是相同的,若为每个对象开辟内存空间来存储方法,必然是对内存空间极大的浪费.因此apple是通过类对象与元类来解决这个问题的. 从根本来说,c++.objective-c.java都发源于c语言,因此这些语言实际上可以理解了经过封装的c语言,所以它们更加方便使用

  • mybatis的动态sql详解(精)

    MyBatis 的一个强大的特性之一通常是它的动态 SQL 能力.如果你有使用 JDBC 或其他 相似框架的经验,你就明白条件地串联 SQL 字符串在一起是多么的痛苦,确保不能忘了空 格或在列表的最后省略逗号.动态 SQL 可以彻底处理这种痛苦. 通常使用动态SQL不可能是独立的一部分,MyBatis当然使用一种强大的动态SQL语言来改进这种情形,这种语言可以被用在任意映射的SQL语句中. 动态SQL元素和使用 JSTL或其他相似的基于XML的文本处理器相似.在MyBatis之前的版本中,有很多

  • JVM 方法调用之动态分派(详解)

    1. 动态分派 一个体现是重写(override).下面的代码,运行结果很明显. public class App { public static void main(String[] args) { Super object = new Sub(); object.f(); } } class Super { public void f() { System.out.println("super : f()"); } public void f(int i) { System.out

  • 基于RabbitMQ的简单应用(详解)

    虽然后台使用了读写分离技术,能够在一定程度上抗击高并发,但是如果并发量特别巨大时,主数据库不能同时处理高并发的请求,这时数据库容易宕机. 问题: 现在的问题是如何既能保证数据库正常运行,又能实现用户数据的入库操作? 解决方案: 引入rabbitMQ技术: 说明: 当数据库的访问压力过载时,这时会将过载以后的数据先保存到rabbitMQ中.其中的数据结构是队列的形式,先进先出.这时数据库从队列中取数据执行.一直到队列中的数据全部操作完成为止. RabbitMQ就是消息的中间件. RabbitMQ介

  • 对layui数据表格动态cols(字段)动态变化详解

    如搜索查询时,常会遇到按日期时间段查询,并显示查询的每个日期的数据,后台拼装数据此处不讨论. 把表格渲染封装在函数里面,cols_arr是传入的字段数组 function tableRender(cols_arr){ table.render({ elem: '#demo' , url: 请求地址 //数据接口 , method: 'post' , page: true //开启分页 , cols: cols_arr , id: 'demo' , limit: 10 , limits: [10,

  • Vue 使用Props属性实现父子组件的动态传值详解

    如下所示: <!DOCTYPE html> <html lang="en" xmlns:v-on="http://www.w3.org/1999/xhtml"> <head> <meta charset="UTF-8"> <title>Title</title> <script src="https://cdn.jsdelivr.net/npm/vue&quo

随机推荐