一篇文章教你将JAVA的RabbitMQz与SpringBoot整合

目录
  • 一、fanout:发布订阅型
  • 二、direct:直连型
  • 三、topic:通配符模式
  • 四、消费者端接收消息
  • 总结

本文主要聊SpringBoot整合RabbitMQ,主要分为生产者和消费者两个工程,目录结构如下:

先简单说一下RabbitMQ的一些核心概念:

1.虚拟主机vhost:vhost是物理隔离的,你可以将vhost看作是一个个小型的RabbitMQ

2.交换机exchange:生产者发送的消息不是直接到达队列的,而是交换机,然后交换机再根据路由key,路由到指定的队列,可以理解为一个分发消息的角色

3.队列:存放消息的地方,消费者拿到的消息就是通过队列,可以理解为存放消息的容器

4.bindingKey和routingKey:这两个可能比较容易搞混,bindingKey是交换机和队列建立关系时候的key,而routingKey则是生产者发送消息时候的key。在DirectExchange类型的交换机中,bindingKey和routingKey是一致的,而在TopicExchange类型的交换机中,两者一般是不一致的。

开发环境:RabbitMQ:3.7.7、SpringBoot2.1.7

引入相关依赖:

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>2.1.7.RELEASE</version>
        </dependency>
        <!-- rabbitmq依赖 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>2.1.7.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <version>2.1.7.RELEASE</version>
        </dependency>
    </dependencies>

在生产者和消费者工程的application.yml文件添加RabbitMQ配置:

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest

如果是和上面一样的值,其实也可以不用配置,因为RabbitMQ的默认配置就是它们:

接下来,看看几种交换机的使用场景,包括:

(1)fanout:发布订阅型,可以理解为广播

(2)direct:直连型,可以理解为点对点

(3)topic:通配符模式

一、fanout:发布订阅型

在生产者工程,添加如下配置:

@Configuration
public class FanoutRabbitmqConfig {
    public static final String EMAIL_QUEUE = "email_fanout";
    public static final String SMS_QUEUE = "sms_fanout";
    public static final String EXCHANGE_NAME = "exchange_fanout";
    // 发布订阅模式,不用routingKey
    @Bean
    public FanoutExchange getFanoutExchange(){
        return new FanoutExchange(EXCHANGE_NAME);
    }
    @Bean
    public Queue getFanoutEmailQueue(){
        return new Queue(EMAIL_QUEUE);
    }
    @Bean
    public Queue getFanoutSmsQueue(){
        return new Queue(SMS_QUEUE);
    }
    // 绑定交换机和队列
    @Bean
    public Binding fanoutEmailExchange(){
        return BindingBuilder.bind(getFanoutEmailQueue()).to(getFanoutExchange());
    }
    @Bean
    public Binding fanoutSmsExchange(){
        return BindingBuilder.bind(getFanoutSmsQueue()).to(getFanoutExchange());
    }
    // 用于测试找到了交换机,但是没有找到队列的情况
    @Bean
    public FanoutExchange notBindingQueue(){
        return new FanoutExchange("testNotBindingQueue");
    }
}

可以看到声明了名为exchange_fanout的交换机和两个队列,并进行了绑定,如果向exchange_fanout交换机发送消息,则两个队列都会收到消息。

二、direct:直连型

在生产者工程添加如下配置:

@Configuration
public class RoutingRabbitmqConfig {
    public static final String EMAIL_QUEUE = "email_routing";
    public static final String SMS_QUEUE = "sms_routing";
    public static final String EXCHANGE_NAME = "exchange_routing";
    public static final String EMAIL_ROUTINGKEY = "inform.email.routingKey";
    public static final String SMS_ROUTINGKEY = "inform.sms.routingKey";
    @Bean
    public DirectExchange getDirectExchange(){
        return new DirectExchange(EXCHANGE_NAME);
    }
    @Bean
    public Queue getDirectEmailQueue(){
        return new Queue(EMAIL_QUEUE);
    }
    @Bean
    public Queue getDirectSmsQueue(){
        return new Queue(SMS_QUEUE);
    }
    // 绑定交换机和队列
    @Bean
    public Binding directEmailExchange(){
        return BindingBuilder.bind(getDirectEmailQueue()).to(getDirectExchange()).with(EMAIL_ROUTINGKEY);
    }
    @Bean
    public Binding directSmsExchange(){
        return BindingBuilder.bind(getDirectSmsQueue()).to(getDirectExchange()).with(SMS_ROUTINGKEY);
    }
}

和fanout类型不同的是,这里使用了routingKey进行交换机和队列的绑定,如果发送消息时的routingKey是inform.sms.routingKey,那么只有sms_routing这个队列可以收到消息,email_routing队列则不会收到消息,这正是直连的含义所在。

三、topic:通配符模式

topic类型的交换机,常使用的通配符是:*和#

(1)星号*:匹配一个词

(2)井号:匹配0个或多个词

词是通过点分割的,被点分割的就是一个词

先看配置:

@Configuration
public class TopicsRabbitmqConfig {
    public static final String EMAIL_QUEUE = "email_topics";
    public static final String SMS_QUEUE = "sms_topics";
    public static final String EXCHANGE_NAME = "exchange_topics";
    public static final String EMAIL_ROUTINGKEY = "inform.#.email.#";
    // * 必须匹配一个词,不能是0个
//    public static final String EMAIL_ROUTINGKEY = "inform.*.email";
    public static final String SMS_ROUTINGKEY = "inform.#.sms.#";
    @Bean
    public TopicExchange getTopicsExchange(){
        return new TopicExchange(EXCHANGE_NAME);
    }
    @Bean
    public Queue getTopicsEmailQueue(){
        return new Queue(EMAIL_QUEUE);
    }
    @Bean
    public Queue getTopicsSmsQueue(){
        return new Queue(SMS_QUEUE);
    }
    // 绑定交换机和队列
    @Bean
    public Binding topicsEmailExchange(){
        return BindingBuilder.bind(getTopicsEmailQueue()).to(getTopicsExchange()).with(EMAIL_ROUTINGKEY);
    }
    @Bean
    public Binding topicsSmsExchange(){
        return BindingBuilder.bind(getTopicsSmsQueue()).to(getTopicsExchange()).with(SMS_ROUTINGKEY);
    }
}

和direct不同的是:bindingKey中可以含有星号和井号,可以看到:队列email_topics的bindingKey(注意,这是bindingKey,不是routingKey)是inform.#.email.#,而队列sms_topics则绑定了inform.#.sms.#。如果发送消息的routingKey是inform开头,后面含有email,则email_topics队列会收到消息,而sms_topics队列不会收到消息。

topic类型交换机的强大之处在于,它可以替代前面两种交换机:

(1)如果bindingKey只有一个井号,在是fanout

(2)如果bindingKey不含有星号和井号,则是direct

四、消费者端接收消息

消费者接收消息配置如下:

@Component
public class TopicsConsumer {
    // topics模式
    @RabbitListener(queues = TopicsRabbitmqConfig.EMAIL_QUEUE)
    public void receiveEmail(String msg){
        System.out.println("topics收到email消息: "+msg);
    }
    @RabbitListener(queues = TopicsRabbitmqConfig.SMS_QUEUE)
    public void receiveSms(String msg){
        System.out.println("topics收到sms消息: "+msg);
    }
    // routing模式
    @RabbitListener(queues = RoutingRabbitmqConfig.EMAIL_QUEUE)
    public void receiveRoutingEmail(String msg){
        System.out.println("routing收到email消息: "+msg);
    }
    @RabbitListener(queues = RoutingRabbitmqConfig.SMS_QUEUE)
    public void receiveRoutingSms(String msg){
        System.out.println("routing收到sms消息: "+msg);
    }
    // 发布订阅模式
    @RabbitListener(queues = FanoutRabbitmqConfig.EMAIL_QUEUE)
    public void receiveFanoutEmail(String msg){
        System.out.println("fanout收到email消息: "+msg);
    }
    @RabbitListener(queues = FanoutRabbitmqConfig.SMS_QUEUE)
    public void receiveFanoutSms(String msg){
        System.out.println("fanout收到sms消息: "+msg);
    }
}

这里主要是使用了RabbitListener注解,指定了接收消息的队列。

总结

本片文章就到这里了,希望能够给你带来帮助,也希望您能够多多关注我们的更多内容!

(0)

相关推荐

  • SpringBoot停止启动时测试检查rabbitmq操作

    目录 SpringBoot停止启动时测试检查rabbitmq 问题 解决 RabbitMQ的简单使用的Demo 1.声明 2.创建一个测试账户 3.pom依赖 5.创建入口类 6.测试 7.总结 SpringBoot停止启动时测试检查rabbitmq 问题 在Springboot项目中配置rabbitmq后,总是在每次启动时自动测试MQ的连接,如果测试不通过,就一直重连,导致项目无法正常启动.自己在开发与MQ无关的功能时,无法正常进行,十分耽误时间.如下所示: org.springframewo

  • SpringBoot整合RabbitMQ实现消息确认机制

    前面几篇案例已经将常用的交换器(DirectExchange.TopicExchange.FanoutExchange)的用法介绍完了,现在我们来看一下消息的回调,也就是消息确认. 在rabbitmq-provider项目的application.yml文件上加上一些配置 server: port: 8021 spring: #给项目来个名字 application: name: rabbitmq-provider #配置rabbitMq 服务器 rabbitmq: host: 127.0.0.

  • Springboot集成RabbitMQ死信队列的实现

    目录 关于死信队列 什么样的消息会进入死信队列? 场景分析 代码实现 场景模拟 生产者 消费者,设置死信队列监听 关于死信队列 在大多数的MQ中间件中,都有死信队列的概念.死信队列同其他的队列一样都是普通的队列.在RabbitMQ中并没有特定的"死信队列"类型,而是通过配置,将其实现. 当我们在创建一个业务的交换机和队列的时候,可以配置参数,指明另一个队列为当前队列的死信队列,在RabbitMQ中,死信队列(严格的说应该是死信交换机)被称为DLX Exchange.当消息"死

  • 手把手带你掌握SpringBoot RabbitMQ延迟队列

    目录 1. 简介 2. 安装插件 3. 实现延迟队列 3.1 引入所需依赖 3.2 application.yaml 3.3 RabbitConfig 3.4 Producer 3.5 Consumer 3.6 测试代码 3.7 启动测试 1. 简介 我们在上一篇博文中遗留了一个小问题,就是虽然TTL + DLX能实现延迟队列的功能,但是有两个问题. 首先业务场景为:比如海底捞预约,每个人预约的时间段不一致,有个可能一个小时后,有的可能三个小时等,当快到预约时间点需要给用户进行短信通知. 通过给

  • 详解消息队列及RabbitMQ部署和使用

    目录 什么是消息队列 为什么需要消息队列 常见的消息队列 ActiveMQ RabbitMQ ZeroMQ Kafka RocketMQ RabbitMQ 的部署和使用 Python 编写生产者 Python 编写消费者 最后的话 什么是消息队列 消息队列拆开了看,就是消息 + 队列,消息是什么?其实就是程序之间通讯所用到的数据,消息从生产者那里产生,进入队列后,安装设计好的规则出队,由消费者消费.仅此而已. 为什么需要消息队列 消息队列,最重要的是队列,可以想象一下没有队列的场景,你去银行办业

  • 一篇文章教你将JAVA的RabbitMQz与SpringBoot整合

    目录 一.fanout:发布订阅型 二.direct:直连型 三.topic:通配符模式 四.消费者端接收消息 总结 本文主要聊SpringBoot整合RabbitMQ,主要分为生产者和消费者两个工程,目录结构如下: 先简单说一下RabbitMQ的一些核心概念: 1.虚拟主机vhost:vhost是物理隔离的,你可以将vhost看作是一个个小型的RabbitMQ 2.交换机exchange:生产者发送的消息不是直接到达队列的,而是交换机,然后交换机再根据路由key,路由到指定的队列,可以理解为一

  • 一篇文章教你用Java使用JVM工具检测问题

    目录 1.jps 2.jstat 3.jinfo 4.jstack a.死循环案例 b.等待输入 c.死锁 5.jconsole 6.jvisualvm 总结 1.jps 显示运行程序的进程.编码.主类目录信息 public class Demo01 { /** * jps : 显示进程ID,主类名称 * jps -v: 显示进程ID,主类名称以及详细编码信息 * jps -l:显示进程ID,主类目录 * * @param args * @throws IOException */ public

  • 一篇文章教你使用SpringBoot如何实现定时任务

    前言 在 Spring + SpringMVC 环境中,一般来说,要实现定时任务,我们有两中方案,一种是使用 Spring 自带的定时任务处理器 @Scheduled 注解,另一种就是使用第三方框架 Quartz ,Spring Boot 源自 Spring+SpringMVC ,因此天然具备这两个 Spring 中的定时任务实现策略,当然也支持 Quartz,本文我们就来看下 Spring Boot 中两种定时任务的实现方式. 一.第一种方式:@Scheduled 使用 @Scheduled

  • 一篇文章教你学会js实现弹幕效果

    目录 新建一个html文件: 建好html文件,搞出初始模版 HTML添加 CSS填充 js逻辑代码 动画效果 下面是弹幕效果 : 相信小伙伴们都看过了,那么它实现的原理是什么呢,那么我们前端怎么用我们web技术去实现呢?? 新建一个html文件: 哈哈哈,大家别像我一样用中文命名. 中文命名是不合规范的,行走江湖,大佬们看见你的中文命名会笑话你的. 上图中,我们引入了jquery插件,没错我们用jq写,回归原始(找不到cdn链接的小伙伴可以百度bootcdn,在里面搜索jquery).并且取了

  • 一篇文章带你了解Java Spring基础与IOC

    目录 About Spring About IOC Hello Spring Hello.java Beans.xml Test.java IOC创建对象的几种方式 Spring import settings Dependency Injection 1.构造器注入 2.set注入 3.拓展注入 P-namespcae&C-namespace Bean scopes singleton prototype Bean的自动装配 byName autowire byType autowire 小结

  • 一篇文章教你学会使用Python绘制甘特图

    目录 优点 局限 一日一书 用来制作甘特图的专业工具也不少,常见的有:Microsoft Office Project.GanttProject.WARCHART XGantt.jQuery.Gantt.Excel等,网络上也有一些优质工具支持在线绘制甘特图. 可是这种现成的工具,往往也存在一些弊端,让编程人员不知所措.比如说,花里胡哨的UI,让人目不暇接,不知点哪个才好: 比如说,有些基于浏览器的图表需要掌握HTML.JS等编程语言,只会点Python的我直接被劝退: 再比如,进来就是注册.登

  • 一篇文章教会你使用java爬取想要的资源

    目录 说明 方法摘要 常用的Element节点方法 实战:爬取B站番剧 Maven 代码 说明 简介: 你还在为想要的资源而获取不到而烦劳吗?你还在为你不会python而爬取不到资源而烦劳吗?没关系,看完我这一篇文章你就会学会用java爬取资源,从此不会因此而烦劳,下面我会以爬取京东物品来进行实战演示!!! 方法摘要 方法 方法说明 adoptNode(Node source) 试图把另一文档中的节点采用到此文档. createAttribute(String name) 创建指定名称的Attr

  • 一篇文章轻松搞懂Java中的自旋锁

    前言 锁作为并发共享数据,保证一致性的工具,在JAVA平台有多种实现(如 synchronized 和 ReentrantLock等等 ) .这些已经写好提供的锁为我们开发提供了便利. 在之前的文章<一文彻底搞懂面试中常问的各种"锁" >中介绍了Java中的各种"锁",可能对于不是很了解这些概念的同学来说会觉得有点绕,所以我决定拆分出来,逐步详细的介绍一下这些锁的来龙去脉,那么这篇文章就先来会一会"自旋锁". 正文 出现原因 在我们的

  • 一篇文章带你了解Java中ThreadPool线程池

    目录 ThreadPool 线程池的优势 线程池的特点 1 线程池的方法 (1) newFixedThreadPool (2) newSingleThreadExecutor (3) newScheduledThreadPool (4) newCachedThreadPool 2 线程池底层原理 3 线程池策略及分析 拒绝策略 如何设置maximumPoolSize大小 ThreadPool 线程池的优势 线程池做的工作主要是控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建后启动这些

  • 一篇文章教你使用枚举来实现java单例模式

    目录 传统的单例写法解决了什么问题 仍然存在的问题 为什么枚举就没有问题 总结 传统的单例写法解决了什么问题 首先,在大多数情况下(不包含面试),传统的单例写法已经完全够用了.通过 synchronized 关键字解决了多线程并发使用. public synchronized static SingleClassV1 getInstance(){ if(instance == null){ instance = new SingleClassV1(); } return instance; }

随机推荐