Springboot整合Active消息队列

简单理解:

Active是Apache公司旗下的一个消息总线,ActiveMQ是一个开源兼容Java Message Service(JMS) 面向消息的中件间. 是一个提供松耦合的应用程序架构.

主要用来在服务与服务之间进行异步通信的。

一、搭建步骤
    1、相应jar包

<!-- 整合消息队列ActiveMQ -->
  <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-activemq</artifactId>
  </dependency>

 <!-- 如果配置线程池则加入 -->
  <dependency>
   <groupId>org.apache.activemq</groupId>
   <artifactId>activemq-pool</artifactId>
  </dependency>

2、application.properties文件

#整合jms测试,安装在别的机器,防火墙和端口号记得开放
spring.activemq.broker-url=tcp://47.96.44.110:61616

spring.activemq.user=admin
spring.activemq.password=admin
#下列配置要增加依赖
spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=100

#集群配置(后续需要在配上)
#spring.activemq.broker-url=failover:(tcp://localhost:61616,tcp://localhost:61617)
#消息队列默认是点对点的,如果需要发布/订阅模式那么需要加上下面注解(如果同时需要点对点发布订阅这里也需注释掉)
# spring.jms.pub-sub-domain=true

3、Springboot主类

<!-- 主类需要多加一个@EnableJms注解,不过貌似我没有加的时候,也能运行,为安全起见姑且加上 -->
@SpringBootApplication
@EnableJms

4.5.......根据不同消息模式来写了。

二、点对点案例
   我在这里案例中创建了两个点对点队列,所以他会有两个queue对象,同样对应每个queue对象,都会有单一对应的消费者。

1、Springboot主类

@SpringBootApplication
@EnableJms
public class Main {

 public static void main(String[] args) {
  SpringApplication.run(Main.class, args);
 }

 //新建一个的Queue对象,交给sringboot管理,这个queue的名称叫"first.queue".
 @Bean
 public Queue queue(){
  return new ActiveMQQueue("first.queue");
 }
}

2.1、first.queue对应消费者

@Component
public class FirstConsumer {

 //名为"first.queue"消息队列的消费者,通过JmsListener进行监听有没有消息,有消息会立刻读取过来
 @JmsListener(destination="first.queue")
 public void receiveQueue(String text){
  System.out.println("FirstConsumer收到的报文为:"+text);
 }
}

2.2、two.queue对应消费者(后面会创建)

@Component
public class TwoConsumer {

 //名为"two.queue"消息队列的消费者
 @JmsListener(destination="two.queue")
 public void receiveQueue(String text){
  System.out.println("TwoConsumer收到的报文为:"+text);
 }
}

3、Service类

/**
 * 功能描述:消息生产
 */
public interface ProducerService {

 // 功能描述:指定消息队列,还有消息
 public void sendMessage(Destination destination, final String message);

 // 功能描述:使用默认消息队列, 发送消息
 public void sendMessage( final String message);

}

4、ServiceImpl实现类

/**
 * 功能描述:消息生产者实现类
 */
@Service
public class ProducerServiceImpl implements ProducerService{

 //这个队列就是Springboot主类中bean的对象
 @Autowired
 private Queue queue;

 //用来发送消息到broker的对象,可以理解连接数据库的JDBC
 @Autowired
 private JmsMessagingTemplate jmsTemplate; 

 //发送消息,destination是发送到的队列,message是待发送的消息
 @Override
 public void sendMessage(Destination destination, String message) {
  jmsTemplate.convertAndSend(destination, message);
 }

 //发送消息,queue是发送到的队列,message是待发送的消息
 @Override
 public void sendMessage(final String message) {
  jmsTemplate.convertAndSend(this.queue, message);
 }
}

5.QueueController类

/**
 * 功能描述:点对点消息队列控制层
 */
@RestController
@RequestMapping("/api/v1")
public class QueueController {

 @Autowired
 private ProducerService producerService;  

 // 这里后面调用的是Springboot主类的quene队列
 @GetMapping("first")
 public Object common(String msg){
  producerService.sendMessage(msg);
  return "Success";
 }  

 // 这个队列是新建的一个名为two.queue的点对点消息队列
 @GetMapping("two")
 public Object order(String msg){

  Destination destination = new ActiveMQQueue("two.queue");
  producerService.sendMessage(destination, msg);

  return "Success";
 }
}

6、案例演示:

从演示效果可以得出以下结论:

1:当springboot启动时候,就生成了这两个队列,而且他们都会有一个消费者

2:当我通过页面访问的时候,就相当于生产者把消息放到队列中,一旦放进去就会被消费者监听到,就可以获取生产者放进去的值并在后台打印出

顺便对页面中四个单词进行解释:

Number Of Pending Messages :待处理消息的数量。我们每次都会被监听处理掉,所以不存在待处理,如果存在就说这里面哪里出故障了,需要排查

Number Of Consumers : 消费者数量

Messages Enqueued:    消息排列,这个只增不见,代表已经处理多少消息

Messages Dequeued:    消息出队。

 三、发布/订阅者模式

在上面点对点代码的基础上,添加发布/订阅相关代码

1.appliaction.properties文件

#消息队列默认是点对点的,如果需要发布/订阅模式那么需要加上下面注解(如果同时需要点对点发布订阅这里也需注释掉)
spring.jms.pub-sub-domain=true

2.Springboot主类添加

//新建一个topic队列
 @Bean
 public Topic topic(){
  return new ActiveMQTopic("video.topic");
 }

3.添加多个消费者类

//这里定义了三个消费者
@Component
public class TopicSub {

 @JmsListener(destination="video.topic")
 public void receive1(String text){
  System.out.println("video.topic 消费者:receive1="+text);
 }

 @JmsListener(destination="video.topic")
 public void receive2(String text){
  System.out.println("video.topic 消费者:receive2="+text);
 }

 @JmsListener(destination="video.topic")
 public void receive3(String text){
  System.out.println("video.topic 消费者:receive3="+text);
 }
}

4.Service类

 //功能描述:消息发布者
 public void publish(String msg);

5.ServiceImpl实现类

//=======发布订阅相关代码=========

  @Autowired
  private Topic topic;

   @Override
  public void publish(String msg) {
   this.jmsTemplate.convertAndSend(this.topic, msg);

  }

6.Controller类

// 这个队列是新建的一个名为two.queue的点对点消息队列
  @GetMapping("topic")
  public Object topic(String msg){

   producerService.publish(msg);

   return "Success";
  }

7.演示效果:

从演示效果总结如下:

1:Springboot启动的时候,在Topics目录下,一共出现了5个消费者。first.queue一个消费者、two.queue一个消费者、video.topic三个消费者

2:当我在控制台输入信息后,video.topic的三个消费者都会监听video.topic发布的消息,并在控制台打印。

四、如何让点对点和发布订阅同时有效

为什么这么说呢,因为当我向上面一样同时开启,会发现点对点模式已经失效了。

效果演示

从演示效果,可以得出如下结论:

1:我们发现我们在页面输入..../two?msg=555消息后,后台并没有成功打印消息。再看Active界面发现,这个queue对象,确实有一条待处理的消息,但是我们发现,它对应的消费者数量是为0.

2:然而我们在打开topic页面发现,这里却存在一个消费者。

所以我个人理解是,当同时启动的时候,所产生的消费者默认都是Topic消费者,没有Queue消费者,所以它监听不到queue所待处理的消息。

当配置文件不加:spring.jms.pub-sub-domain=true  那么系统会默认支持quene(点对点模式),但一旦加上这段配置,系统又变成只支持发布订阅模式。

那如何同时都可以成功呢?

思路如下:

第一步:还是需要去掉配置文件中的:

#消息队列默认是点对点的,如果需要发布/订阅模式那么需要加上下面注解(如果同时需要点对点发布订阅这里也需注释掉)
#spring.jms.pub-sub-domain=true

第二步:在发布订阅者的中消费者中指定独立的containerFactory

因为你去掉上面的配置,那么系统就默认是queue,所以@JmsListener如果不指定独立的containerFactory的话是只能消费queue消息

@JmsListener(destination="video.topic", containerFactory="jmsListenerContainerTopic")
 public void receive1(String text){
  System.out.println("video.topic 消费者:receive1="+text);
 }

 @JmsListener(destination="video.topic", containerFactory="jmsListenerContainerTopic")
 public void receive2(String text){
  System.out.println("video.topic 消费者:receive2="+text);
 }

 //第三步我不添加containerFactory="jmsListenerContainerTopic"看等下是否会打印出
 @JmsListener(destination="video.topic")
 public void receive3(String text){
  System.out.println("video.topic 消费者:receive3="+text);
 }

第三步:定义独立的topic定义独立的JmsListenerContainer

在springboot主类中添加:

@Bean
  public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) {
   DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
   bean.setPubSubDomain(true);
   bean.setConnectionFactory(activeMQConnectionFactory);
   return bean;
  }

效果:

得出结论:

1:点对点,和发布订阅都有用

2:receive3没有指定独立的containerFactory一样没有打印出来。

源码
github地址:https://github.com/yudiandemingzi/springbootAcitveMQ

(0)

相关推荐

  • Springboot Activemq整合过程代码图解

    这篇文章主要介绍了Springboot Activemq整合过程代码图解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 Springboot+Activemq整合 1 导入整合所需要的依赖: <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifac

  • SpringBoot利用redis集成消息队列的方法

    一.pom文件依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> 二.创建消息接收者 变量.方法及构造函数进行标注,完成自动装配的工作. 通过 @Autowired的使用来消除 set ,get方法. @Autowired pub

  • 详解Springboot整合ActiveMQ(Queue和Topic两种模式)

    写在前面: 从2018年底开始学习SpringBoot,也用SpringBoot写过一些项目.这里对学习Springboot的一些知识总结记录一下.如果你也在学习SpringBoot,可以关注我,一起学习,一起进步. ActiveMQ简介 1.ActiveMQ简介 Apache ActiveMQ是Apache软件基金会所研发的开放源代码消息中间件:由于ActiveMQ是一个纯Java程序,因此只需要操作系统支持Java虚拟机,ActiveMQ便可执行. 2.ActiveMQ下载 下载地址:htt

  • Springboot整合activemq的方法步骤

    今天呢心血来潮,也有很多以前的学弟问到我关于消息队列的一些问题,有个刚入门,有的有问题都来问我,那么今天来说说如何快速入门mq. 一.首先说下什么是消息队列? 1.消息队列是在消息的传输过程中保存消息的容器. 二.为什么要用到消息队列? 主要原因是由于在高并发环境下,由于来不及同步处理,请求往往会发生堵塞,比如说,大量的insert,update之类的请求同时到达 MySQL ,直接导致无数的行锁表锁,甚至最后请求会堆积过多,从而触发too many connections错误.通过使用消息队列

  • activemq整合springboot使用方法(个人微信小程序用)

    主题 ActiveMQ Spring Boot 小程序开发 1.引入依赖 <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.3.RELEASE</version> <relativePath /> <!-- lookup

  • SpringBoot整合ActiveMQ过程解析

    目录结构 引入 maven依赖 <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.4.RELEASE</version> <relativePath/> </parent> <properties> &l

  • Springboot整合Active消息队列

    简单理解: Active是Apache公司旗下的一个消息总线,ActiveMQ是一个开源兼容Java Message Service(JMS) 面向消息的中件间. 是一个提供松耦合的应用程序架构. 主要用来在服务与服务之间进行异步通信的. 一.搭建步骤     1.相应jar包 <!-- 整合消息队列ActiveMQ --> <dependency> <groupId>org.springframework.boot</groupId> <artifa

  • SpringBoot整合RabbitMQ消息队列的完整步骤

    SpringBoot整合RabbitMQ 主要实现RabbitMQ以下三种消息队列: 简单消息队列(演示direct模式) 基于RabbitMQ特性的延时消息队列 基于RabbitMQ相关插件的延时消息队列 公共资源 1. 引入pom依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId>

  • SpringBoot整合rockerMQ消息队列详解

    目录 Springboot整合RockerMQ 使用总结 消费模式 生产者组和消费者组 生产者投递消息的三种方式 如何保证消息不丢失 顺序消息 分布式事务 Springboot整合RockerMQ 1.maven依赖 <dependencies> <!-- springboot-web组件 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>

  • Springboot 整合 RabbitMQ 消息队列 详情

    目录 生产者工程 POM依赖 application文件 生产者业务代码 测试 Direct 模式 业务代码 消费者 消息监听 Topic 模式 生产者 消费者 生产者工程 POM依赖 可以在创建工程时直接选择添加依赖. application文件 因为rabbitmq具有默认地址及用户信息,所以如果是本地rabbitmq可以不需要进行配置. RabbitMQ配置文件: 在使用相关交换机及队列时,我们需要实现声明交换机及队列,如果没有对应信息,则启动项目会失败.所以在使用springboot整合

  • Spring boot 整合KAFKA消息队列的示例

    这里使用 spring-kafka 依赖和 KafkaTemplate 对象来操作 Kafka 服务. 一.添加依赖和添加配置项 1.1.在 Pom 文件中添加依赖 <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> 1.2.添加配置项 spring: kafka: b

  • SpringBoot整合RabbitMQ实现消息确认机制

    前面几篇案例已经将常用的交换器(DirectExchange.TopicExchange.FanoutExchange)的用法介绍完了,现在我们来看一下消息的回调,也就是消息确认. 在rabbitmq-provider项目的application.yml文件上加上一些配置 server: port: 8021 spring: #给项目来个名字 application: name: rabbitmq-provider #配置rabbitMq 服务器 rabbitmq: host: 127.0.0.

  • SpringBoot整合RocketMQ实现消息发送和接收的详细步骤

    我们使用主流的SpringBoot框架整合RocketMQ来讲解,使用方便快捷: 最终项目结构如下: 具体步骤如下: 第一步:新建SpringBoot项目rocketmq-test,引入rocketmq依赖,以及项目配置 <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <vers

  • springboot整合redis之消息队列

    目录 一.项目准备 二.配置类 三.redis中list数据类型 定时器监听队列 运行即监控队列 四.发布/订阅模式 五.ZSet实现延迟队列 一.项目准备 依赖 <!-- RedisTemplate --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> &

  • springboot整合消息队列RabbitMQ

    前言: RabbitMQ常用的三种Exchange Type:fanout.direct.topic. fanout:把所有发送到该Exchange的消息投递到所有与它绑定的队列中. direct:把消息投递到那些binding key与routing key完全匹配的队列中. topic:将消息路由到binding key与routing key模式匹配的队列中. 这里基于springboot整合​​消息队列​​,测试这三种Exchange. 启动RabbitMQ 双击运行rabbitmq-s

  • Springboot整合ActiveMQ实现消息队列的过程浅析

    目录 pom中导入坐标 书写yml配置 业务层代码 监听器代码 业务层代码 确保你启动了自己电脑的activemq. pom中导入坐标 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency> 书写yml配置 spring:  activemq:

随机推荐