详解Springboot整合ActiveMQ(Queue和Topic两种模式)

写在前面: 从2018年底开始学习SpringBoot,也用SpringBoot写过一些项目。这里对学习Springboot的一些知识总结记录一下。如果你也在学习SpringBoot,可以关注我,一起学习,一起进步。

ActiveMQ简介

1、ActiveMQ简介

Apache ActiveMQ是Apache软件基金会所研发的开放源代码消息中间件;由于ActiveMQ是一个纯Java程序,因此只需要操作系统支持Java虚拟机,ActiveMQ便可执行。

2、ActiveMQ下载

下载地址:http://activemq.apache.org/components/classic/download/

下载完成后解压双击activemq.bat文件打开(不用安装,直接使用),目录和打开后效果如下:

运行后,浏览器访问http://localhost:8161/地址进入一下界面。

点击Manage ActiveMQ broker登录到ActiveMQ管理页面,默认账号和密码都是admin。管理页面如下:

SpringBoot整合ActiveMQ

1、新建SpringBoot项目

新建Springboot项目,添加对应的依赖。项目完整的pom.xml文件如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
 <modelVersion>4.0.0</modelVersion>
 <parent>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-parent</artifactId>
  <version>2.2.5.RELEASE</version>
  <relativePath/> <!-- lookup parent from repository -->
 </parent>
 <groupId>com.mcy</groupId>
 <artifactId>springboot-mq</artifactId>
 <version>0.0.1-SNAPSHOT</version>
 <name>springboot-mq</name>
 <description>Demo project for Spring Boot</description>

 <properties>
  <java.version>1.8</java.version>
 </properties>

 <dependencies>
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-web</artifactId>
  </dependency>

  <!--Activemq依赖-->
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-activemq</artifactId>
  </dependency>

  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-test</artifactId>
   <scope>test</scope>
   <exclusions>
    <exclusion>
     <groupId>org.junit.vintage</groupId>
     <artifactId>junit-vintage-engine</artifactId>
    </exclusion>
   </exclusions>
  </dependency>
 </dependencies>

 <build>
  <plugins>
   <plugin>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-maven-plugin</artifactId>
   </plugin>
  </plugins>
 </build>
</project>

2、项目结构

3、相关配置信息

在application.properties类中添加ActiveMQ相关的配置信息

server.port=8080
server.servlet.context-path=/mq

#MQ服务器地址
spring.activemq.broker-url=tcp://localhost:61616
#用户名
spring.activemq.user=admin
#密码
spring.activemq.password=admin
#设置是Queue队列还是Topic,false为Queue,true为Topic,默认false-Queue
spring.jms.pub-sub-domain=false
#spring.jms.pub-sub-domain=true

#变量,定义队列和topic的名称
myqueue: activemq-queue
mytopic: activemq-topic

4、ActiveMQ配置类

ActiveMQ配置类ConfigBean,配置了Queue队列和topic两种模式,代码如下:

import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.stereotype.Component;
import javax.jms.Topic;

/**
 1. MQ配置类
 */
@Component
@EnableJms
public class ConfigBean {
 @Value("${myqueue}")
 private String myQueue;
 @Value("${mytopic}")
 private String topicName;

 //队列
 @Bean
 public ActiveMQQueue queue(){
  return new ActiveMQQueue(myQueue);
 }

 //topic
 @Bean
 public Topic topic(){
  return new ActiveMQTopic(topicName);
 }
}

Queue队列模式

队列模式即点对点传输。
点对点消息传递域的特点如下:

每个消息只能有一个消费者,类似于1对1的关系。好比个人快递自己领自己的。

消息的生产者和消费者之间没有时间上的相关性。无论消费者在生产者发送消息的时候是否处于运行状态,消费者都可以提取消息。好比我们的发送短信,发送者发送后不见得接收者会即收即看。

消息被消费后队列中不会再存储,所以消费者不会消费到已经被消费掉的消息。

1、队列生产者

QueueProducerController类为队列生产者控制器,主要向消息队列中发送消息。代码如下:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.jms.Queue;

/*
 * 队列消息生产者
 */
@RestController
public class QueueProducerController {
 @Autowired
 private JmsMessagingTemplate jmsMessagingTemplate;

 @Autowired
 private Queue queue;

 /*
  * 消息生产者
  */
 @RequestMapping("/sendmsg")
 public void sendmsg(String msg) {
  System.out.println("发送消息到队列:" + msg);
  // 指定消息发送的目的地及内容
  this.jmsMessagingTemplate.convertAndSend(this.queue, msg);
 }
}

2、队列消费者

QueueConsumerController类为队列消费者控制器,具体代码如下:

import org.springframework.beans.factory.annotation.Value;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.web.bind.annotation.RestController;

/*
 1. 队列queue消费者控制器
 */
@RestController
public class QueueConsumerController {
 /*
  * 消费者接收消息
  */
 @JmsListener(destination="${myqueue}")
 public void readActiveQueue(String message) {
  System.out.println("接受到:" + message);
 }
}

3、测试效果

运行项目在浏览器中访问http://localhost:8080/mq/sendmsg?msg=123。向消息队列中发送123。控制台输出效果:

ActiveMQ控制台显示:

  • Number Of Pending Messages:消息队列中待处理的消息
  • Number Of Consumers:消费者的数量
  • Messages Enqueued:累计进入过消息队列的总量
  • Messages Dequeued:累计消费过的消息总量

【注】队列模式时,配置文件application.properties中spring.jms.pub-sub-domain属性必须设置为false。

Topic模式

topic模式基于发布/订阅模式的传输。
基于发布/订阅模式的传输的特点如下:

  • 生产者将消息发布到topic中,每个消息可以有多个消费者,属于1:N的关系;
  • 生产者和消费者之间有时间上的相关性。订阅某一个主题的消费者只能消费自它订阅之后发布的消息。
  • 生产者生产时,topic不保存消息它是无状态的不落地,假如无人订阅就去生产,那就是一条废消息。

1、topic生产者

TopicProducerController类为topic生产者控制器,主要向消息队列中发送消息。代码如下:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.jms.Queue;
import javax.jms.Topic;

/*
* topic消息生产者
*/
@RestController
public class TopicProducerController {
 @Autowired
 private JmsMessagingTemplate jmsMessagingTemplate;

 @Autowired
 private Topic topic;

 /*
 * 消息生产者
 */
 @RequestMapping("/topicsendmsg")
 public void sendmsg(String msg) {
  System.out.println("发送消息到MQ:" + msg);
  // 指定消息发送的目的地及内容
  this.jmsMessagingTemplate.convertAndSend(this.topic, msg);
 }
}

2、topic消费者

TopicConsumerController类为topic消费者控制器,其中写了两个消费者方法,可以理解为有两个用户订阅。具体代码如下:

import org.springframework.jms.annotation.JmsListener;
import org.springframework.web.bind.annotation.RestController;

/*
 1. topic消费者控制器
 */
@RestController
public class TopicConsumerController {
 /*
  * 消费者接收消息
  */
 @JmsListener(destination="${mytopic}")
 public void readActiveQueue(String message) {
  System.out.println("接受到:" + message);
 }

 @JmsListener(destination="${mytopic}")
 public void readActiveQueue1(String message) {
  System.out.println("接受到:" + message);
 }
}

3、测试效果

运行项目在浏览器中访问http://localhost:8080/mq/topicsendmsg?msg=123。向消息队列中发送123。控制台输出效果(有两个消费者方法):

ActiveMQ控制台显示:

  • Number Of Consumers:消费者的数量
  • Messages Enqueued:累计进入过消息队列的总量
  • Messages Dequeued:累计消费过的消息总量

【注】Topic模式时,配置文件application.properties中spring.jms.pub-sub-domain属性必须设置为true。

到此这篇关于详解Springboot整合ActiveMQ(Queue和Topic两种模式)的文章就介绍到这了,更多相关Springboot整合ActiveMQ内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • activemq整合springboot使用方法(个人微信小程序用)

    主题 ActiveMQ Spring Boot 小程序开发 1.引入依赖 <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.3.RELEASE</version> <relativePath /> <!-- lookup

  • springboot集成activemq的实例代码

    ActiveMQ ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线.ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位. 特性 多种语言和协议编写客户端.语言: Java,C,C++,C#,Ruby,Perl,Python,PHP.应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP

  • Springboot Activemq整合过程代码图解

    这篇文章主要介绍了Springboot Activemq整合过程代码图解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 Springboot+Activemq整合 1 导入整合所需要的依赖: <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifac

  • SpringBoot整合ActiveMQ过程解析

    目录结构 引入 maven依赖 <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.4.RELEASE</version> <relativePath/> </parent> <properties> &l

  • Springboot整合activemq的方法步骤

    今天呢心血来潮,也有很多以前的学弟问到我关于消息队列的一些问题,有个刚入门,有的有问题都来问我,那么今天来说说如何快速入门mq. 一.首先说下什么是消息队列? 1.消息队列是在消息的传输过程中保存消息的容器. 二.为什么要用到消息队列? 主要原因是由于在高并发环境下,由于来不及同步处理,请求往往会发生堵塞,比如说,大量的insert,update之类的请求同时到达 MySQL ,直接导致无数的行锁表锁,甚至最后请求会堆积过多,从而触发too many connections错误.通过使用消息队列

  • 详解Springboot整合ActiveMQ(Queue和Topic两种模式)

    写在前面: 从2018年底开始学习SpringBoot,也用SpringBoot写过一些项目.这里对学习Springboot的一些知识总结记录一下.如果你也在学习SpringBoot,可以关注我,一起学习,一起进步. ActiveMQ简介 1.ActiveMQ简介 Apache ActiveMQ是Apache软件基金会所研发的开放源代码消息中间件:由于ActiveMQ是一个纯Java程序,因此只需要操作系统支持Java虚拟机,ActiveMQ便可执行. 2.ActiveMQ下载 下载地址:htt

  • 详解Springboot之接收json字符串的两种方式

    第一种方式.通过关键字段@RequestBody,标明这个对象接收json字符串.还有第二种方式,直接通过request来获取流.在spring中,推荐使用. 代码地址 https://gitee.com/yellowcong/springboot-demo/tree/master/springboot-json 项目结构 其实项目里面没啥类容,就是一个控制器和pom.xml配置 配置fastjson 添加fastjson的依赖到pom.xml中 <dependency> <groupI

  • 详解Spring-boot中读取config配置文件的两种方式

    了解过spring-Boot这个技术的,应该知道Spring-Boot的核心配置文件application.properties,当然也可以通过注解自定义配置文件的信息. Spring-Boot读取配置文件的方式: 一.读取核心配置文件信息application.properties的内容 核心配置文件是指在resources根目录下的application.properties或application.yml配置文件,读取这两个配置文件的方法有两种,都比较简单. 核心配置文件applicati

  • 详解SpringBoot整合RabbitMQ如何实现消息确认

    目录 简介 生产者消息确认 介绍 流程 配置 ConfirmCallback ReturnCallback 注册ConfirmCallback和ReturnCallback 消费者消息确认 介绍 手动确认三种方式 简介 本文介绍SpringBoot整合RabbitMQ如何进行消息的确认. 生产者消息确认 介绍 发送消息确认:用来确认消息从 producer发送到 broker 然后broker 的 exchange 到 queue过程中,消息是否成功投递. 如果消息和队列是可持久化的,那么确认消

  • 详解springboot整合ueditor踩过的坑

    有一天老板突然找我让我改富文本(一脸懵逼,不过也不能推啊默默地接下了),大家都知道现在的富文本视频功能都是只有上传链接的没有从本地上传这一说(就连现在的csdn的也是)于是我找了好多个,最终发现百度的ueditor可以. 经过几天的日夜,甚至牺牲了周末休息时间开始翻阅资料... 废话不多说,开始教程: 第一步: 去ue官网下载他的源码 第二步: 解压下载的源码(下载可能会慢,好像需要翻墙下载) 然后打开项目把源码拖进项目的resources/static中去 第三步 就是重点了 由于spring

  • 详解SpringBoot整合MyBatis详细教程

    1. 导入依赖 首先新建一个springboot项目,勾选组件时勾选Spring Web.JDBC API.MySQL Driver 然后导入以下整合依赖 <!-- https://mvnrepository.com/artifact/org.mybatis.spring.boot/mybatis-spring-boot-starter --> <dependency> <groupId>org.mybatis.spring.boot</groupId> &

  • 详解SpringBoot使用RedisTemplate操作Redis的5种数据类型

    目录 1.字符串(String) 1.1 void set(K key, V value):V get(Object key) 1.2 void set(K key, V value, long timeout, TimeUnit unit) 1.3 V getAndSet(K key, V value) 1.4 Integer append(K key, V value) 1.5 Long size(K key) 2.列表(List) 2.1 Long leftPushAll(K key, V

  • 详解springboot解决CORS跨域的三种方式

    目录 一.实现WebMvcConfigurer接口 二.实现filter过滤器方式 三.注解@CrossOrigin 四.实战 五.cookie的跨域 一.实现WebMvcConfigurer接口 @Configuration public class WebConfig implements WebMvcConfigurer { /** * 添加跨域支持 */ @Override public void addCorsMappings(CorsRegistry registry) { // 允

  • 详解Python修复遥感影像条带的两种方式

    GDAL修复Landsat ETM+影像条带 Landsat7 ETM+卫星影像由于卫星传感器故障,导致此后获取的影像出现了条带.如下图所示, 影像中均匀的布满条带. 使用GDAL修复影像条带的代码如下: def gdal_repair(tif_name, out_name, bands): """ tif_name(string): 源影像名 out_name(string): 输出影像名 bands(integer): 影像波段数 """ #

  • 详解git的分支与合并的两种方法

    如何将两个分支合并到一起.就是说我们新建一个分支,在其上开发某个新功能,开发完成后再合并回主线. 1.   git merge 咱们先来看一下第一种方法 -- git merge 在 Git 中合并两个分支时会产生一个特殊的提交记录,它有两个父节点.翻译成自然语言相当于:"我要把这两个父节点本身及它们所有的祖先都包含进来."下面具体解释. # 创建新分支 bugFix git branch bugFix # 切换到该分支 git checkout bugFix # 提交一次 git c

随机推荐