Springboot详解RocketMQ实现广播消息流程

RocketMQ消息模式主要有两种:广播模式、集群模式(负载均衡模式)

广播模式是每个消费者,都会消费消息;

负载均衡模式是每一个消费只会被某一个消费者消费一次;

我们业务上一般用的是负载均衡模式,当然一些特殊场景需要用到广播模式,比如发送一个信息到邮箱,手机,站内提示;

我们可以通过@RocketMQMessageListenermessageModel属性值来设置,MessageModel.BROADCASTING是广播模式,MessageModel.CLUSTERING是默认集群负载均衡模式

下面来介绍下 springboot+rockermq 整合实现 广播消息

  • 创建Springboot项目,添加rockermq 依赖
<!--rocketMq依赖-->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.1</version>
</dependency>
  • 配置rocketmq

# 端口
server:
  port: 8083

# 配置 rocketmq
rocketmq:
  name-server: 127.0.0.1:9876
  #生产者
  producer:
    #生产者组名,规定在一个应用里面必须唯一
    group: group1
    #消息发送的超时时间 默认3000ms
    send-message-timeout: 3000
    #消息达到4096字节的时候,消息就会被压缩。默认 4096
    compress-message-body-threshold: 4096
    #最大的消息限制,默认为128K
    max-message-size: 4194304
    #同步消息发送失败重试次数
    retry-times-when-send-failed: 3
    #在内部发送失败时是否重试其他代理,这个参数在有多个broker时才生效
    retry-next-server: true
    #异步消息发送失败重试的次数
    retry-times-when-send-async-failed: 3

  • 生产端:新建一个 controller 来做消息发送

生产端按正常发送逻辑发送消息即可

package com.example.springbootrocketdemo.controller;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * 广播消息
 * @author qzz
 */
@RestController
public class RocketMQBroadCOntroller {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    /**
     * 发送广播消息
     */
    @RequestMapping("/testBroadSend")
    public void testSyncSend(){
        //参数一:topic   如果想添加tag,可以使用"topic:tag"的写法
        //参数二:消息内容
        for(int i=0;i<10;i++){
            rocketMQTemplate.convertAndSend("test-topic-broad","test-message"+i);
        }
    }
}
  • 创建两个消费者来消费消息

我们先集群负载均衡测试,加上messageModel=MessageModel.CLUSTERING

消费者1:

package com.example.springbootrocketdemo.config;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
/**
 * 广播消息
 * 配置RocketMQ监听
 * MessageModel.CLUSTERING:集群模式
 * MessageModel.BROADCASTING:广播模式
 * @author qzz
 */
@Service
@RocketMQMessageListener(consumerGroup = "test-broad",topic = "test-topic-broad",messageModel = MessageModel.CLUSTERING)
public class RocketMQBroadConsumerListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("集群模式 消费者1,消费消息:"+s);
    }
}

消费者2: 与消费者1在 同一个consumerGroup 和 topic

package com.example.springbootrocketdemo.config;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
/**
 * 广播消息
 * 配置RocketMQ监听
 * MessageModel.CLUSTERING:集群模式
 * MessageModel.BROADCASTING:广播模式
 * @author qzz
 */
@Service
@RocketMQMessageListener(consumerGroup = "test-broad",topic = "test-topic-broad",messageModel = MessageModel.CLUSTERING)
public class RocketMQBroadConsumerListener2 implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("集群模式 消费者2,消费消息:"+s);
    }
}
  • 启动服务,测试 集群模式消费

集群模式测试: 两个消费者平摊 消息

  • 把上面两个消费者的 messageModel 属性值修改成 广播模式

消费者1:

package com.example.springbootrocketdemo.config;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
/**
 * 广播消息
 * 配置RocketMQ监听
 * MessageModel.CLUSTERING:集群模式
 * MessageModel.BROADCASTING:广播模式
 * @author qzz
 */
@Service
@RocketMQMessageListener(consumerGroup = "test-broad",topic = "test-topic-broad",messageModel = MessageModel.BROADCASTING)
public class RocketMQBroadConsumerListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("广播消息1 广播模式,消费消息:"+s);
    }
}

消费者2: 与消费者1在 同一个consumerGroup 和 topic

package com.example.springbootrocketdemo.config;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
/**
 * 广播消息
 * 配置RocketMQ监听
 * MessageModel.CLUSTERING:集群模式
 * MessageModel.BROADCASTING:广播模式
 * @author qzz
 */
@Service
@RocketMQMessageListener(consumerGroup = "test-broad",topic = "test-topic-broad",messageModel = MessageModel.BROADCASTING)
public class RocketMQBroadConsumerListener2 implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("广播消息2 广播模式,消费消息:"+s);
    }
}
  • 重启服务,测试 广播模式消费

广播模式消费下,两个消费者都消费到Topic的所有消息。

测试成功!

到此这篇关于Springboot详解RocketMQ实现广播消息流程的文章就介绍到这了,更多相关Springboot广播消息内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • springboot整合rocketmq实现分布式事务

    1 执行流程 (1) 发送方向 MQ 服务端发送消息. (2) MQ Server 将消息持久化成功之后,向发送方 ACK 确认消息已经发送成功,此时消息为半消息. (3) 发送方开始执行本地事务逻辑. (4) 发送方根据本地事务执行结果向 MQ Server 提交二次确认(Commit 或是 Rollback),MQ Server 收到Commit 状态则将半消息标记为可投递,订阅方最终将收到该消息:MQ Server 收到 Rollback 状态则删除半消息,订阅方将不会接受该消息. (5)

  • springBoot整合RocketMQ及坑的示例代码

    版本: JDK:1.8 springBoot:1.5.10 rocketMQ:4.2.0 pom 配置: <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.10.RELEASE</version> </parent> <d

  • Springboot RocketMq实现过程详解

    首先,在虚拟机上安装rocketmq和rocketMq可视化控制,安装不做描述. 1.pom.xml文件添加依赖 mq的版本与连接的rocketmq版本保持一致 <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-remoting</artifactId> <version>4.4.0</version> </depende

  • 解决SpringBoot整合RocketMQ遇到的坑

    应用场景 在实现RocketMQ消费时,一般会用到@RocketMQMessageListener注解定义Group.Topic以及selectorExpression(数据过滤.选择的规则)为了能支持动态筛选数据,一般都会使用表达式,然后通过apollo或者cloud config进行动态切换. 引入依赖 <!-- RocketMq Spring Boot Starter--> <dependency> <groupId>org.apache.rocketmq<

  • 浅谈Springboot整合RocketMQ使用心得

    一.阿里云官网---帮助文档 https://help.aliyun.com/document_detail/29536.html?spm=5176.doc29535.6.555.WWTIUh 按照官网步骤,创建Topic.申请发布(生产者).申请订阅(消费者) 二.代码 1.配置: public class MqConfig { /** * 启动测试之前请替换如下 XXX 为您的配置 */ public static final String PUBLIC_TOPIC = "test"

  • SpringBoot中使用RocketMQ的示例代码

    目录 1 订单微服务发送消息 1.1 订单微服务添加rocketmq的依赖 1.2 添加配置 1.3 编写测试代码 1.4 测试 2 用户微服务订阅消息 2.1 用户微服务增加rocketmq依赖 2.2 修改主类,启动nacos客户端 2.3 修改配置文件 2.4 编写消息接收服务 2.5 测试 接下来我们模拟一种场景:商品下单成功之后,向下单用户发送短信.以此来示例SpringBoot中RocketMQ的使用方式. 1 订单微服务发送消息 1.1 订单微服务添加rocketmq的依赖 <!-

  • SpringBoot整合RocketMQ实现消息发送和接收的详细步骤

    我们使用主流的SpringBoot框架整合RocketMQ来讲解,使用方便快捷: 最终项目结构如下: 具体步骤如下: 第一步:新建SpringBoot项目rocketmq-test,引入rocketmq依赖,以及项目配置 <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <vers

  • Springboot 整合 RocketMQ 收发消息

    Springboot 整合 RocketMQ 收发消息 创建springboot项目 pom.xml添加rocketmq-spring-boot-starter依赖. <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.1.0</version>

  • RocketMQ整合SpringBoot实现生产级二次封装

    目录 前言说明 一.为什么要二次封装 1.1 二次封装不同观点 1.2 封装的抽离点 1.3 设计模式的应用 二.二次封装核心要点 2.1 二次封装核心点 2.1.1 封装主要讨论点 2.1.2 发送/消费的几种消息实体 2.2 RocketMQTemplate封装 2.2.1 封装基础实体类 2.2.2 RocketMQTemplate 3.2.3 增强RocketMQTemplate 2.3 RocketMQListener封装 2.4 广播消息的应用场景 2.3 代码封装完结测试 前言说明

  • Springboot详解RocketMQ实现广播消息流程

    RocketMQ消息模式主要有两种:广播模式.集群模式(负载均衡模式) 广播模式是每个消费者,都会消费消息: 负载均衡模式是每一个消费只会被某一个消费者消费一次: 我们业务上一般用的是负载均衡模式,当然一些特殊场景需要用到广播模式,比如发送一个信息到邮箱,手机,站内提示: 我们可以通过@RocketMQMessageListener的messageModel属性值来设置,MessageModel.BROADCASTING是广播模式,MessageModel.CLUSTERING是默认集群负载均衡

  • Springboot详解RocketMQ实现消息发送与接收流程

    springboot+rockermq 实现简单的消息发送与接收 普通消息的发送方式有3种:单向发送.同步发送和异步发送. 下面来介绍下 springboot+rockermq 整合实现 普通消息的发送与接收 创建Springboot项目,添加rockermq 依赖 <!--rocketMq依赖--> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-

  • Springboot详解实现食品仓库管理系统流程

    目录 一,项目简介 二,环境介绍 三,系统展示 3.1 系统功能模块设计 3.1.1 登录模块 3.1.2 客户管理模块 3.1.3 供应商管理功能 3.1.4 商品管理模块 3.1.5 商品进货管理模块 3.1.6 商品退货信息查询模块 四,核心代码展示 五,项目总结 一,项目简介 经过调查研究进行开发设计的这款仓库管理系统,主要是为商家提供商品货物进销存的信息化管理,以便让商家在竞争如此激烈的今天占据一定的优势和商机,通过信息化技术手段的应用来减少库存,提升周转率,降低成本,提高盈利. 仓库

  • 详解RocketMQ 消费端如何监听消息

    目录 前言 流程地图 源码跟踪 核心模块(消息拉取) 拉取流程 拉取消息处理 当pullStatus为FOUND,消息进行提交消费的请求 消息消费进度提交 总结 前言 上一篇文章中我们主要来看RocketMQ消息消费者是如何启动的, 那他有一个步骤是非常重要的,就是启动消息的监听,通过不断的拉取消息,来实现消息的监听,那具体怎么做,让我们我们跟着源码来学习一下~ 流程地图 源码跟踪 这一块的代码比较多,我自己对关键点的一些整理,这个图我画的不是很OK 核心模块(消息拉取) 入口:this.pul

  • springboot与vue详解实现短信发送流程

    目录 一.前期工作 1.开启邮箱服务 2.导入依赖 3.配置application.yaml文件 二.实现流程 1.导入数据库 2.后端实现 编写实体类 编写工具类ResultVo 编写dao层接口 配置dao层接口的数据库操作 编写service层接口 编写service层的实现方法 实现controller层 Test代码 前端页面的实现 运行截图+sql图 总结 一.前期工作 1.开启邮箱服务 开启邮箱的POP3/SMTP服务(这里以qq邮箱为例,网易等都是一样的) 2.导入依赖 在spr

  • Springboot详细讲解RocketMQ实现顺序消息的发送与消费流程

    目录 一.创建Springboot项目添加rockermq依赖 二.配置rocketmq 三.新建一个controller来做消息发送 四.创建消费端监听消息消费消息 五.启动服务测试顺序消息发送与消费 如何实现顺序消息? 需要程序保证发送和消费的是同一个 Queue rocketmq默认发送的消息是进入多个消息队列,然后消费端多线程并发消费,所以默认情况,不是順序消费消息的:有時候,我们需要顺序消费一批消息,比如电商系统 订单创建.支付.完成操作,需要順序执行: RocketMQTemplat

  • springBoot详解集成Swagger流程

    目录 Swagger简介 SpringBoot集成Swagger 配置Swagger Swagger配置扫描接口 配置是否启动Swagger 配置API文档的分组 实体类配置 常用的注解 小结 目标: 了解Swagger的作用和概念 了解前后端分离 在springBoot中集成Swagger Swagger简介 前后端分离 VUE+springBoot 后端 :后端控制层.服务层.数据访问层 前端 :前端控制层.视图层 前后端通过API进行交互 前后端相对独立,松耦合 可以部署在不同的服务器上

  • 详解Android Activity的启动流程

    前言 activity启动的流程分为两部分:一是在activity中通过startActivity(Intent intent)方法启动一个Activity:二是我们在桌面通过点击应用图标启动一个App然后显示Activity:第二种方式相较于第一种方式更加全面,所以本文会以第二种流程来分析. 简要 我们手机的桌面是一个叫做Launcher的Activity,它罗列了手机中的应用图标,图标中包含安装apk时解析的应用默认启动页等信息.在点击应用图标时,即将要启动的App和Launcher.AMS

  • Java SpringBoot详解集成以及配置Swagger流程

    一.swagge简介 前后端分离: 后端︰后端控制层,服务层,数据访问层[后端团队] 前端:前端控制层,视图层[前端团队] 前后端通过API进行交互 前后端相对独立且松耦合 产生问题:前后端集成,前端或者后端无法做到"及时协商,尽早解决",最终导致问题集中爆发 解决方法:首先定义schema [ 计划的提纲 ],并实时跟踪最新的API,降低集成风险 前后端分离: 前端测试后端接口:postman 后端提供接口,需要实时更新最新的消息及改动! Swagger 号称世界上最流行的API框架

  • SpringBoot详解如果通过@Value注解给静态变量注入值

    目录 前序 方案一 方案二 方案三 使用场景 总结 最近做项目的时候,给static变量赋值, 使用 @value注解 ,结果 获取一直为null , 1.spring不允许/不支持把值注入到静态变量中 2.Spring的@Value依赖注入是依赖set方法 3.set方法是普通的对象方法 4.static变量是类的属性,static没有set方法 前序 SpringBoot中使用@Value()只能给普通变量注入值,不能直接给静态变量赋值 例如,application-dev.properti

随机推荐