Spring Boot ActiveMQ发布/订阅消息模式原理解析

本文在《Spring Boot基于Active MQ实现整合JMS》的基础上,介绍如何使用ActiveMQ的发布/订阅消息模式。发布/订阅消息模式是消息发送者发送消息到主题(topic),而多个消息接收者监听这个主题;其中,消息发送者和接收者分别叫做发布者(publisher)和订阅者(subscriber),对于发布者来说,它和所有的订阅者就构成了一个1对多的关系。这种关系如下图所示:

发布/订阅模式的工作示意图

消息生产者将消息(发布)到topic中,可以同时有多个消息消费者(订阅)消费该消息。

和点对点方式不同,发布到topic的消息会被所有订阅者消费;当生产者发布消息时,不管是否有消费者,都不会保存消息;一定要先有消息的消费者,后有消息的生产者。

软件环境

  • ActiveMQ 5.15.13
  • java version 13.0.1
  • IntelliJ IDEA 2019.3.2 (Ultimate Edition)
  • Spring Boot 2.3.0.RELEASE

配置ActiveMQ连接信息

spring.activemq.broker-url=tcp://127.0.0.1:61616
spring.activemq.in-memory=true
spring.activemq.pool.enabled=false
spring.activemq.password=admin
spring.activemq.user=admin
#默认值false,表示point to point(点到点)模式,true时代表发布订阅模式,需要手动开启
#spring.jms.pub-sub-domain=true

创建生产者和消费者

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;

import javax.jms.Destination;

/**
 * 生产者
 */
@Service
public class Publisher {
  @Autowired
  private JmsMessagingTemplate jmsMsgTemplate;

  /**
   * 发送topic
   *
   * @param destination
   * @param message
   */
  public void publish(Destination destination, String message) {
    jmsMsgTemplate.convertAndSend(destination, message);
  }
}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Service;

/**
 * 消费者
 */
@Service
public class Subscriber2 {
  private static Logger logger = LoggerFactory.getLogger(Subscriber2.class);

  @JmsListener(destination = "topicListener2")
  public void subscriber(String text) {
    logger.info("Subscriber2 收到的报文:{}", text);
  }
}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

import javax.jms.JMSException;

/**
 * 消费者
 */
@Component
public class Subscriber1 {
  private static Logger logger = LoggerFactory.getLogger(Subscriber1.class);

  /**
   * 订阅 topicListener1
   *
   * @param text
   * @throws JMSException
   */
  @JmsListener(destination = "topicListener1")
  public void subscriber(String text) {
    logger.info("Subscriber1 收到的报文:{}", text);
  }

}

发布订阅模式和点对点模式的消费者没有区别,换换监听对象destination的值就行。接下来测试发布订阅模式。

测试发布订阅模式

创建Junit测试用例:

@Test
  public void topicTest() {
    // 设置话题监听者,可以自由切换
    Destination destination = new ActiveMQTopic("topicListener2");
    for (int i = 0; i < 6; i++) {
      publisher.publish(destination, "Topic Message " + i);
    }
    try {
      Thread.sleep(300);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    System.out.println("使线程睡 300 毫秒,保证消费者消费完毕!");
  }

此处设置的订阅者是topicListener2,读者可以切换为topicListener1。发布/订阅模式和点对点模式的生产者的代码主要区别就是Destination的创建方式,点对点模式是调用new ActiveMQQueue (QUEUE_NAME),而发布/订阅模式是调用new ActiveMQTopic (QUEUE_NAME)。

执行结果:

Subscriber2 队列收到的报文:Topic Message 0
Subscriber2 队列收到的报文:Topic Message 1
Subscriber2 队列收到的报文:Topic Message 2
Subscriber2 队列收到的报文:Topic Message 3
Subscriber2 队列收到的报文:Topic Message 4
Subscriber2 队列收到的报文:Topic Message 5

使线程睡 300 毫秒,保证消费者消费完毕!

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • Spring Boot ActiveMQ如何设置访问密码

    Apache ActiveMQ是Apache出品,是最流行的,能力很强的开源消息总线.默认情况下,程序连接ActiveMQ是不需要密码的,为了安装起见,需要设置密码,提高安全性.本文分享如何设置访问ActiveMQ的账号密码. 小编使用的ActiveMQ版本是apache-activemq-5.15.13. 一.设置控制台管理密码 ActiveMQ使用的是jetty服务器,找到 ActiveMQ安装目录下的\conf\jetty.xml文件: <bean id="adminSecurity

  • Spring Boot基于Active MQ实现整合JMS

    我们使用jms一般是使用spring-jms和activemq相结合,通过spring Boot为我们配置好的JmsTemplate发送消息到指定的目的地Destination.本文以点到点消息模式为例,演示如何在Spring Boot中整合 JMS 和 Active MQ ,实现 MQ 消息的生产与消费. 点到点消息模式定义:当消息发送者发送消息,消息代理获得消息后,把消息放入一个队列里,当有消息接收者来接收消息的时候,消息将从队列里取出并且传递给接收者,这时候队列里就没有此消息了.队列Que

  • 详解Springboot整合ActiveMQ(Queue和Topic两种模式)

    写在前面: 从2018年底开始学习SpringBoot,也用SpringBoot写过一些项目.这里对学习Springboot的一些知识总结记录一下.如果你也在学习SpringBoot,可以关注我,一起学习,一起进步. ActiveMQ简介 1.ActiveMQ简介 Apache ActiveMQ是Apache软件基金会所研发的开放源代码消息中间件:由于ActiveMQ是一个纯Java程序,因此只需要操作系统支持Java虚拟机,ActiveMQ便可执行. 2.ActiveMQ下载 下载地址:htt

  • Spring Boot ActiveMQ连接池配置过程解析

    spring.activemq.pool.enabled=false时,每发送一条数据都需要创建一个连接,这样会出现频繁创建和销毁连接的场景.为了不踩这个坑,我们参考池化技术的思想,配置ActiveMQ连接池.在Spring Boot ActiveMQ发布/订阅消息模式原理解析的基础上配置ActiveMQ连接池,只需要做两项修改--配置文件和添加连接池依赖. 修改application.properties配置文件 ## URL of the ActiveMQ broker. Auto-gene

  • 浅谈Spring Boot 整合ActiveMQ的过程

    RabbitMQ是比较常用的AMQP实现,这篇文章是一个简单的Spring boot整合RabbitMQ的教程. 安装ActiveMQ服务器,(也可以不安装,如果不安装,会使用内存mq) 构建Spring boot项目,增加依赖项,只需要添加这一项即可 <!-- 添加acitivemq依赖 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring

  • Springboot整合activemq的方法步骤

    今天呢心血来潮,也有很多以前的学弟问到我关于消息队列的一些问题,有个刚入门,有的有问题都来问我,那么今天来说说如何快速入门mq. 一.首先说下什么是消息队列? 1.消息队列是在消息的传输过程中保存消息的容器. 二.为什么要用到消息队列? 主要原因是由于在高并发环境下,由于来不及同步处理,请求往往会发生堵塞,比如说,大量的insert,update之类的请求同时到达 MySQL ,直接导致无数的行锁表锁,甚至最后请求会堆积过多,从而触发too many connections错误.通过使用消息队列

  • springboot集成activemq的实例代码

    ActiveMQ ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线.ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位. 特性 多种语言和协议编写客户端.语言: Java,C,C++,C#,Ruby,Perl,Python,PHP.应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP

  • SpringBoot整合ActiveMQ过程解析

    目录结构 引入 maven依赖 <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.4.RELEASE</version> <relativePath/> </parent> <properties> &l

  • Spring Boot ActiveMQ发布/订阅消息模式原理解析

    本文在<Spring Boot基于Active MQ实现整合JMS>的基础上,介绍如何使用ActiveMQ的发布/订阅消息模式.发布/订阅消息模式是消息发送者发送消息到主题(topic),而多个消息接收者监听这个主题:其中,消息发送者和接收者分别叫做发布者(publisher)和订阅者(subscriber),对于发布者来说,它和所有的订阅者就构成了一个1对多的关系.这种关系如下图所示: 发布/订阅模式的工作示意图 消息生产者将消息(发布)到topic中,可以同时有多个消息消费者(订阅)消费该

  • Spring Boot集成RabbitMQ以及队列模式操作

    目录 前言 一.场景描述 二.准备工作 三.发布/订阅模式(Fanout) 生产者 消费者 四.Work模式 4.1 轮询模式 生产者 消费者 4.2 公平分发 生产者 消费者 生产者 消费者 五.路由模式(Direct) 六.主题模式(Topic) 小结 前言 本篇博客将会通过我们的实际场景来演示如何在Spring Boot中集成RabbitMQ以及如何对各种队列模式进行操作. 一.场景描述 我们通过模仿用户下订单时,订单系统分别通过短信,邮件或微信进行推送消息,如下图: 二.准备工作 (1)

  • 把spring boot项目发布tomcat容器(包含发布到tomcat6的方法)

    spring boot因为内嵌tomcat容器,所以可以通过打包为jar包的方法将项目发布,但是如何将spring boot项目打包成可发布到tomcat中的war包项目呢? 1. 既然需要打包成war包项目,首先需要在pom.xml文件中修改打包类型,将spring boot默认的<packaging>jar</packaging>修改为<packaging>war</packaging>形式: 2. 其次spring boot的web项目中内嵌tomca

  • 深入研究spring boot集成kafka之spring-kafka底层原理

    目录 前言 简单集成 引入依赖 添加配置 测试发送和接收 Spring-kafka-test嵌入式KafkaServer 引入依赖 启动服务 创建新的Topic 程序启动时创建TOPIC 代码逻辑中创建 PS:其他的方式创建TOPIC 引入依赖 api方式创建 命令方式创建 消息发送之KafkaTemplate探秘 获取发送结果 异步获取 同步获取 KAFKA事务消息 REPLYINGKAFKATEMPLATE获得消息回复 Spring-kafka消息消费用法探秘 @KAFKALISTENER的

  • Spring Boot 整合RocketMq实现消息过滤功能

    目录 简介 根据TAG过滤消息 生产者 消费者 测试结果 根据SQL表达式过滤消息 生产者 消费者 启动程序报错The broker does not support consumer to filter message by SQL92 测试结果 总结 简介 消息过滤是指消费者一端在消费消息时,对消息进行选择性过滤,只消费符合过滤条件的消息. RocketMQ的消息过滤机制大致分为两种:标签过滤和类过滤.其中标签过滤又分为Tag过滤和SQL92过滤. 根据TAG过滤消息 消息发送端只能设置一个

  • spring boot整合RabbitMQ(Direct模式)

    springboot集成RabbitMQ非常简单,如果只是简单的使用配置非常少,springboot提供了spring-boot-starter-amqp项目对消息各种支持. 1.新建一个Spring Boot工程,命名为:"rabbitmq-hello". 在pom.xml中引入如下依赖内容,其中spring-boot-starter-amqp用于支持RabbitMQ. <dependency> <groupId>org.springframework.boo

  • Spring boot部署发布到linux的操作方法

    限制条件: The default script supports most Linux distributions and is tested on CentOS and Ubuntu. Other platforms, such as OS X and FreeBSD, will require the use of a custom embeddedLaunchScript. 当前文档适用于 CentOS and Ubuntu,其他平台需要定制脚本 1.部署发布 1.1 打包 maven打

  • Spring Boot 项目发布到 Tomcat 服务器的操作步骤

    第 1 步:将这个 Spring Boot 项目的打包方式设置为 war. <packaging>war</packaging> SpringBoot 默认有内嵌的 tomcat 模块,因此,我们要把这一部分排除掉. 即:我们在 spring-boot-starter-web 里面排除了 spring-boot-starter-tomcat ,但是我们为了在本机测试方便,我们还要引入它,所以我们这样写: <dependency> <groupId>org.s

随机推荐