Springboot详细讲解RocketMQ实现顺序消息的发送与消费流程

目录
  • 一、创建Springboot项目添加rockermq依赖
  • 二、配置rocketmq
  • 三、新建一个controller来做消息发送
  • 四、创建消费端监听消息消费消息
  • 五、启动服务测试顺序消息发送与消费

如何实现顺序消息? 需要程序保证发送和消费的是同一个 Queue

rocketmq默认发送的消息是进入多个消息队列,然后消费端多线程并发消费,所以默认情况,不是順序消费消息的;有時候,我们需要顺序消费一批消息,比如电商系统 订单创建、支付、完成操作,需要順序执行;

RocketMQTemplate给我们提供了SendOrderly方法(有多個重载),来实现发送顺序消息;包括以下:

syncSendOrderly,发送同步顺序消息;

asyncSendOrderly,发送异步顺序消息;

sendOneWayOrderly,发送单向顺序消息;

一般我们用syncSendOrderly方法发送同步顺序消息。

参数一:topic 如果想添加tag,可以使用"topic:tag"的写法

参数二:消息内容

参数三:hashKey 使用此参数选择队列。 例如:orderId,productId…

因为broker会管理多个消息队列,这个hashKey参数,主要用来计算选择队列的,一般可以把订单ID,产品ID作为参数值;发送到一个队列,这样方便搞顺序队列;以及消费端接收的时候,默认是并发多线程去接收消息。

RocketMQMessageListener有个属性consumeMode,默认是ConsumeMode.CONCURRENTLY ,我们要改成ConsumeMode.ORDERLY,单线程顺序接收消息;

下面来介绍下 springboot+rockermq 整合实现 顺序消息的发送与消费

一、创建Springboot项目添加rockermq依赖

<!--rocketMq依赖-->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.1</version>
</dependency>

二、配置rocketmq

# 端口
server:
  port: 8083

# 配置 rocketmq
rocketmq:
  name-server: 127.0.0.1:9876
  #生产者
  producer:
    #生产者组名,规定在一个应用里面必须唯一
    group: group1
    #消息发送的超时时间 默认3000ms
    send-message-timeout: 3000
    #消息达到4096字节的时候,消息就会被压缩。默认 4096
    compress-message-body-threshold: 4096
    #最大的消息限制,默认为128K
    max-message-size: 4194304
    #同步消息发送失败重试次数
    retry-times-when-send-failed: 3
    #在内部发送失败时是否重试其他代理,这个参数在有多个broker时才生效
    retry-next-server: true
    #异步消息发送失败重试的次数
    retry-times-when-send-async-failed: 3

三、新建一个controller来做消息发送

package com.example.springbootrocketdemo.controller;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * 模拟两个订单发送消息
 *
 * 顺序信息的三种方式:同步、异步、单向
 * syncSendOrderly,发送同步顺序消息;
 * asyncSendOrderly,发送异步顺序消息;
 * sendOneWayOrderly,发送单向顺序消息;
 * 一般我们用第一种发送同步顺序消息;
 * @author qzz
 */
@RestController
public class RocketMQOrderCOntroller {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    /**
     * 发送同步顺序消息
     */
    @RequestMapping("/testSyncOrderSend")
    public void testSyncSend(){
        //参数一:topic   如果想添加tag,可以使用"topic:tag"的写法
        //参数二:消息内容
        //参数三:hashKey 用来计算决定消息发送到哪个消息队列, 一般是订单ID,产品ID等
        rocketMQTemplate.syncSendOrderly("test-topic-orderly","111111创建","111111");
        rocketMQTemplate.syncSendOrderly("test-topic-orderly","111111支付","111111");
        rocketMQTemplate.syncSendOrderly("test-topic-orderly","111111完成","111111");
        rocketMQTemplate.syncSendOrderly("test-topic-orderly","222222创建","222222");
        rocketMQTemplate.syncSendOrderly("test-topic-orderly","222222支付","222222");
        rocketMQTemplate.syncSendOrderly("test-topic-orderly","222222完成","222222");
    }
}

四、创建消费端监听消息消费消息

package com.example.springbootrocketdemo.config;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
/**
 * 消费顺序消息
 * 配置RocketMQ监听
 *
 * ConsumeMode.ORDERLY:顺序消费
 * @author qzz
 */
@Service
@RocketMQMessageListener(consumerGroup = "test",topic = "test-topic-orderly",consumeMode = ConsumeMode.ORDERLY)
public class RocketMQCommonConsumerListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("consumer 顺序消费,收到消息:"+s);
    }
}

五、启动服务测试顺序消息发送与消费

测试成功!

到此这篇关于Springboot详细讲解RocketMQ实现顺序消息的发送与消费流程的文章就介绍到这了,更多相关Springboot顺序消息内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • 解决SpringBoot整合RocketMQ遇到的坑

    应用场景 在实现RocketMQ消费时,一般会用到@RocketMQMessageListener注解定义Group.Topic以及selectorExpression(数据过滤.选择的规则)为了能支持动态筛选数据,一般都会使用表达式,然后通过apollo或者cloud config进行动态切换. 引入依赖 <!-- RocketMq Spring Boot Starter--> <dependency> <groupId>org.apache.rocketmq<

  • RocketMQ整合SpringBoot实现生产级二次封装

    目录 前言说明 一.为什么要二次封装 1.1 二次封装不同观点 1.2 封装的抽离点 1.3 设计模式的应用 二.二次封装核心要点 2.1 二次封装核心点 2.1.1 封装主要讨论点 2.1.2 发送/消费的几种消息实体 2.2 RocketMQTemplate封装 2.2.1 封装基础实体类 2.2.2 RocketMQTemplate 3.2.3 增强RocketMQTemplate 2.3 RocketMQListener封装 2.4 广播消息的应用场景 2.3 代码封装完结测试 前言说明

  • SpringBoot整合RocketMQ实现消息发送和接收的详细步骤

    我们使用主流的SpringBoot框架整合RocketMQ来讲解,使用方便快捷: 最终项目结构如下: 具体步骤如下: 第一步:新建SpringBoot项目rocketmq-test,引入rocketmq依赖,以及项目配置 <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <vers

  • Springboot详解RocketMQ实现广播消息流程

    RocketMQ消息模式主要有两种:广播模式.集群模式(负载均衡模式) 广播模式是每个消费者,都会消费消息: 负载均衡模式是每一个消费只会被某一个消费者消费一次: 我们业务上一般用的是负载均衡模式,当然一些特殊场景需要用到广播模式,比如发送一个信息到邮箱,手机,站内提示: 我们可以通过@RocketMQMessageListener的messageModel属性值来设置,MessageModel.BROADCASTING是广播模式,MessageModel.CLUSTERING是默认集群负载均衡

  • 解决springboot集成rocketmq关于tag的坑

    springboot集成rocketmq关于tag的坑 新项目使用springboot的若依框架集成rocketmq,选择集成RocketMQTemplate这种方式实现消息的发送和接收. 1.客户端发送代码 此处回调方法里有些业务不用关注,只关心发送方法 @Component public class RocketMqHelper { Logger logger = LoggerFactory.getLogger(RocketMqHelper.class); @Resource private

  • Springboot 整合 RocketMQ 收发消息

    Springboot 整合 RocketMQ 收发消息 创建springboot项目 pom.xml添加rocketmq-spring-boot-starter依赖. <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.1.0</version>

  • SpringBoot中使用RocketMQ的示例代码

    目录 1 订单微服务发送消息 1.1 订单微服务添加rocketmq的依赖 1.2 添加配置 1.3 编写测试代码 1.4 测试 2 用户微服务订阅消息 2.1 用户微服务增加rocketmq依赖 2.2 修改主类,启动nacos客户端 2.3 修改配置文件 2.4 编写消息接收服务 2.5 测试 接下来我们模拟一种场景:商品下单成功之后,向下单用户发送短信.以此来示例SpringBoot中RocketMQ的使用方式. 1 订单微服务发送消息 1.1 订单微服务添加rocketmq的依赖 <!-

  • Springboot详细讲解RocketMQ实现顺序消息的发送与消费流程

    目录 一.创建Springboot项目添加rockermq依赖 二.配置rocketmq 三.新建一个controller来做消息发送 四.创建消费端监听消息消费消息 五.启动服务测试顺序消息发送与消费 如何实现顺序消息? 需要程序保证发送和消费的是同一个 Queue rocketmq默认发送的消息是进入多个消息队列,然后消费端多线程并发消费,所以默认情况,不是順序消费消息的:有時候,我们需要顺序消费一批消息,比如电商系统 订单创建.支付.完成操作,需要順序执行: RocketMQTemplat

  • SpringBoot详细讲解多个配置文件的配置流程

    目录 配置文件加载顺序 验证 前期准备 验证配置文件加载顺序 验证属性互补 总结 一般情况下,springboot默认会在resource目录下生成一个配置文件(application.properties或application.yaml),但其实springboot允许配置多个配置文件(application.properties或application.yaml),但是这并不意味着这些配置文件一定会替换默认生成的配置文件,它们是互补的存在.如果在某些场景下需要把配置文件单独拿出来并且启动的

  • SpringBoot详细讲解日志文件

    目录 1 日志的功能是什么? 2 如何自定义日志打印 2.1 在程序中获取日志对象 2.2 调用日志对象打印日志 2.3 查看日志打印的结果 3 日志的级别 3.1 日志级别的分类 3.2 日志级别的设置 4 日志持久化 5 更简单的日志输出 5.1 添加 lombok 依赖 5.2 输出日志 5.3 lombok 更多的注解 1 日志的功能是什么? 如果程序报错了, 却不能从控制台查看日志, 那么就不知道错误的原因了. 日志的功能 : 快速的排查和定位问题 记录用户登录的日志 记录系统的操作日

  • SpringBoot详细讲解视图整合引擎thymeleaf

    目录 1. 支持的视图技术 2. Thymeleaf 2.1 Thymeleaf语法 2.2 标准表达式 1. 变量表达式 ${…} 2. 选择变量表达式 *{…} 3. 消息表达式 #{…} 4. 链接表达式 @{…} 5. 片段表达式 ~{…} 3. 基本使用 3.1 Thymeleaf模板基本配置 3.2 静态资源的访问 3.3 完成数据的页面展示 1. 创建Spring Boot项目 2. 编写配置文件 3. 创建web控制类 4. 创建模板页面并引入静态资源文件 5.效果测试 1. 支

  • SpringBoot详细讲解通过自定义classloader加密保护class文件

    目录 背景 maven插件加密 注意事项 自定义classloader 隐藏classloader 被保护class手动加壳 总结 背景 最近针对公司框架进行关键业务代码进行加密处理,防止通过jd-gui等反编译工具能够轻松还原工程代码,相关混淆方案配置使用比较复杂且针对springboot项目问题较多,所以针对class文件加密再通过自定义的classloder进行解密加载,此方案并不是绝对安全,只是加大反编译的困难程度,防君子不防小人,整体加密保护流程图如下图所示 maven插件加密 使用自

  • SpringBoot详细讲解异步任务如何获取HttpServletRequest

    目录 原因分析 解决方案 前置条件 pom配置 requrest共享 自定义request过滤器 自定义任务执行器 调用示例 原因分析 @Anysc注解会开启一个新的线程,主线程的Request和子线程是不共享的,所以获取为null 在使用springboot的自定带的线程共享后,代码如下,Request不为null,但是偶发的其中body/head/urlparam内容出现获取不到的情况,是因为异步任务在未执行完毕的情况下,主线程已经返回,拷贝共享的Request对象数据被清空 Servlet

  • springBoot详细讲解使用mybaties案例

    首先创建springBoot项目,jdk选择1.8 然后倒入mybaties的相关依赖 我们用的springBoot,当然spring全家桶里面含有mybaties,所以我们直接使用升级版的mybaties-plus. 引入这3个 lombok省的我每次创建对象,都需要get.set方法,以及toString (IDEA里面也要安装lombok插件->file->setting->plugin->搜索lombok安装,完后重启idea,这样lombok在idea中不报错) myba

  • SpringBoot详细讲解如何创建及刷新Spring容器bean

    目录 一.前期准备 1.1 创建工程 1.2 创建Controller 二.探究过程 2.1 启动类 2.2 SpringApplication 2.3 ApplicationContextFactory 2.4 SpringApplication 2.5 结论 参考视频:https://www.bilibili.com/video/BV1Bq4y1Q7GZ?p=6 通过视频的学习和自身的理解整理出的笔记. 一.前期准备 1.1 创建工程 创建springboot项目,springboot版本为

  • Java超详细讲解ArrayList与顺序表的用法

    目录 简要介绍 Arraylist容器类的使用 Arraylist容器类的构造 ArrayList的常见方法 ArrayList的遍历 ArrayList中的扩容机制 简要介绍 顺序表是一段物理地址连续的储存空间,一般情况下用数组储存,并在数组上完成增删查改.而在java中我们有ArrayList这个容器类封装了顺序表的方法. 在集合框架中,ArrayList是一个普通的类,其实现了list接口.其源码类定义如图 可见,其实现了RandomAccess, Cloneable, 以及Seriali

随机推荐