Spring Boot 入门之消息中间件的使用

一、前言

在消息中间件中有 2 个重要的概念:消息代理和目的地。当消息发送者发送消息后,消息就被消息代理接管,消息代理保证消息传递到指定目的地。

我们常用的消息代理有 JMS 和 AMQP 规范。对应地,它们常见的实现分别是 ActiveMQ 和 RabbitMQ。

二、整合 ActiveMQ

2.1 添加依赖

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!-- 如果需要配置连接池,添加如下依赖 -->
<dependency>
  <groupId>org.apache.activemq</groupId>
  <artifactId>activemq-pool</artifactId>
</dependency>

2.2 添加配置

# activemq 配置
spring.activemq.broker-url=tcp://192.168.2.12:61616
spring.activemq.user=admin
spring.activemq.password=admin
spring.activemq.pool.enabled=false
spring.activemq.pool.max-connections=50
# 使用发布/订阅模式时,下边配置需要设置成 true
spring.jms.pub-sub-domain=false

此处 spring.activemq.pool.enabled=false,表示关闭连接池。

2.3 编码

配置类:

@Configuration
public class JmsConfirguration {
  public static final String QUEUE_NAME = "activemq_queue";

  public static final String TOPIC_NAME = "activemq_topic";

  @Bean
  public Queue queue() {
    return new ActiveMQQueue(QUEUE_NAME);
  }

  @Bean
  public Topic topic() {
    return new ActiveMQTopic(TOPIC_NAME);
  }
}

负责创建队列和主题。

消息生产者:

@Component
public class JmsSender {
  @Autowired
  private Queue queue;

  @Autowired
  private Topic topic;

  @Autowired
  private JmsMessagingTemplate jmsTemplate;

  public void sendByQueue(String message) {
    this.jmsTemplate.convertAndSend(queue, message);
  }

  public void sendByTopic(String message) {
    this.jmsTemplate.convertAndSend(topic, message);
  }
}

消息消费者:

@Component
public class JmsReceiver {

  @JmsListener(destination = JmsConfirguration.QUEUE_NAME)
  public void receiveByQueue(String message) {
    System.out.println("接收队列消息:" + message);
  }

  @JmsListener(destination = JmsConfirguration.TOPIC_NAME)
  public void receiveByTopic(String message) {
    System.out.println("接收主题消息:" + message);
  }
}

消息消费者使用 @JmsListener 注解监听消息。

2.4 测试

@RunWith(SpringRunner.class)
@SpringBootTest
public class JmsTest {
  @Autowired
  private JmsSender sender;
  @Test
  public void testSendByQueue() {
    for (int i = 1; i < 6; i++) {
      this.sender.sendByQueue("hello activemq queue " + i);
    }
  }

  @Test
  public void testSendByTopic() {
    for (int i = 1; i < 6; i++) {
      this.sender.sendByTopic("hello activemq topic " + i);
    }
  }
}

打印结果:

接收队列消息:hello activemq queue 1
接收队列消息:hello activemq queue 2
接收队列消息:hello activemq queue 3
接收队列消息:hello activemq queue 4
接收队列消息:hello activemq queue 5

测试发布/订阅模式时,设置 spring.jms.pub-sub-domain=true

接收主题消息:hello activemq topic 1
接收主题消息:hello activemq topic 2
接收主题消息:hello activemq topic 3
接收主题消息:hello activemq topic 4
接收主题消息:hello activemq topic 5

三、整合 RabbitMQ

3.1 添加依赖

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

3.2 添加配置

spring.rabbitmq.host=192.168.2.30
spring.rabbitmq.port=5672
spring.rabbitmq.username=light
spring.rabbitmq.password=light
spring.rabbitmq.virtual-host=/test

3.3 编码

配置类:

@Configuration
public class AmqpConfirguration {
  //=============简单、工作队列模式===============

  public static final String SIMPLE_QUEUE = "simple_queue";
  @Bean
  public Queue queue() {
    return new Queue(SIMPLE_QUEUE, true);
  }

  //===============发布/订阅模式============

  public static final String PS_QUEUE_1 = "ps_queue_1";
  public static final String PS_QUEUE_2 = "ps_queue_2";
  public static final String FANOUT_EXCHANGE = "fanout_exchange";

  @Bean
  public Queue psQueue1() {
    return new Queue(PS_QUEUE_1, true);
  }

  @Bean
  public Queue psQueue2() {
    return new Queue(PS_QUEUE_2, true);
  }

  @Bean
  public FanoutExchange fanoutExchange() {
    return new FanoutExchange(FANOUT_EXCHANGE);
  }

  @Bean
  public Binding fanoutBinding1() {
    return BindingBuilder.bind(psQueue1()).to(fanoutExchange());
  }

  @Bean
  public Binding fanoutBinding2() {
    return BindingBuilder.bind(psQueue2()).to(fanoutExchange());
  }
  //===============路由模式============

  public static final String ROUTING_QUEUE_1 = "routing_queue_1";
  public static final String ROUTING_QUEUE_2 = "routing_queue_2";
  public static final String DIRECT_EXCHANGE = "direct_exchange";

  @Bean
  public Queue routingQueue1() {
    return new Queue(ROUTING_QUEUE_1, true);
  }

  @Bean
  public Queue routingQueue2() {
    return new Queue(ROUTING_QUEUE_2, true);
  }

  @Bean
  public DirectExchange directExchange() {
    return new DirectExchange(DIRECT_EXCHANGE);
  }

  @Bean
  public Binding directBinding1() {
    return BindingBuilder.bind(routingQueue1()).to(directExchange()).with("user");
  }

  @Bean
  public Binding directBinding2() {
    return BindingBuilder.bind(routingQueue2()).to(directExchange()).with("order");
  }

  //===============主题模式============

  public static final String TOPIC_QUEUE_1 = "topic_queue_1";
  public static final String TOPIC_QUEUE_2 = "topic_queue_2";
  public static final String TOPIC_EXCHANGE = "topic_exchange";

  @Bean
  public Queue topicQueue1() {
    return new Queue(TOPIC_QUEUE_1, true);
  }

  @Bean
  public Queue topicQueue2() {
    return new Queue(TOPIC_QUEUE_2, true);
  }

  @Bean
  public TopicExchange topicExchange() {
    return new TopicExchange(TOPIC_EXCHANGE);
  }

  @Bean
  public Binding topicBinding1() {
    return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("user.add");
  }

  @Bean
  public Binding topicBinding2() {
    return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("user.#");
  }
}

RabbitMQ 有多种工作模式,因此配置比较多。想了解相关内容的读者可以查看《RabbitMQ 工作模式介绍》或者自行百度相关资料。

消息生产者:

@Component
public class AmqpSender {
  @Autowired
  private AmqpTemplate amqpTemplate;
  /**
   * 简单模式发送
   *
   * @param message
   */
  public void simpleSend(String message) {
    this.amqpTemplate.convertAndSend(AmqpConfirguration.SIMPLE_QUEUE, message);
  }
  /**
   * 发布/订阅模式发送
   *
   * @param message
   */
  public void psSend(String message) {
    this.amqpTemplate.convertAndSend(AmqpConfirguration.FANOUT_EXCHANGE, "", message);
  }
  /**
   * 路由模式发送
   *
   * @param message
   */
  public void routingSend(String routingKey, String message) {
    this.amqpTemplate.convertAndSend(AmqpConfirguration.DIRECT_EXCHANGE, routingKey, message);
  }
  /**
   * 主题模式发送
   *
   * @param routingKey
   * @param message
   */
  public void topicSend(String routingKey, String message) {
    this.amqpTemplate.convertAndSend(AmqpConfirguration.TOPIC_EXCHANGE, routingKey, message);
  }
}

消息消费者:

@Component
public class AmqpReceiver {
  /**
   * 简单模式接收
   *
   * @param message
   */
  @RabbitListener(queues = AmqpConfirguration.SIMPLE_QUEUE)
  public void simpleReceive(String message) {
    System.out.println("接收消息:" + message);
  }
  /**
   * 发布/订阅模式接收
   *
   * @param message
   */
  @RabbitListener(queues = AmqpConfirguration.PS_QUEUE_1)
  public void psReceive1(String message) {
    System.out.println(AmqpConfirguration.PS_QUEUE_1 + "接收消息:" + message);
  }
  @RabbitListener(queues = AmqpConfirguration.PS_QUEUE_2)
  public void psReceive2(String message) {
    System.out.println(AmqpConfirguration.PS_QUEUE_2 + "接收消息:" + message);
  }
  /**
   * 路由模式接收
   *
   * @param message
   */
  @RabbitListener(queues = AmqpConfirguration.ROUTING_QUEUE_1)
  public void routingReceive1(String message) {
    System.out.println(AmqpConfirguration.ROUTING_QUEUE_1 + "接收消息:" + message);
  }
  @RabbitListener(queues = AmqpConfirguration.ROUTING_QUEUE_2)
  public void routingReceive2(String message) {
    System.out.println(AmqpConfirguration.ROUTING_QUEUE_2 + "接收消息:" + message);
  }
  /**
   * 主题模式接收
   *
   * @param message
   */
  @RabbitListener(queues = AmqpConfirguration.TOPIC_QUEUE_1)
  public void topicReceive1(String message) {
    System.out.println(AmqpConfirguration.TOPIC_QUEUE_1 + "接收消息:" + message);
  }

  @RabbitListener(queues = AmqpConfirguration.TOPIC_QUEUE_2)
  public void topicReceive2(String message) {
    System.out.println(AmqpConfirguration.TOPIC_QUEUE_2 + "接收消息:" + message);
  }
}

消息消费者使用 @RabbitListener 注解监听消息。

3.4 测试

@RunWith(SpringRunner.class)
@SpringBootTest
public class AmqpTest {
  @Autowired
  private AmqpSender sender;
  @Test
  public void testSimpleSend() {
    for (int i = 1; i < 6; i++) {
      this.sender.simpleSend("test simpleSend " + i);
    }
  }
  @Test
  public void testPsSend() {
    for (int i = 1; i < 6; i++) {
      this.sender.psSend("test psSend " + i);
    }
  }

  @Test
  public void testRoutingSend() {
    for (int i = 1; i < 6; i++) {
      this.sender.routingSend("order", "test routingSend " + i);
    }
  }

  @Test
  public void testTopicSend() {
    for (int i = 1; i < 6; i++) {
      this.sender.topicSend("user.add", "test topicSend " + i);
    }
  }
}

测试结果略过。。。

踩坑提醒1:ACCESS_REFUSED – Login was refused using authentication mechanism PLAIN

解决方案:

1) 请确保用户名和密码是否正确,需要注意的是用户名和密码的值是否包含空格或制表符(笔者测试时就是因为密码多了一个制表符导致认证失败)。

2) 如果测试账户使用的是 guest,需要修改 rabbitmq.conf 文件。在该文件中添加 “loopback_users = none” 配置。

踩坑提醒2:Cannot prepare queue for listener. Either the queue doesn't exist or the broker will not allow us to use it

解决方案:

我们可以登陆 RabbitMQ 的管理界面,在 Queue 选项中手动添加对应的队列。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • 详解SpringBoot文件上传下载和多文件上传(图文)

    最近在学习SpringBoot,以下是最近学习整理的实现文件上传下载的Java代码: 1.开发环境: IDEA15+ Maven+JDK1.8 2.新建一个maven工程: 3.工程框架 4.pom.xml文件依赖项 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation

  • 详解SpringBoot配置连接池

    内置的连接池 目前spring Boot中默认支持的连接池有dbcp,dbcp2, tomcat, hikari三种连接池. 数据库连接可以使用DataSource池进行自动配置. 由于Tomcat数据源连接池的性能和并发,在tomcat可用时,我们总是优先使用它. 如果HikariCP可用,我们将使用它. 如果Commons DBCP可用,我们将使用它,但在生产环境不推荐使用它. 最后,如果Commons DBCP2可用,我们将使用它. 以上的几种连接池,可以通过在配置application文

  • springboot实现拦截器之验证登录示例

    整理文档,搜刮出一个springboot实现拦截器之验证登录示例,稍微整理精简一下做下分享. 添加jar包,这个jar包不是必须的,只是在拦截器里用到了,如果不用的话,完全可以不引入 <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.5</version> </dep

  • springboot与mybatis整合实例详解(完美融合)

    简介 从 Spring Boot 项目名称中的 Boot 可以看出来,Spring Boot 的作用在于创建和启动新的基于 Spring 框架的项目.它的目的是帮助开发人员很容易的创建出独立运行和产品级别的基于 Spring 框架的应用.Spring Boot 会选择最适合的 Spring 子项目和第三方开源库进行整合.大部分 Spring Boot 应用只需要非常少的配置就可以快速运行起来. Spring Boot 包含的特性如下: 创建可以独立运行的 Spring 应用. 直接嵌入 Tomc

  • SpringBoot定时任务两种(Spring Schedule 与 Quartz 整合 )实现方法

    前言 最近在项目中使用到定时任务,之前一直都是使用Quartz 来实现,最近看Spring 基础发现其实Spring 提供 Spring Schedule 可以帮助我们实现简单的定时任务功能. 下面说一下两种方式在Spring Boot 项目中的使用. Spring Schedule 实现定时任务 Spring Schedule 实现定时任务有两种方式 1. 使用XML配置定时任务, 2. 使用 @Scheduled 注解. 因为是Spring Boot 项目 可能尽量避免使用XML配置的形式,

  • Spring boot 默认静态资源路径与手动配置访问路径的方法

    在application.propertis中配置 ##端口号 server.port=8081 ##默认前缀 spring.mvc.view.prefix=/ ## 响应页面默认后缀 spring.mvc.view.suffix=.html # 默认值为 /** spring.mvc.static-path-pattern=/** # 这里设置要指向的路径,多个使用英文逗号隔开,默认值为 classpath:/META-INF/resources/,classpath:/resources/,

  • Spring Boot 日志配置方法(超详细)

    默认日志 Logback : 默认情况下,Spring Boot会用Logback来记录日志,并用INFO级别输出到控制台.在运行应用程序和其他例子时,你应该已经看到很多INFO级别的日志了. 从上图可以看到,日志输出内容元素具体如下: 时间日期:精确到毫秒 日志级别:ERROR, WARN, INFO, DEBUG or TRACE 进程ID 分隔符:- 标识实际日志的开始 线程名:方括号括起来(可能会截断控制台输出) Logger名:通常使用源代码的类名 日志内容 添加日志依赖 假如mave

  • Spring boot实现热部署的两种方式详解

    热部署是什么 大家都知道在项目开发过程中,常常会改动页面数据或者修改数据结构,为了显示改动效果,往往需要重启应用查看改变效果,其实就是重新编译生成了新的 Class 文件,这个文件里记录着和代码等对应的各种信息,然后 Class 文件将被虚拟机的 ClassLoader 加载. 而热部署正是利用了这个特点,它监听到如果有 Class 文件改动了,就会创建一个新的 ClaassLoader 进行加载该文件,经过一系列的过程,最终将结果呈现在我们眼前. 类加载机制 Java 中的类经过编译器可以把代

  • 详解eclipse下创建第一个spring boot项目

    spring Boot是由Pivotal团队提供的全新框架,其设计目的是用来简化新Spring应用的初始搭建以及开发过程.该框架使用了特定的方式来进行配置,从而使开发人员不再需要定义样板化的配置.通过这种方式,Boot致力于在蓬勃发展的快速应用开发领域(rapid application development)成为领导者.也就是说,Spring Boot是为了简化Spring开发而生,主要思想是降低spring的入门,使得新手可以以最快的速度让程序在spring框架下跑起来. 今天我们就来创建

  • Spring Boot 入门之消息中间件的使用

    一.前言 在消息中间件中有 2 个重要的概念:消息代理和目的地.当消息发送者发送消息后,消息就被消息代理接管,消息代理保证消息传递到指定目的地. 我们常用的消息代理有 JMS 和 AMQP 规范.对应地,它们常见的实现分别是 ActiveMQ 和 RabbitMQ. 二.整合 ActiveMQ 2.1 添加依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>sprin

  • Spring Boot入门(web+freemarker)

    1.配置maven文件pom.xml <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apach

  • 超详细的Spring Boot入门笔记(总结)

    1. Spring Boot 入门 Spring Boot是Spring社区较新的一个项目.该项目的目的是帮助开发者更容易的创建基于Spring的应用程序和服务,让更多人的人更快的对Spring进行入门体验,让Java开发也能够实现Ruby on Rails那样的生产效率.为Spring生态系统提供了一种固定的.约定优于配置风格的框架. Spring Boot具有如下特性: 为基于Spring的开发提供更快的入门体验 开箱即用,没有代码生成,也无需XML配置.同时也可以修改默认值来满足特定的需求

  • Spring Boot 入门指南

    0x0 前言 记得当初放弃 Java 主要原因是几个框架整合,花了大半天去编写配置文件,编写任务后运行依然有报错,甚是心累,故转前端开发了.最近周围很多 Java 朋友说微服务开发很爽,各种简单,自己本地体验下,的确很简单.所以对此抱有很大的学习兴趣.再加上之前使用 Nestjs 项目很像 Spring Boot 风格寻思还不如直接使用它. 0x1 简介 Spring Boot 是由 Pivotal 团队提供的全新框架,其设计目的是用来简化新 Spring 应用的初始搭建以及开发过程.该框架使用

  • Spring Boot 入门教程

    简介 相信很多人都接触spring框架很长时间了,每次搭建spring框架的时候都需要配置好多的jar.xml,做很多繁琐重复的配置,稍微不留神就会出现各种各样的问题,每次调试真的是香菇.蓝瘦啊. spring boot的出现帮助我们彻底解决了这些jar的依赖,只需要很少的配置就可以完成我们的开发工作,我们可以把自己的应用打包成jar,使用java -jar来运行spring web应用,spring boot集成了很多的web容器.今天给大家介绍一下spring Boot MVC,让我们学习一

  • spring boot入门开始你的第一个应用

    Spring Boot应用可以通过如下三种方法创建: 通过 https://start.spring.io/ 网站创建 通过Spring Initializr创建 自主创建 推荐开发工具: JDK 1.8+ maven 3.2+ IntelliJ IDEA 14 1. 通过 https://start.spring.io/ 网站创建 进入https://start.spring.io/,填写对应的信息,点击"Generate Project"按钮即可下载生成好的项目的zip压缩包,如图

  • Spring boot集成Kafka消息中间件代码实例

    一.创建Spring boot项目,添加如下依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <ar

  • spring boot入门之诞生背景及优势影响

    目录 springboot诞生的背景 springboot改变了什么 SpringBoot主要特性 SpringBoot集成第三方类库的步骤 spring boot诞生的背景 在spring boot出现以前,使用spring框架的程序员是这样配置web应用环境的,需要大量的xml配置.下图展示了在xml配置的时代和SpringBoot的配置量的差别. 随着web项目集成软件的不断增多,xml配置也不断的增多,xml配置文件也在不断地增多,项目的依赖管理也越发的复杂.spring框架也因此饱受争

  • spring boot(一)之入门篇

    本文给大家介绍构建微服务:Spring boot 入门篇,具体内容详情如下所示: 什么是spring boot Spring Boot是由Pivotal团队提供的全新框架,其设计目的是用来简化新Spring应用的初始搭建以及开发过程.该框架使用了特定的方式来进行配置,从而使开发人员不再需要定义样板化的配置.用我的话来理解,就是spring boot其实不是什么新的框架,它默认配置了很多框架的使用方式,就像maven整合了所有的jar包,spring boot整合了所有的框架(不知道这样比喻是否合

  • Spring Boot 集成MyBatis 教程详解

    Spring Boot是由Pivotal团队提供的全新框架,其设计目的是用来简化新Spring应用的初始搭建以及开发过程.该框架使用了特定的方式来进行配置,从而使开发人员不再需要定义样板化的配置.通过这种方式,Spring Boot致力于在蓬勃发展的快速应用开发领域(rapid application development)成为领导者. 在集成MyBatis前,我们先配置一个druid数据源. Spring Boot 系列 1.Spring Boot 入门 2.Spring Boot 属性配置

随机推荐