springBoot整合rabbitMQ的方法详解

引入pom

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.4.5</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<groupId>com.wxy</groupId>
	<artifactId>test-rabbitmq</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>test-rabbitmq</name>
	<description>Demo project for Spring Boot</description>
	<properties>
		<java.version>1.8</java.version>
	</properties>
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework.amqp</groupId>
			<artifactId>spring-rabbit-test</artifactId>
			<scope>test</scope>
		</dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

</project>

测试

package com.wxy.rabbit;

import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

@RunWith(SpringRunner.class)
@SpringBootTest
class TestRabbitmqApplicationTests {

	@Autowired
	RabbitTemplate rabbitTemplate;

	@Test
	public  void sendmessage() {
		String exchange  = "exchange.direct";
		String routingkey  = "wxy.news";
		//object为消息发送的消息体,可以自动实现消息的序列化
		Map<String,Object> msg = new HashMap<>();
		msg.put("msg","使用mq发送消息");
		msg.put("data", Arrays.asList("helloword",123456,true));
		rabbitTemplate.convertAndSend(exchange, routingkey,msg);
	}

	@Test
	public  void receive() {
		Object object  = rabbitTemplate.receiveAndConvert("wxy.news");
		System.out.println(object);
	}

}

默认消息转换类型

 ###############在RabbitTemplate默认使用的是SimpleMessageConverter#######
  private MessageConverter messageConverter = new SimpleMessageConverter();

  ###############源码:使用SerializationUtils.deserialize###############
   public Object fromMessage(Message message) throws MessageConversionException {
        Object content = null;
        MessageProperties properties = message.getMessageProperties();
        if (properties != null) {
            String contentType = properties.getContentType();
            if (contentType != null && contentType.startsWith("text")) {
                String encoding = properties.getContentEncoding();
                if (encoding == null) {
                    encoding = this.defaultCharset;
                }

                try {
                    content = new String(message.getBody(), encoding);
                } catch (UnsupportedEncodingException var8) {
                    throw new MessageConversionException("failed to convert text-based Message content", var8);
                }
            } else if (contentType != null && contentType.equals("application/x-java-serialized-object")) {
                try {
                    content = SerializationUtils.deserialize(this.createObjectInputStream(new ByteArrayInputStream(message.getBody()), this.codebaseUrl));
                } catch (IllegalArgumentException | IllegalStateException | IOException var7) {
                    throw new MessageConversionException("failed to convert serialized Message content", var7);
                }
            }
        }

将默认消息类型转化成自定义json格式

第一:上面SimpleMessageConverter是org.springframework.amqp.support.converter包下MessageConverter接口的一个实现类

第二:查看该接口MessageConverter下支持哪些消息转化
ctrl+H查看该接口中的所有实现类

第三步:找到json相关的convert

RabbitTemplateConfigurer中定义if (this.messageConverter != null)则使用配置的messageConverter
 ################## if (this.messageConverter != null)则使用配置的messageConverter
public void configure(RabbitTemplate template, ConnectionFactory connectionFactory) {
        PropertyMapper map = PropertyMapper.get();
        template.setConnectionFactory(connectionFactory);
        if (this.messageConverter != null) {
            template.setMessageConverter(this.messageConverter);
        }

        template.setMandatory(this.determineMandatoryFlag());
        Template templateProperties = this.rabbitProperties.getTemplate();
        if (templateProperties.getRetry().isEnabled()) {
            template.setRetryTemplate((new RetryTemplateFactory(this.retryTemplateCustomizers)).createRetryTemplate(templateProperties.getRetry(), Target.SENDER));
        }

        templateProperties.getClass();
        map.from(templateProperties::getReceiveTimeout).whenNonNull().as(Duration::toMillis).to(template::setReceiveTimeout);
        templateProperties.getClass();
        map.from(templateProperties::getReplyTimeout).whenNonNull().as(Duration::toMillis).to(template::setReplyTimeout);
        templateProperties.getClass();
        map.from(templateProperties::getExchange).to(template::setExchange);
        templateProperties.getClass();
        map.from(templateProperties::getRoutingKey).to(template::setRoutingKey);
        templateProperties.getClass();
        map.from(templateProperties::getDefaultReceiveQueue).whenNonNull().to(template::setDefaultReceiveQueue);
    }

配置一个messageConversert(org.springframework.amqp.support.converter包中的)

package com.wxy.rabbit.config;

import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MessageConverConfig {

    @Bean
    public MessageConverter getMessageConvert(){
        return  new Jackson2JsonMessageConverter();
    }
}

再次发送消息体json格式

使用注解@RabbitListener监听

监听多个队列

@RabbitListener(queues = {"wxy.news","wxy.emps"})

监听单个队列

@RabbitListener(queues = "wxy.news")
package com.wxy.rabbit.service;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
public class RabbitMqReceiveService {

    @RabbitListener(queues = {"wxy.news","wxy.emps"})
    public void  getReceiveMessage(){
        System.out.println("监听到性的消息");
    }

    @RabbitListener(queues =  {"wxy.news","wxy.emps"})
    public void  getReceiveMessageHead(Message message){
        System.out.println(message.getBody());
        System.out.println( message.getMessageProperties());
    }

}

在程序中创建队列,交换器,并进行绑定

@Test
	public  void create() {
		//创建一个点对点的交换器
		amqpAdmin.declareExchange(new DirectExchange("amqpexchange.direct"));
		//创建一个队列
		// String name,:队列名称
		// boolean durable :持久化
		amqpAdmin.declareQueue(new Queue("amqp.queue",true));
		//绑定
		//String destination, Binding.DestinationType destinationType, String exchange, String routingKey
		//  @Nullable Map<String, Object> arguments
		amqpAdmin.declareBinding(new Binding("amqp.queue", Binding.DestinationType.QUEUE,
				"amqpexchange.direct","wxy.news", null));

	}

到此这篇关于springBoot整合rabbitMQ的方法详解的文章就介绍到这了,更多相关springBoot整合rabbitMQ内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Spring Boot整合RabbitMQ实例(Topic模式)

    1.Topic交换器介绍 Topic Exchange 转发消息主要是根据通配符. 在这种交换机下,队列和交换机的绑定会定义一种路由模式,那么,通配符就要在这种路由模式和路由键之间匹配后交换机才能转发消息. 在这种交换机模式下: 路由键必须是一串字符,用句号(.) 隔开,比如说 agreements.us,或者 agreements.eu.stockholm 等. 路由模式必须包含一个 星号(*),主要用于匹配路由键指定位置的一个单词,比如说,一个路由模式是这样子:agreements..b.*

  • SpringBoot整合RabbitMQ 手动应答(简单demo)

    版本说明 JDK 1.8 RabbitMQ 3.7.15 Erlang 22.0 SpringBoot 2.3.3.RELEASE // TODO 2021年1月8日 整理CentOS安装RabbitMQ流程 1. 在RabbitMQ的Web管理界面,创建test队列 参数的含义 durability:是否持久化(重启或宕机后消息依然保存) durable 持久 transient 暂时 新建maven项目. 2. pom.xml <?xml version="1.0" enco

  • spring boot整合RabbitMQ实例详解(Fanout模式)

    1.Fanout Exchange介绍 Fanout Exchange 消息广播的模式,不管路由键或者是路由模式,会把消息发给绑定给它的全部队列,如果配置了routing_key会被忽略. 如上图所示,即当使用fanout交换器时,他会将消息广播到与该交换器绑定的所有队列上,这有利于你对单条消息做不同的反应. 例如存在以下场景:一个web服务要在用户完善信息时,获得积分奖励,这样你就可以创建两个对列,一个用来处理用户信息的请求,另一个对列获取这条消息是来完成积分奖励的任务. 2.代码示例 1).

  • SpringBoot整合RabbitMQ消息队列的完整步骤

    SpringBoot整合RabbitMQ 主要实现RabbitMQ以下三种消息队列: 简单消息队列(演示direct模式) 基于RabbitMQ特性的延时消息队列 基于RabbitMQ相关插件的延时消息队列 公共资源 1. 引入pom依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId>

  • Spring Boot整合RabbitMQ开发实战详解

    这篇文章主要讲基本的整合.先把代码跑起来,再说什么高级特性. RabbitMQ 中的一些术语 如果你打开 RabbitMQ web 控制台,你会发现其中有一个 Exhanges 不好理解.下面简单说明一下. 交换器(Exchange) 交换器就像路由器,我们先是把消息发到交换器,然后交换器再根据路由键(routingKey)把消息投递到对应的队列.(明白这个概念很重要,后面的代码里面充分体现了这一点) 队列(Queue) 队列很好理解,就不用解释了. 绑定(Binding) 交换器怎么知道把这条

  • Springboot 整合RabbitMq(用心看完这一篇就够了)

    该篇文章内容较多,包括有rabbitMq相关的一些简单理论介绍,provider消息推送实例,consumer消息消费实例,Direct.Topic.Fanout的使用,消息回调.手动确认等. (但是关于rabbitMq的安装,就不介绍了) 在安装完rabbitMq后,输入http://ip:15672/ ,是可以看到一个简单后台管理界面的. 在这个界面里面我们可以做些什么? 可以手动创建虚拟host,创建用户,分配权限,创建交换机,创建队列等等,还有查看队列消息,消费效率,推送效率等等. 以上

  • SpringBoot整合RabbitMQ, 实现生产者与消费者的功能

    自然,依赖是少不了的.除了spring-boot-starter-web依赖外. 就这个是最主要的依赖了,其他的看着办就是了.我用的是gradle,用maven的看着弄也一样的.无非就是包+包名+版本 //AMQP compile('org.springframework.boot:spring-boot-starter-amqp:2.0.4.RELEASE') 这里有一个坑.导致我后来发送消息时一直连不上去.报错: java.net.SocketException: socket closed

  • spring boot整合RabbitMQ(Direct模式)

    springboot集成RabbitMQ非常简单,如果只是简单的使用配置非常少,springboot提供了spring-boot-starter-amqp项目对消息各种支持. 1.新建一个Spring Boot工程,命名为:"rabbitmq-hello". 在pom.xml中引入如下依赖内容,其中spring-boot-starter-amqp用于支持RabbitMQ. <dependency> <groupId>org.springframework.boo

  • springboot整合rabbitmq的示例代码

    概述 RabbitMQ是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据,或者简单地将作业队列以便让分布式服务器进行处理. 它现实了AMQP协议,并且遵循Mozilla Public License开源协议,它支持多种语言,可以方便的和spring集成. 消息队列使用消息将应用程序连接起来,这些消息通过像RabbitMQ这样的消息代理服务器在应用程序之间路由. 基本概念 Broker 用来处理数据的消息队列服务器实体 vhost 由RabbitMQ服务器创建的虚拟消息

  • springBoot整合rabbitMQ的方法详解

    引入pom <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0

  • SpringBoot整合Shiro的方法详解

    目录 1.Shito简介 1.1 什么是shiro 1.2 有哪些功能 2.QuickStart 3.SpringBoot中集成 1.导入shiro相关依赖 2.自定义UserRealm 3.定义shiroConfig 4.新建页面进行测试 1.Shito简介 1.1 什么是shiro Apache Shiro是一个java安全(权限)框架 Shiro可以非常容易的开发出足够好的应用,其不仅可以用在javase环境,也可以用在javaee环境 shiro可以完成,认证,授权,加密,会话管理,we

  • SpringBoot整合RocketMQ的方法详解

    目录 一:Ubuntu安装RocketMQ 二:添加RocketMQ依赖 三:在application中添加RocketMQ配置 四:编写消费者,消息生产者,消息实体类(自定义) 五:测试Controller 一:Ubuntu安装RocketMQ 1.下载(在下面地址选择自己需要的版本的rocketmq) http://rocketmq.apache.org/release_notes/ 2.解压,更改配置 将下载的zip文件解压到自己需要安装的位置 在unbuntu系统下需要修改安装跟目录下的

  • springboot整合solr的方法详解

    这一篇写一下springboot整合solr,代码已经上传到github,传送门. 1.新建core并配置schema solr create -c "book_core" ,配置分词器并且field类型定义为分词器类型. <fieldType name="ik_word" class="solr.TextField"> <analyzer type="index"> <tokenizer cla

  • SpringBoot整合mybatis的方法详解

    目录 1依赖配置 2使用 2.1SpringBoot配置整合mybatis: 2.2SpringBoot注解整合mybatis: 2.3在配置类上增加@MapperScan注解,扫描某个包下的全部Mapper文件: 总结 1 依赖配置 <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> &l

  • springboot整合mybatisplus的方法详解

    目录 POM: application.yaml: POJO: mapper接口: 包扫描: 测试: 总结 POM: <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-boot-starter</artifactId> <version>3.5.1</version> </dependency> <dependenc

  • SpringBoot实现整合微信支付方法详解

    目录 1.准备工作 1.1 数据库表 1.2 实体类 1.3 导入依赖 1.4 配置文件 1.5 创建读取微信支付相关信息的工具类 1.6 其他工具类 2.生成订单 2.1 远程调用用户模块和课程模块 2.2 远程调用方法的实现 2.3 根据课程id和用户id生成订单 3.查询订单信息 3.1 controller层 3.2 service层 4.生成微信支付的二维码 4.1 controller层 4.2 service层 5.查询订单支付状态 5.1 controller层 5.2 serv

  • SpringBoot整合Shiro的代码详解

    shiro是一个权限框架,具体的使用可以查看其官网 http://shiro.apache.org/  它提供了很方便的权限认证和登录的功能. 而springboot作为一个开源框架,必然提供了和shiro整合的功能!接下来就用springboot结合springmvc,mybatis,整合shiro完成对于用户登录的判定和权限的验证. 1.准备数据库表结构 这里主要涉及到五张表:用户表,角色表(用户所拥有的角色),权限表(角色所涉及到的权限),用户-角色表(用户和角色是多对多的),角色-权限表

  • SpringBoot整合MongoDB的步骤详解

    项目结构: 1.pom引入mongodb依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-mongodb</artifactId> </dependency> 2 配置application.properties #spring.data.mongodb.host=127.0.0.1 #spr

  • SpringBoot整合Swagger2的步骤详解

    简介 swagger是一个流行的API开发框架,这个框架以"开放API声明"(OpenAPI Specification,OAS)为基础, 对整个API的开发周期都提供了相应的解决方案,是一个非常庞大的项目(包括设计.编码和测试,几乎支持所有语言). springfox大致原理: springfox的大致原理就是,在项目启动的过种中,spring上下文在初始化的过程, 框架自动跟据配置加载一些swagger相关的bean到当前的上下文中,并自动扫描系统中可能需要生成api文档那些类,

随机推荐