Springboot 配置RabbitMQ文档的方法步骤

简介

RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗

概念:

  • 生产者 消息的产生方,负责将消息推送到消息队列
  • 消费者 消息的最终接受方,负责监听队列中的对应消息,消费消息
  • 队列 消息的寄存器,负责存放生产者发送的消息
  • 交换机 负责根据一定规则分发生产者产生的消息
  • 绑定 完成交换机和队列之间的绑定

模式:

  • direct:直连模式,用于实例间的任务分发
  • topic:话题模式,通过可配置的规则分发给绑定在该exchange上的队列
  • headers:适用规则复杂的分发,用headers里的参数表达规则
  • fanout:分发给所有绑定到该exchange上的队列,忽略routing key

SpringBoot集成RabbitMQ

一、引入maven依赖

<dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-amqp</artifactId>
 <version>1.5.2.RELEASE</version>
</dependency>

二、配置application.properties

# rabbitmq
spring.rabbitmq.host = dev-mq.a.pa.com
spring.rabbitmq.port = 5672
spring.rabbitmq.username = admin
spring.rabbitmq.password = admin
spring.rabbitmq.virtualHost = /message-test/

三、编写AmqpConfiguration配置文件

package message.test.configuration;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AmqpConfiguration {
/**
 * 消息编码
 */
 public static final String MESSAGE_ENCODING = "UTF-8";
 public static final String EXCHANGE_ISSUE = "exchange_message_issue";
 public static final String QUEUE_ISSUE_USER = "queue_message_issue_user";
 public static final String QUEUE_ISSUE_ALL_USER = "queue_message_issue_all_user";
 public static final String QUEUE_ISSUE_ALL_DEVICE = "queue_message_issue_all_device";
 public static final String QUEUE_ISSUE_CITY = "queue_message_issue_city";
 public static final String ROUTING_KEY_ISSUE_USER = "routing_key_message_issue_user";
 public static final String ROUTING_KEY_ISSUE_ALL_USER = "routing_key_message_issue_all_user";
 public static final String ROUTING_KEY_ISSUE_ALL_DEVICE = "routing_key_message_issue_all_device";
 public static final String ROUTING_KEY_ISSUE_CITY = "routing_key_message_issue_city";
 public static final String EXCHANGE_PUSH = "exchange_message_push";
 public static final String QUEUE_PUSH_RESULT = "queue_message_push_result";

 @Autowired
 private RabbitProperties rabbitProperties;

 @Bean
 public Queue issueUserQueue() {
  return new Queue(QUEUE_ISSUE_USER);
 }

 @Bean
 public Queue issueAllUserQueue() {
  return new Queue(QUEUE_ISSUE_ALL_USER);
 }

 @Bean
 public Queue issueAllDeviceQueue() {
  return new Queue(QUEUE_ISSUE_ALL_DEVICE);
 }

 @Bean
 public Queue issueCityQueue() {
  return new Queue(QUEUE_ISSUE_CITY);
 }

 @Bean
 public Queue pushResultQueue() {
  return new Queue(QUEUE_PUSH_RESULT);
 }

 @Bean
 public DirectExchange issueExchange() {
  return new DirectExchange(EXCHANGE_ISSUE);
 }

 @Bean
 public DirectExchange pushExchange() {
  // 参数1:队列
  // 参数2:是否持久化
  // 参数3:是否自动删除
  return new DirectExchange(EXCHANGE_PUSH, true, true);
 }

 @Bean
 public Binding issueUserQueueBinding(@Qualifier("issueUserQueue") Queue queue,
    @Qualifier("issueExchange") DirectExchange exchange) {
   return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_ISSUE_USER);
 }

 @Bean
 public Binding issueAllUserQueueBinding(@Qualifier("issueAllUserQueue") Queue queue,
    @Qualifier("issueExchange") DirectExchange exchange) {
  return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_ISSUE_ALL_USER);
 }

 @Bean
 public Binding issueAllDeviceQueueBinding(@Qualifier("issueAllDeviceQueue") Queue queue,
    @Qualifier("issueExchange") DirectExchange exchange) {
  return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_ISSUE_ALL_DEVICE);
 }

 @Bean
 public Binding issueCityQueueBinding(@Qualifier("issueCityQueue") Queue queue,
    @Qualifier("issueExchange") DirectExchange exchange) {
  return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY_ISSUE_CITY);
 }

 @Bean
 public Binding pushResultQueueBinding(@Qualifier("pushResultQueue") Queue queue,
    @Qualifier("pushExchange") DirectExchange exchange) {
  return BindingBuilder.bind(queue).to(exchange).withQueueName();
 }

 @Bean
 public ConnectionFactory defaultConnectionFactory() {
  CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
  connectionFactory.setHost(rabbitProperties.getHost());
  connectionFactory.setPort(rabbitProperties.getPort());
  connectionFactory.setUsername(rabbitProperties.getUsername());
  connectionFactory.setPassword(rabbitProperties.getPassword());
  connectionFactory.setVirtualHost(rabbitProperties.getVirtualHost());
  return connectionFactory;
 }

 @Bean
 public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
    @Qualifier("defaultConnectionFactory") ConnectionFactory connectionFactory) {
  SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
  factory.setConnectionFactory(connectionFactory);
  factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
  return factory;
 }

 @Bean
 public AmqpTemplate rabbitTemplate(@Qualifier("defaultConnectionFactory") ConnectionFactory connectionFactory)
 {
  return new RabbitTemplate(connectionFactory);
 }
}

三、编写生产者

body = JSON.toJSONString(issueMessage).getBytes(AmqpConfiguration.MESSAGE_ENCODING);
 rabbitTemplate.convertAndSend(AmqpConfiguration.EXCHANGE_ISSUE,
            AmqpConfiguration.ROUTING_KEY_ISSUE_USER, body);

四、编写消费者

@RabbitListener(queues = AmqpConfiguration.QUEUE_PUSH_RESULT)
public void handlePushResult(@Payload byte[] data, Channel channel,
    @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {

}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • 浅谈SpringBoot是如何实现日志的

    前言 休息日闲着无聊看了下 SpringBoot 中的日志实现,把我的理解跟大家说下. 门面模式 说到日志框架不得不说门面模式.门面模式,其核心为外部与一个子系统的通信必须通过一个统一的外观对象进行,使得子系统更易于使用.用一张图来表示门面模式的结构为: 简单来说,该模式就是把一些复杂的流程封装成一个接口供给外部用户更简单的使用.这个模式中,设计到3个角色. 1).门面角色:外观模式的核心.它被客户角色调用,它熟悉子系统的功能.内部根据客户角色的需求预定了几种功能的组合(模块). 2).子系统(

  • SpringBoot2 task scheduler 定时任务调度器四种方式

    使用@EnableScheduling方式 @Component @Configurable @EnableScheduling public class Task1 { private static Log logger = LogFactory.getLog(Task1.class); @Scheduled(cron = "0/2 * * * * * ") public void execute() { logger.info("Task1>>" +

  • 使用dockercompose搭建springboot-mysql-nginx应用

    上篇使用docker构建spring-boot应用,是把编译好的jar包构建到镜像中. 这篇是把spring-boot连同数据库,做为一组docker服务运行起来. 这里只是把自己操作记录下来,完整运行的代码见"参考"中的引用1中的内容. (我修改mysql映射目录及获取远程ip的方法) 主要步骤: 搭建简单的springboot应用 应用添加docker下支持 编写dockercompose配置文件 实践运行 搭建简单的springboot应用 做一个web应用,统计访问该站点的ip

  • SpringBoot2.0新特性之配置绑定全解析

    在Spring Boot 2.0中推出了Relaxed Binding 2.0,对原有的属性绑定功能做了非常多的改进以帮助我们更容易的在Spring应用中加载和读取配置信息.下面本文就来说说Spring Boot 2.0中对配置的改进. 配置文件绑定 简单类型 在Spring Boot 2.0中对配置属性加载的时候会除了像1.x版本时候那样移除特殊字符外,还会将配置均以全小写的方式进行匹配和加载.所以,下面的4种配置方式都是等价的: properties格式: spring.jpa.databa

  • springboot后端解决跨域问题

    首先我门要知道什么是跨域: 跨域是指 不同域名之间相互访问.跨域,指的是浏览器不能执行其他网站的脚本.它是由浏览器的同源策略造成的,是浏览器对JavaScript施加的安全限制. 也就是如果在A网站中,我们希望使用Ajax来获得B网站中的特定内容 如果A网站与B网站不在同一个域中,那么就出现了跨域访问问题. 什么是同一个域? 同一协议,同一ip,同一端口,三同中有一不同就产生了跨域. 前端解决跨域: 前边也说了,跨域是浏览器不能执行其他网站的脚本.它是由浏览器的同源策略造成的,是浏览器对Java

  • SpringBoot与Quartz集成实现分布式定时任务集群的代码实例

    Spring Boot与Quartz集成实现分布式定时任务集群 直接贴代码 POM <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xs

  • Springboot项目打war包docker包找不到resource下静态资源的解决方案

    前一段时间遇到一个问题,是关于读取项目中文件资源的问题.我是一个maven工程 我把一张照片放到resource下面,然后在本地读取的时候可以读取到,但是一旦打成WAR包以后就总是包找不到文件资源错误.我的war包是springboot打的war包,是内嵌的tomcat所以不解压,然后系统去找路径的时候会发现是个WAR包,而图片在WAR包内,所以找不到. 为了解决这个问题,我走了好多弯路,一直在路径上花费时间. 一开始使用修改配置文件的方式: # 配置静态资源访问前缀 spring.mvc.st

  • SpringBoot+Spring Security+JWT实现RESTful Api权限控制的方法

    摘要:用spring-boot开发RESTful API非常的方便,在生产环境中,对发布的API增加授权保护是非常必要的.现在我们来看如何利用JWT技术为API增加授权保护,保证只有获得授权的用户才能够访问API. 一:开发一个简单的API 在IDEA开发工具中新建一个maven工程,添加对应的依赖如下: <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-b

  • SpringBoot+Swagger-ui自动生成API文档

    随着互联网技术的发展,现在的网站架构基本都由原来的后端渲染,变成了:前端渲染.先后端分离的形态,而且前端技术和后端技术在各自的道路上越走越远. 这样后段开发好了api 之后就要提交api 文档给前端的朋友.给前端的api 文档各个公司有各个公司的要求,有的是word 有的是 md 文档,或者是 postman 的一个连接. 好了废话不多说说一下 swagger -ui 吧 什么是Swagger Swagger是一个Restful风格接口的文档在线自动生成和测试的框架 官网:http://swag

  • SpringBoot记录Http请求日志的方法

    在使用Spring Boot开发 web api 的时候希望把 request,request header ,response reponse header , uri, method 等等的信息记录到我们的日志中,方便我们排查问题,也能对系统的数据做一些统计. Spring 使用了 DispatcherServlet 来拦截并分发请求,我们只要自己实现一个 DispatcherServlet 并在其中对请求和响应做处理打印到日志中即可. 我们实现一个自己的分发 Servlet ,它继承于 D

随机推荐