SpringBoot使用RabbitMQ延时队列(小白必备)

1.什么是MQ

MQ,是一种跨进程的通信机制,用于上下游传递消息。

在互联网架构中,MQ是一种非常常见的上下游“逻辑解耦+物理解耦”的消息通信服务。

使用了MQ之后,消息发送上游只需要依赖MQ,不用依赖其他服务。

为什么会产生消息列队?

  1. 不同进程(process)之间传递消息时,两个进程之间耦合程度过高,改动一个进程,引发必须修改另一个进程,为了隔离这两个进程,在两进程间抽离出一层(一个模块),所有两进程之间传递的消息,都必须通过消息队列来传递,单独修改某一个进程,不会影响另一个;
  2. 不同进程(process)之间传递消息时,为了实现标准化,将消息的格式规范化了,并且,某一个进程接受的消息太多,一下子无法处理完,并且也有先后顺序,必须对收到的消息进行排队,因此诞生了事实上的消息队列;

延时列队的使用场景?

  1. 订单业务:在淘宝或者京东购买东西,用户下单后未付款则30分钟后取消订单。
  2. 短信通知:手机用户交完话费后,几分钟之内将会收到缴费信息

2.什么是RabbitMQ(这里就做了一下简单介绍)

RabbitMQ是一种消息队列 ,用于常见的进程通信。支持点对点,请求应答和发布订阅模式 并且提供多种语言的支持。常见的java,c#,php都支持。

常被用在异步处理,应用解耦。流量消锋等复杂的业务场景中。和java的kafka一样都属于消息中间件。

下载地址:

https://www.rabbitmq.com/download.html

进入RabbitMQ官网

1.第一步

第二步

下载好后不要着急安装RabbitMQ,我们这里还需要安装Erlang

下载地址:http://www.erlang.org/download/otp_win64_17.3.exe

安装步骤

步骤一

步骤二

步骤三

步骤四

安装完成

现在安装RabbitMQ

步骤一

步骤二

步骤三

安装完成

启动RabbitMQ管理工具

开始菜单 — 最新添加 — 展开 — 选中双击

输入命令:rabbitmq-plugins enable rabbitmq_management

效果如果图

在浏览器中输入地址查看:http://127.0.0.1:15672/

出现次页面代表成功,默认用户和密码都是guest/ guest

若不出现此页面,就是安装失败了,不要慌,多半问题在系统用户名必须是中文(放心有解决办法):

Windows下安装RabbitMQ后,按正常RabbitMQ会自动注册服务并自动启动,但是如果有的道友不注意中英文目录就会出现服务启动后几秒钟自动停止,而且反反复复。

出现这种情况一般都是由我们的用户名是中文,而导致默认的DB和log访问出现问。所以我建议以后大家在使用windows操作系统的时候尽量用英文来命名文件或目录,这样会极大的减小以后安装软件出现莫名其妙的问题的bug。

接下来我们先卸载我们的RabbitMQ,然后在我们的系统变量里设置一个RABBITMQ_BASE 的变量路径为一个不含英文的路径 比如 E:\rabbit,最后我们重新安装RabbitMQ即可,然后就会看到RabbitMQ服务自动注册了,并且不会自动停止。

SpringBoot整合RabbitMQ

1.添加依赖

pom.xml中添加 spring-boot-starter-amqp的依赖

 <!-- spring-boot-starter-amqp的依赖 -->
      <dependency>
   <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
 </dependency>

其他依赖

<dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
      </dependency>

      <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
      </dependency>

      <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
        <exclusions>
          <exclusion>
            <groupId>org.junit.vintage</groupId>
            <artifactId>junit-vintage-engine</artifactId>
          </exclusion>
        </exclusions>
      </dependency>

      <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <scope>test</scope>
      </dependency>

application.yml文件中配置rabbitmq相关内容

spring:
  rabbitmq:
   host: localhost
   port: 5672
   username: guest
   password: guest

这里我们环境就搭建起来了

2.具体编码实现

配置列队

 package com.example.spring_boot_rabbitmq;

  import lombok.extern.slf4j.Slf4j;
  import org.springframework.amqp.core.*;
  import org.springframework.context.annotation.Bean;
  import org.springframework.context.annotation.Configuration;

  import java.util.HashMap;
  import java.util.Map;

  /**
   * @author:zq
   * @date: Greated in 2019/12/19 11:46
   * 配置队列
   */

  @Configuration
  @Slf4j
  public class DelayRabbitConfig {

    /**
     * 延迟队列 TTL 名称
     */
    private static final String ORDER_DELAY_QUEUE = "user.order.delay.queue";
    /**
     * DLX,dead letter发送到的 exchange
     * 延时消息就是发送到该交换机的
     */
    public static final String ORDER_DELAY_EXCHANGE = "user.order.delay.exchange";
    /**
     * routing key 名称
     * 具体消息发送在该 routingKey 的
     */
    public static final String ORDER_DELAY_ROUTING_KEY = "order_delay";

    public static final String ORDER_QUEUE_NAME = "user.order.queue";
    public static final String ORDER_EXCHANGE_NAME = "user.order.exchange";
    public static final String ORDER_ROUTING_KEY = "order";

    /**
     * 延迟队列配置
     * <p>
     * 1、params.put("x-message-ttl", 5 * 1000);
     * 第一种方式是直接设置 Queue 延迟时间 但如果直接给队列设置过期时间,这种做法不是很灵活,(当然二者是兼容的,默认是时间小的优先)
     * 2、rabbitTemplate.convertAndSend(book, message -> {
     * message.getMessageProperties().setExpiration(2 * 1000 + "");
     * return message;
     * });
     * 第二种就是每次发送消息动态设置延迟时间,这样我们可以灵活控制
     **/
    @Bean
    public Queue delayOrderQueue() {
      Map<String, Object> params = new HashMap<>();
      // x-dead-letter-exchange 声明了队列里的死信转发到的DLX名称,
      params.put("x-dead-letter-exchange", ORDER_EXCHANGE_NAME);
      // x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key 名称。
      params.put("x-dead-letter-routing-key", ORDER_ROUTING_KEY);
      return new Queue(ORDER_DELAY_QUEUE, true, false, false, params);
    }
    /**
     * 需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。
     * 这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的消息才被转发,
     * 不会转发dog.puppy,也不会转发dog.guard,只会转发dog。
     * @return DirectExchange
     */
    @Bean
    public DirectExchange orderDelayExchange() {
      return new DirectExchange(ORDER_DELAY_EXCHANGE);
    }
    @Bean
    public Binding dlxBinding() {
      return BindingBuilder.bind(delayOrderQueue()).to(orderDelayExchange()).with(ORDER_DELAY_ROUTING_KEY);
    }

    @Bean
    public Queue orderQueue() {
      return new Queue(ORDER_QUEUE_NAME, true);
    }
    /**
     * 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。
     * 符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” 只会匹配到“audit.irs”。
     **/
    @Bean
    public TopicExchange orderTopicExchange() {
      return new TopicExchange(ORDER_EXCHANGE_NAME);
    }

    @Bean
    public Binding orderBinding() {
      // TODO 如果要让延迟队列之间有关联,这里的 routingKey 和 绑定的交换机很关键
      return BindingBuilder.bind(orderQueue()).to(orderTopicExchange()).with(ORDER_ROUTING_KEY);
    }

  }

创建一个Order实体类

package com.example.spring_boot_rabbitmq.pojo;

 import lombok.Data;

 import java.io.Serializable;

 /**
  * @author:zq
  * @date: Greated in 2019/12/19 11:49
  */
 @Data
 public class Order implements Serializable {
   private static final long serialVersionUID = -2221214252163879885L;

   private String orderId; // 订单id

   private Integer orderStatus; // 订单状态 0:未支付,1:已支付,2:订单已取消

   private String orderName; // 订单名字

 }

接收者

package com.example.spring_boot_rabbitmq;

 import com.example.spring_boot_rabbitmq.pojo.Order;
 import com.rabbitmq.client.Channel;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.amqp.core.Message;
 import org.springframework.amqp.rabbit.annotation.RabbitListener;
 import org.springframework.stereotype.Component;

 import java.util.Date;

 /**
  * @author:zq
  * @date: Greated in 2019/12/19 11:53
  * 接收者
  */

 @Component
 @Slf4j
 public class DelayReceiver {
   @RabbitListener(queues = {DelayRabbitConfig.ORDER_QUEUE_NAME})
   public void orderDelayQueue(Order order, Message message, Channel channel) {
     log.info("###########################################");
     log.info("【orderDelayQueue 监听的消息】 - 【消费时间】 - [{}]- 【订单内容】 - [{}]", new Date(), order.toString());
     if(order.getOrderStatus() == 0) {
       order.setOrderStatus(2);
       log.info("【该订单未支付,取消订单】" + order.toString());
     } else if(order.getOrderStatus() == 1) {
       log.info("【该订单已完成支付】");
     } else if(order.getOrderStatus() == 2) {
       log.info("【该订单已取消】");
     }
     log.info("###########################################");
   }

 }

发送者

 package com.example.spring_boot_rabbitmq;

 import com.example.spring_boot_rabbitmq.pojo.Order;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.amqp.core.AmqpTemplate;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;

 import java.util.Date;

 /**
 * @author:zq
 * @date: Greated in 2019/12/19 11:55
 * 发送者
 */
 @Component
 @Slf4j
 public class DelaySender {
   @Autowired
   private AmqpTemplate amqpTemplate;

   public void sendDelay(Order order) {
     log.info("【订单生成时间】" + new Date().toString() +"【1分钟后检查订单是否已经支付】" + order.toString() );
     this.amqpTemplate.convertAndSend(DelayRabbitConfig.ORDER_DELAY_EXCHANGE, DelayRabbitConfig.ORDER_DELAY_ROUTING_KEY, order, message -> {
       // 如果配置了 params.put("x-message-ttl", 5 * 1000); 那么这一句也可以省略,具体根据业务需要是声明 Queue 的时候就指定好延迟时间还是在发送自己控制时间
       message.getMessageProperties().setExpiration(1 * 1000 * 60 + "");
       return message;
     });
   }

 }

测试,访问http://localhost:8080/sendDelay查看日志输出

package com.example.spring_boot_rabbitmq;

import com.example.spring_boot_rabbitmq.pojo.Order;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;

/**
 * @author:zq
 * @date: Greated in 2019/12/19 11:57
 * 测试
 */

@RestController
public class TestController {
  @Autowired
  private DelaySender delaySender;

  @GetMapping("/sendDelay")
  public Object sendDelay() {
    Order order1 = new Order();
    order1.setOrderStatus(0);
    order1.setOrderId("123456");
    order1.setOrderName("小米6");

    Order order2 = new Order();
    order2.setOrderStatus(1);
    order2.setOrderId("456789");
    order2.setOrderName("小米8");

    delaySender.sendDelay(order1);
    delaySender.sendDelay(order2);
    return "ok";
  }

}

输出

到此已经SpringBoot使用RabbitMQ延时队列已经完成,希望对你有所帮助,若有地方不理解或者有更好的办法请留言,谢谢。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • springboot集成rabbitMQ之对象传输的方法

    rabbitMQ的安装方法网上有很多教程,这里就不重复了. 在springboot上使用rabbitMQ传输字符串和对象,本文所给出的例子是在两个不同的项目之间进行对象和和字符串的传输. rabbitMQ的依赖(在两个项目中一样的配置): <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId>

  • springboot2.0集成rabbitmq的示例代码

    安装rabbitmq 简介: rabbitmq即一个消息队列,主要用来实现应用程序的异步和解耦,消息缓冲,消息分发的作用. 由于rabbitmq依赖于erlang语言,所以先安装erlang: 添加erlang solutions源 $ wget https://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm $ sudo rpm -Uvh erlang-solutions-1.0-1.noarch.rpm $ su

  • SpringBoot+RabbitMq具体使用的几种姿势

    目前主流的消息中间件有activemq,rabbitmq,rocketmq,kafka,我们要根据实际的业务场景来选择一款合适的消息中间件,关注的主要指标有,消息投递的可靠性,可维护性,吞吐量以及中间件的特色等重要指标来选择,大数据领域肯定是kafka,那么传统的业务场景就是解耦,异步,削峰.那么就在剩下的3款产品中选择一款,从吞吐量,社区的活跃度,消息的可靠性出发,一般的中小型公司选择rabbitmq来说可能更为合适.那么我们就来看看如何使用它吧. 环境准备 本案例基于springboot集成

  • Springboot 配置RabbitMQ文档的方法步骤

    简介 RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种,用于在分布式系统中存储转发消息,在易用性.扩展性.高可用性等方面表现不俗 概念: 生产者 消息的产生方,负责将消息推送到消息队列 消费者 消息的最终接受方,负责监听队列中的对应消息,消费消息 队列 消息的寄存器,负责存放生产者发送的消息 交换机 负责根据一定规则分发生产者产生的消息 绑定 完成交换机和队列之间的绑定 模式: direct:直连模式,用于实例间的任务分发 topic:话题模式,通过可配置的规则分发给绑定在该

  • springboot实现rabbitmq的队列初始化和绑定

    配置文件,在rabbit中自动建立exchange,queue和绑定它们的关系 代码里初始化exchange 代码里初始化queue 代码里绑定exchange,queue和routekey 配置文件,直接声明vhost 代码里初始化exchange /** * rabbitMq里初始化exchange. * * @return */ @Bean public TopicExchange crmExchange() { return new TopicExchange(EXCHANGE); }

  • springboot整合rabbitmq的示例代码

    概述 RabbitMQ是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据,或者简单地将作业队列以便让分布式服务器进行处理. 它现实了AMQP协议,并且遵循Mozilla Public License开源协议,它支持多种语言,可以方便的和spring集成. 消息队列使用消息将应用程序连接起来,这些消息通过像RabbitMQ这样的消息代理服务器在应用程序之间路由. 基本概念 Broker 用来处理数据的消息队列服务器实体 vhost 由RabbitMQ服务器创建的虚拟消息

  • SpringBoot下RabbitMq实现定时任务

    本文实例为大家分享了SpringBoot下RabbitMq实现定时任务,供大家参考,具体内容如下 定时任务场景:订单下单15分钟未付款自动关闭 延迟任务实现原理图如下: 根据上图看出我们需要两个队列(一是死信队列,消息在里面度过TLL时间,二是处理队列,消息度过TLL时间后进入该队列),两个交换机和路由(一是用来将消息送入死信队列,二是将消息从死信队列送到处理队列),但是交换机其实可以用同一个,也就是一个交换机搭配两个路由的方式. 以下为代码实现过程: //首先rabbitAdmin的配置 @B

  • SpringBoot与rabbitmq的结合的示例

    消息中间件对于我们系统之间的解耦合,消峰等都有极大的帮助.spring boot 也集成了此部分的内容,集成最为容易的是rabbitmq.今天我们就以rabbitmq为例说明. 老规矩,先看下pom <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <

  • SpringBoot使用RabbitMQ延时队列(小白必备)

    1.什么是MQ MQ,是一种跨进程的通信机制,用于上下游传递消息. 在互联网架构中,MQ是一种非常常见的上下游"逻辑解耦+物理解耦"的消息通信服务. 使用了MQ之后,消息发送上游只需要依赖MQ,不用依赖其他服务. 为什么会产生消息列队? 不同进程(process)之间传递消息时,两个进程之间耦合程度过高,改动一个进程,引发必须修改另一个进程,为了隔离这两个进程,在两进程间抽离出一层(一个模块),所有两进程之间传递的消息,都必须通过消息队列来传递,单独修改某一个进程,不会影响另一个: 不

  • SpringBoot整合RabbitMQ消息队列的完整步骤

    SpringBoot整合RabbitMQ 主要实现RabbitMQ以下三种消息队列: 简单消息队列(演示direct模式) 基于RabbitMQ特性的延时消息队列 基于RabbitMQ相关插件的延时消息队列 公共资源 1. 引入pom依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId>

  • Springboot集成RabbitMQ死信队列的实现

    目录 关于死信队列 什么样的消息会进入死信队列? 场景分析 代码实现 场景模拟 生产者 消费者,设置死信队列监听 关于死信队列 在大多数的MQ中间件中,都有死信队列的概念.死信队列同其他的队列一样都是普通的队列.在RabbitMQ中并没有特定的"死信队列"类型,而是通过配置,将其实现. 当我们在创建一个业务的交换机和队列的时候,可以配置参数,指明另一个队列为当前队列的死信队列,在RabbitMQ中,死信队列(严格的说应该是死信交换机)被称为DLX Exchange.当消息"死

  • Springboot 整合 RabbitMQ 消息队列 详情

    目录 生产者工程 POM依赖 application文件 生产者业务代码 测试 Direct 模式 业务代码 消费者 消息监听 Topic 模式 生产者 消费者 生产者工程 POM依赖 可以在创建工程时直接选择添加依赖. application文件 因为rabbitmq具有默认地址及用户信息,所以如果是本地rabbitmq可以不需要进行配置. RabbitMQ配置文件: 在使用相关交换机及队列时,我们需要实现声明交换机及队列,如果没有对应信息,则启动项目会失败.所以在使用springboot整合

  • Springboot+rabbitmq实现延时队列的两种方式

    什么是延时队列,延时队列应用于什么场景 延时队列顾名思义,即放置在该队列里面的消息是不需要立即消费的,而是等待一段时间之后取出消费. 那么,为什么需要延迟消费呢?我们来看以下的场景 网上商城下订单后30分钟后没有完成支付,取消订单(如:淘宝.去哪儿网) 系统创建了预约之后,需要在预约时间到达前一小时提醒被预约的双方参会 系统中的业务失败之后,需要重试 这些场景都非常常见,我们可以思考,比如第二个需求,系统创建了预约之后,需要在预约时间到达前一小时提醒被预约的双方参会.那么一天之中肯定是会有很多个

  • Docker安装RabbitMQ并安装延时队列插件

    目录 一.RabbitMQ简介 二.docker安装RabbitMQ 1.搜索镜像 2.拉取并运行容器 3.访问 http://192.168.50.128:15672 三.安装RabbitMQ延时队列插件(delayed_message_exchange) 一.RabbitMQ简介 RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛.

  • 一口气说出Java 6种延时队列的实现方法(面试官也得服)

    五一期间原计划是写两篇文章,看一本技术类书籍,结果这五天由于自律性过于差,禁不住各种诱惑,我连电脑都没打开过,计划完美宣告失败.所以在这能看出和大佬之间的差距,人家没白没夜的更文,比你优秀的人比你更努力,难以望其项背,真是让我自愧不如. 知耻而后勇,这不逼着自己又学起来了,个人比较喜欢一些实践类的东西,既学习到知识又能让技术落地,能搞出个demo最好,本来不知道该分享什么主题,好在最近项目紧急招人中,而我有幸做了回面试官,就给大家整理分享一道面试题:"如何实现延时队列?". 下边会介绍

  • C#通过rabbitmq实现定时任务(延时队列)

    本文主要讲解如何通过RabbitMQ实现定时任务(延时队列) 环境准备 需要在MQ中进行安装插件 地址链接 插件介绍地址:https://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq/ 使用场景 作为一个新的预支付订单被初始化放置,如果该订单在指定时间内未进行支付,则将被认为超时订单进行关闭处理:电商系统中应用较多,用户购买商品产生订单,但未进行支付,订单产生30分钟内未支付将关闭订单(且满足该场景数量庞大)

  • 运用.NetCore实例讲解RabbitMQ死信队列,延时队列

    目录 一.死信队列 二.延时队列 三.延时消息设置不同过期时间 四.延时消息用延时插件的方式实现 一.死信队列 描述:Q1队列绑定了x-dead-letter-exchange(死信交换机)为X2,x-dead-letter-routing-key(死信路由key)指向Q2(队列2) P(生产者)发送消息经X1(交换机1)路由到Q1(队列1),Q1的消息触发特定情况,自动把消息经X2(交换机2)路由到Q2(队列2),C(消费者)直接消息Q2的消息. 特定情况有哪些呢: 1.消息被拒(basic.

随机推荐