消息队列 RabbitMQ 与 Spring 整合使用的实例代码

一、什么是 RabbitMQ

RabbitMQ 是实现 AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。

RabbitMQ 是由 Erlang 语言开发,安装 RabbitMQ 服务需要先安装 Erlang 语言包。

二、如何与 Spring 集成

1. 我们都需要哪些 Jar 包?

抛开单独使用 Spring 的包不说,引入 RabbitMQ 我们还需要两个:

<!-- RabbitMQ -->
<dependency>
 <groupId>com.rabbitmq</groupId>
 <artifactId>amqp-client</artifactId>
 <version>3.5.1</version>
</dependency>
<dependency>
 <groupId>org.springframework.amqp</groupId>
 <artifactId>spring-rabbit</artifactId>
 <version>1.4.5.RELEASE</version>
</dependency>

2. 使用外部参数文件 application.properties:

mq.host=127.0.0.1
mq.username=queue
mq.password=1234
mq.port=8001
# 统一XML配置中易变部分的命名
mq.queue=test_mq

易变指的是在实际项目中,如果测试与生产环境使用的同一个 RabbitMQ 服务器。那我们在部署时直接修改 properties 文件的参数即可,防止测试与生产环境混淆。

修改 applicationContext.xml 文件,引入我们创建的 properties 文件

<context:property-placeholder location="classpath:application.properties"/>
<util:properties id="appConfig" location="classpath:application.properties"></util:properties>

3. 连接 RabbitMQ 服务器

<!-- 连接配置 -->
<rabbit:connection-factory id="connectionFactory" host="${mq.host}" username="${mq.username}"
  password="${mq.password}" port="${mq.port}" />

<rabbit:admin connection-factory="connectionFactory"/>

4. 声明一个 RabbitMQ Template

代码如下:

<rabbit:template id="amqpTemplate" exchange="${mq.queue}_exchange" connection-factory="connectionFactory"  />

5. 在 applicationContext.xml 中声明一个交换机,name 属性需配置到 RabbitMQ 服务器。

<rabbit:topic-exchange name="${mq.queue}_exchange" durable="true" auto-delete="false">
 <rabbit:bindings>
  <rabbit:binding queue="test_queue" pattern="${mq.queue}_patt"/>
 </rabbit:bindings>
</rabbit:topic-exchange>

交换机的四种模式:

  • direct:转发消息到 routigKey 指定的队列。
  • topic:按规则转发消息(最灵活)。
  • headers:(这个还没有接触到)
  • fanout:转发消息到所有绑定队列

交换器的属性:

  • 持久性:如果启用,交换器将会在server重启前都有效。
  • 自动删除:如果启用,那么交换器将会在其绑定的队列都被删除掉之后自动删除掉自身。
  • 惰性:如果没有声明交换器,那么在执行到使用的时候会导致异常,并不会主动声明。

如果没有队列绑定在交换机上,则发送到该交换机上的消息会丢失。

一个交换机可以绑定多个队列,一个队列可以被多个交换机绑定。

topic 类型交换器通过模式匹配分析消息的 routing-key 属性。它将 routing-key 和 binding-key 的字符串切分成单词。这些单词之间用点隔开。它同样也会识别两个通配符:#匹配0个或者多个单词,*匹配一个单词。例如,binding key:*.stock.#匹配 routing key:usd.stcok 和 eur.stock.db,但是不匹配 stock.nana。

因为交换器是命名实体,声明一个已经存在的交换器,但是试图赋予不同类型是会导致错误。客户端需要删除这个已经存在的交换器,然后重新声明并且赋予新的类型。

6. 在 applicationContext.xml 中声明一个队列,name 属性是需要配置到 RabbitMQ 服务器的。

代码如下:

<rabbit:queue id="test_queue" name="${mq.queue}_testQueue" durable="true" auto-delete="false" exclusive="false" />

  • durable:是否持久化
  • exclusive:仅创建者可以使用的私有队列,断开后自动删除
  • auto-delete:当所有消费端连接断开后,是否自动删除队列

7. 创建生产者端

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * @Description: 消息队列发送者
 * @Author:
 * @CreateTime:
 */
@Service
public class Producer {

 @Autowired
 private AmqpTemplate amqpTemplate;

 public void sendQueue(String exchange_key, String queue_key, Object object) {
  // convertAndSend 将Java对象转换为消息发送至匹配key的交换机中Exchange
  amqpTemplate.convertAndSend(exchange_key, queue_key, object);
 }
}

8. 在 applicationContext.xml 中配置监听及消费者端 

<!-- 消费者 -->
<bean name="rabbitmqService" class="com.enh.mq.RabbitmqService"></bean>

<!-- 配置监听 -->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">
  <!--
    queues 监听队列,多个用逗号分隔
    ref 监听器
  -->
  <rabbit:listener queues="test_queue" ref="rabbitmqService"/>
</rabbit:listener-container>

消费者 Java 代码:

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

public class RabbitmqService implements MessageListener {
 public void onMessage(Message message) {
  System.out.println("消息消费者 = " + message.toString());
 }
}

至此,我们的所有配置文件就写完了,最终如下:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:context="http://www.springframework.org/schema/context"
  xmlns:util="http://www.springframework.org/schema/util"
  xmlns:aop="http://www.springframework.org/schema/aop"
  xmlns:tx="http://www.springframework.org/schema/tx"
  xmlns:rabbit="http://www.springframework.org/schema/rabbit"
  xmlns:p="http://www.springframework.org/schema/p"
  xsi:schemaLocation="
  http://www.springframework.org/schema/context
  http://www.springframework.org/schema/context/spring-context-3.0.xsd
  http://www.springframework.org/schema/beans
  http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
  http://www.springframework.org/schema/util
  http://www.springframework.org/schema/util/spring-util-3.0.xsd
  http://www.springframework.org/schema/aop
  http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
  http://www.springframework.org/schema/tx
  http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
  http://www.springframework.org/schema/rabbit
  http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

 <!-- RabbitMQ start -->

 <!-- 连接配置 -->
 <rabbit:connection-factory id="connectionFactory" host="${mq.host}" username="${mq.username}"
  password="${mq.password}" port="${mq.port}" />

 <rabbit:admin connection-factory="connectionFactory"/>

 <!-- 消息队列客户端 -->
 <rabbit:template id="amqpTemplate" exchange="${mq.queue}_exchange" connection-factory="connectionFactory" />

 <!-- queue 队列声明 -->
 <!--
  durable 是否持久化
  exclusive 仅创建者可以使用的私有队列,断开后自动删除
  auto-delete 当所有消费端连接断开后,是否自动删除队列 -->
 <rabbit:queue id="test_queue" name="${mq.queue}_testQueue" durable="true" auto-delete="false" exclusive="false" />

 <!-- 交换机定义 -->
 <!--
  交换机:一个交换机可以绑定多个队列,一个队列也可以绑定到多个交换机上。
  如果没有队列绑定到交换机上,则发送到该交换机上的信息则会丢失。

  direct模式:消息与一个特定的路由器完全匹配,才会转发
  topic模式:按规则转发消息,最灵活
  -->
 <rabbit:topic-exchange name="${mq.queue}_exchange" durable="true" auto-delete="false">
  <rabbit:bindings>
   <!-- 设置消息Queue匹配的pattern (direct模式为key) -->
   <rabbit:binding queue="test_queue" pattern="${mq.queue}_patt"/>
  </rabbit:bindings>
 </rabbit:topic-exchange>

 <bean name="rabbitmqService" class="com.enh.mq.RabbitmqService"></bean>

 <!-- 配置监听 消费者 -->
 <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">
  <!--
   queues 监听队列,多个用逗号分隔
   ref 监听器 -->
  <rabbit:listener queues="test_queue" ref="rabbitmqService"/>
 </rabbit:listener-container>
</beans>

9. 如何使用 RabbitMQ 发送一个消息

  @Autowired
 private Producer producer;
 @Value("#{appConfig['mq.queue']}")
 private String queueId;

 /**
  * @Description: 消息队列
  * @Author:
  * @CreateTime:
  */
 @ResponseBody
 @RequestMapping("/sendQueue")
 public String testQueue() {
  try {
   Map<String, Object> map = new HashMap<String, Object>();
   map.put("data", "hello rabbitmq");
   producer.sendQueue(queueId + "_exchange", queueId + "_patt", map);
  } catch (Exception e) {
   e.printStackTrace();
  }
  return "发送完毕";
 }

嗯。这个测试是 SpringMVC 框架。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • Spring Boot整合RabbitMQ实例(Topic模式)

    1.Topic交换器介绍 Topic Exchange 转发消息主要是根据通配符. 在这种交换机下,队列和交换机的绑定会定义一种路由模式,那么,通配符就要在这种路由模式和路由键之间匹配后交换机才能转发消息. 在这种交换机模式下: 路由键必须是一串字符,用句号(.) 隔开,比如说 agreements.us,或者 agreements.eu.stockholm 等. 路由模式必须包含一个 星号(*),主要用于匹配路由键指定位置的一个单词,比如说,一个路由模式是这样子:agreements..b.*

  • Spring学习笔记3之消息队列(rabbitmq)发送邮件功能

    rabbitmq简介: MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法.应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们.消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术.排队指的是应用程序通过 队列来通信.队列的使用除去了接收和发送应用程序同时执行的要求.其中较为成熟的MQ产品有IBM WEBSPHERE MQ. 本节的内容是用户注册时,将邮

  • 详解Spring Boot 配置多个RabbitMQ

    闲话 好久没有写博客了,6月份毕业,因为工作原因,公司上网受限,一直没能把学到的知识点写下来,工作了半年,其实学到的东西也不少,但是现在回忆起来的东西少之又少,有时甚至能在同个问题中踩了几次,越来越觉得及时记录一下学到的东西很重要. 好了,闲话少说,写下这段时间学习的东西,先记录一下用spring Boot配置多个RabbitMQ的情况... 最近公司新启动一个新平台的项目,需要用微服务这个这几年很火的概念来做,所以就学习了Spring Boot方面的知识,给同事展示Spring Boot的一些

  • spring boot整合RabbitMQ(Direct模式)

    springboot集成RabbitMQ非常简单,如果只是简单的使用配置非常少,springboot提供了spring-boot-starter-amqp项目对消息各种支持. 1.新建一个Spring Boot工程,命名为:"rabbitmq-hello". 在pom.xml中引入如下依赖内容,其中spring-boot-starter-amqp用于支持RabbitMQ. <dependency> <groupId>org.springframework.boo

  • spring boot整合RabbitMQ实例详解(Fanout模式)

    1.Fanout Exchange介绍 Fanout Exchange 消息广播的模式,不管路由键或者是路由模式,会把消息发给绑定给它的全部队列,如果配置了routing_key会被忽略. 如上图所示,即当使用fanout交换器时,他会将消息广播到与该交换器绑定的所有队列上,这有利于你对单条消息做不同的反应. 例如存在以下场景:一个web服务要在用户完善信息时,获得积分奖励,这样你就可以创建两个对列,一个用来处理用户信息的请求,另一个对列获取这条消息是来完成积分奖励的任务. 2.代码示例 1).

  • rabbitmq结合spring实现消息队列优先级的方法

    1.1项目背景:做一个灾情预警的消息平台,灾情检查系统需要向消息平台里面推送消息,这里是典型的异构系统的消息传递,我们需要选择一个中间件作为消息队列,调研分析了rabbitmq,zeromq,activemq,kafka等消息中间件,综合性能,安全,可持久化等角度果断选择了rabbitmq作为我们的消息中间件 (其实这里是因为rabbitmq 是spring官方支持的,开发起来方便).需求上我们有多种类型的消息,这里有紧急推送的和一般的等区分,高并发时,就会有对消息进行优先推送的情况出现,于是r

  • 详解spring boot集成RabbitMQ

    RabbitMQ作为AMQP的代表性产品,在项目中大量使用.结合现在主流的spring boot,极大简化了开发过程中所涉及到的消息通信问题. 首先正确的安装RabbitMQ及运行正常. RabbitMQ需啊erlang环境,所以首先安装对应版本的erlang,可在RabbitMQ官网下载 # rpm -ivh erlang-19.0.4-1.el7.centos.x86_64.rpm 使用yum安装RabbitMQ,避免缺少依赖包引起的安装失败 # yum install rabbitmq-s

  • 消息队列 RabbitMQ 与 Spring 整合使用的实例代码

    一.什么是 RabbitMQ RabbitMQ 是实现 AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性.扩展性.高可用性等方面表现不俗.消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然. RabbitMQ 是由 Erlang 语言开发,安装 RabbitMQ 服务需要先安装 Erlang 语言包. 二.如何与 Spring 集成 1. 我们都需要哪些 Jar 包? 抛开单独使用 Spring 的包不说,

  • Spring整合Redis完整实例代码

    做过大型软件系统的同学都知道,随着系统数据越来越庞大,越来越复杂,随之带来的问题就是系统性能越来越差,尤其是频繁操作数据库带来的性能损耗更为严重.很多业绩大牛为此提出了众多的解决方案和开发了很多框架以优化这种频繁操作数据库所带来的性能损耗,其中,尤为突出的两个缓存服务器是Memcached和Redis.今天,我们不讲Memcached和Redis本身,这里主要为大家介绍如何使spring与Redis整合. 1.pom构建 <project xmlns="http://maven.apach

  • Spring整合消息队列RabbitMQ流程

    目录 搭建生产者工程 创建工程 添加依赖 配置整合 发送消息 搭建消费者工程 创建工程 添加依赖 配置整合 消息监听器 搭建生产者工程 创建工程 添加依赖 修改pom.xml文件内容为如下: <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.

  • springboot整合消息队列RabbitMQ

    前言: RabbitMQ常用的三种Exchange Type:fanout.direct.topic. fanout:把所有发送到该Exchange的消息投递到所有与它绑定的队列中. direct:把消息投递到那些binding key与routing key完全匹配的队列中. topic:将消息路由到binding key与routing key模式匹配的队列中. 这里基于springboot整合​​消息队列​​,测试这三种Exchange. 启动RabbitMQ 双击运行rabbitmq-s

  • ActiveMQ消息队列技术融合Spring过程解析

    这篇文章主要介绍了ActiveMQ消息队列技术融合Spring过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 一.业务逻辑 我想在修改一个物品的状态时,同时发送广播,给对应的监听器去实现,此商品存储到solr中,同时通过网页静态模板生成一个当前物品的详情页面,此时用到了广播机制 当我删除一个商品时,发送一个广播,给对应的监听器,同时删除solr中对应的物品. 广播机制:必须要同时在线,才能接收我的消息 使用消息中间件需要导入配置文件 <

  • Spring Boot整合mybatis(一)实例代码

    sprig-boot是一个微服务架构,加快了spring工程快速开发,以及简便了配置.接下来开始spring-boot与mybatis的整合. 1.创建一个maven工程命名为spring-boot-entity,pom.xml文件配置如下: <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:s

  • spring整合cxf框架实例

    CXF是webService的框架,能够和spring无缝整合 ##服务端编写 1.创建动态web项目 2.导入cxf和spring相关jar包(CXF核心包:cxf-2.4.2.jar) 3.在web.xml中配置CXF框架的核心Servlet <servlet> <servlet-name>cxf</servlet-name> <servlet-class>org.apache.cxf.transport.servlet.CXFServlet</s

  • Spring的事务机制实例代码

    本文研究的主要是Spring的事务机制的相关内容,具体如下. JAVA EE传统事务机制 通常有两种事务策略:全局事务和局部事务.全局事务可以跨多个事务性资源(即数据源,典型的是数据库和消息队列),通常都需要J2EE应用服务器的管理,其底层需要服务器的JTA支持.而局部事务则与底层采用的持久化技术有关,如果底层直接使用JDBC,需要用Connection对象来操事务.如果采用Hibernate持久化技术,则需要使用session对象来操作事务. 通常的,使用JTA事务,JDBC事务及Hibern

  • SpringBoot整合MongoDB完整实例代码

    目录 一.新建项目 二.docker-compose 配置mongoDB 三.SpringBoot配置MongoDB 问题:Exception authenticating MongoCredential 四.编写测试类 五.源码地址 一.新建项目 我们这次直接从IEDA创建项目,具体配置如下,还是万年的Java8. 二.docker-compose 配置mongoDB docker-compose.yml的具体配置如下,注意的是本地的文件夹data2022可以根据需要改成自己的名称,如果本地还

随机推荐