Springboot详解RocketMQ实现消息发送与接收流程

springboot+rockermq 实现简单的消息发送与接收

普通消息的发送方式有3种:单向发送、同步发送和异步发送。

下面来介绍下 springboot+rockermq 整合实现 普通消息的发送与接收

  • 创建Springboot项目,添加rockermq 依赖
<!--rocketMq依赖-->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.2.1</version>
</dependency>
  • 配置rocketmq

# 端口
server:
  port: 8083

# 配置 rocketmq
rocketmq:
  name-server: 127.0.0.1:9876
  #生产者
  producer:
    #生产者组名,规定在一个应用里面必须唯一
    group: group1
    #消息发送的超时时间 默认3000ms
    send-message-timeout: 3000
    #消息达到4096字节的时候,消息就会被压缩。默认 4096
    compress-message-body-threshold: 4096
    #最大的消息限制,默认为128K
    max-message-size: 4194304
    #同步消息发送失败重试次数
    retry-times-when-send-failed: 3
    #在内部发送失败时是否重试其他代理,这个参数在有多个broker时才生效
    retry-next-server: true
    #异步消息发送失败重试的次数
    retry-times-when-send-async-failed: 3

  • 新建一个 controller 来做消息发送:
package com.example.springbootrocketdemo.controller;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * 普通信息的三种方式:同步、异步、单向
 * @author qzz
 */
@RestController
public class RocketMQCOntroller {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    /**
     * 发送普通消息
     * convertAndSend(String destination, Object payload) 发送字符串比较方便
     */
    @RequestMapping("/send")
    public void send(){
        rocketMQTemplate.convertAndSend("test-topic","test-message");
    }
    /**
     * 发送同步消息
     */
    @RequestMapping("/testSyncSend")
    public void testSyncSend(){
        //参数一:topic   如果想添加tag,可以使用"topic:tag"的写法
        //参数二:消息内容
        SendResult sendResult = rocketMQTemplate.syncSend("test-topic","同步消息测试");
        System.out.println(sendResult);
    }
    /**
     * 发送异步消息
     */
    @RequestMapping("/testASyncSend")
    public void testASyncSend(){
        //参数一:topic   如果想添加tag,可以使用"topic:tag"的写法
        //参数二:消息内容
        //参数三:回调
        rocketMQTemplate.asyncSend("test-topic", "异步消息测试", new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println(sendResult);
            }
            @Override
            public void onException(Throwable throwable) {
                System.out.println("消息发送异常");
                throwable.printStackTrace();
            }
        });
    }
    /**
     * 发送单向消息
     */
    @RequestMapping("/testOneWay")
    public void testOneWay(){
        //参数一:topic   如果想添加tag,可以使用"topic:tag"的写法
        //参数二:消息内容
        rocketMQTemplate.sendOneWay("test-topic","单向消息测试");
    }
}

SpringBoot给我们提供了RocketMQTemplate模板类,我们利用这个类可以以多种形式发送消息。

发送方法指定Topic主题test-topic。

  • 新建消息消费者监听RocketMQConsumerListener,监听消息,消费消息
package com.example.springbootrocketdemo.config;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;
/**
 * 消费消息
 * 配置RocketMQ监听
 * @author qzz
 */
@Service
@RocketMQMessageListener(consumerGroup = "test",topic = "test-topic")
public class RocketMQConsumerListener implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("消费消息:"+s);
    }
}

消费者类要实现RocketMQListener接口,以及动态指定消息类型String。

类上要加上@RocketMQMessageListener注解,指定topic主题test-topic,以及消费者组test

简单的消息发送与接收搭建完毕!

  • 启动服务,测试消息消费

测试同步消息:

测试异步消息:

测试单向消息:

测试OK,成功消费!

到此这篇关于Springboot详解RocketMQ实现消息发送与接收流程的文章就介绍到这了,更多相关Springboot 消息发送与接收内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • 解决springboot集成rocketmq关于tag的坑

    springboot集成rocketmq关于tag的坑 新项目使用springboot的若依框架集成rocketmq,选择集成RocketMQTemplate这种方式实现消息的发送和接收. 1.客户端发送代码 此处回调方法里有些业务不用关注,只关心发送方法 @Component public class RocketMqHelper { Logger logger = LoggerFactory.getLogger(RocketMqHelper.class); @Resource private

  • Springboot详解RocketMQ实现广播消息流程

    RocketMQ消息模式主要有两种:广播模式.集群模式(负载均衡模式) 广播模式是每个消费者,都会消费消息: 负载均衡模式是每一个消费只会被某一个消费者消费一次: 我们业务上一般用的是负载均衡模式,当然一些特殊场景需要用到广播模式,比如发送一个信息到邮箱,手机,站内提示: 我们可以通过@RocketMQMessageListener的messageModel属性值来设置,MessageModel.BROADCASTING是广播模式,MessageModel.CLUSTERING是默认集群负载均衡

  • Springboot详细讲解RocketMQ实现顺序消息的发送与消费流程

    目录 一.创建Springboot项目添加rockermq依赖 二.配置rocketmq 三.新建一个controller来做消息发送 四.创建消费端监听消息消费消息 五.启动服务测试顺序消息发送与消费 如何实现顺序消息? 需要程序保证发送和消费的是同一个 Queue rocketmq默认发送的消息是进入多个消息队列,然后消费端多线程并发消费,所以默认情况,不是順序消费消息的:有時候,我们需要顺序消费一批消息,比如电商系统 订单创建.支付.完成操作,需要順序执行: RocketMQTemplat

  • SpringBoot中使用RocketMQ的示例代码

    目录 1 订单微服务发送消息 1.1 订单微服务添加rocketmq的依赖 1.2 添加配置 1.3 编写测试代码 1.4 测试 2 用户微服务订阅消息 2.1 用户微服务增加rocketmq依赖 2.2 修改主类,启动nacos客户端 2.3 修改配置文件 2.4 编写消息接收服务 2.5 测试 接下来我们模拟一种场景:商品下单成功之后,向下单用户发送短信.以此来示例SpringBoot中RocketMQ的使用方式. 1 订单微服务发送消息 1.1 订单微服务添加rocketmq的依赖 <!-

  • SpringBoot整合RocketMQ实现消息发送和接收的详细步骤

    我们使用主流的SpringBoot框架整合RocketMQ来讲解,使用方便快捷: 最终项目结构如下: 具体步骤如下: 第一步:新建SpringBoot项目rocketmq-test,引入rocketmq依赖,以及项目配置 <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <vers

  • RocketMQ整合SpringBoot实现生产级二次封装

    目录 前言说明 一.为什么要二次封装 1.1 二次封装不同观点 1.2 封装的抽离点 1.3 设计模式的应用 二.二次封装核心要点 2.1 二次封装核心点 2.1.1 封装主要讨论点 2.1.2 发送/消费的几种消息实体 2.2 RocketMQTemplate封装 2.2.1 封装基础实体类 2.2.2 RocketMQTemplate 3.2.3 增强RocketMQTemplate 2.3 RocketMQListener封装 2.4 广播消息的应用场景 2.3 代码封装完结测试 前言说明

  • Springboot 整合 RocketMQ 收发消息

    Springboot 整合 RocketMQ 收发消息 创建springboot项目 pom.xml添加rocketmq-spring-boot-starter依赖. <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.1.0</version>

  • Springboot详解RocketMQ实现消息发送与接收流程

    springboot+rockermq 实现简单的消息发送与接收 普通消息的发送方式有3种:单向发送.同步发送和异步发送. 下面来介绍下 springboot+rockermq 整合实现 普通消息的发送与接收 创建Springboot项目,添加rockermq 依赖 <!--rocketMq依赖--> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-

  • 详解Android4.4 RIL短信接收流程分析

    最近有客户反馈Android接收不到短信,于是一头扎进RIL里面找原因.最后发现不是RIL的问题,而是BC72上报 短信的格式不对,AT+CNMA=1无作用等几个小问题导致的.尽管问题不在RIL,但总算把RIL短信接收流程搞清楚了. 接收到新信息的log: D/ATC ( 1269): AT< +CMT:,27 D/ATC ( 1268): AT< 0891683108705505F0040d91683117358313f500009101329154922307ea31da2c36a301

  • SpringBoot详解整合Spring Cache实现Redis缓存流程

    目录 1.简介 2.常用注解 2.1.@EnableCaching 2.2.@Cacheable 2.3.@CachePut 2.4.@CacheEvict 3.使用Redis当作缓存产品 3.1.坐标导入 3.2.yml配置 3.3.开启注解功能 3.4.使用@Cacheable 3.5.使用@CacheEvict 4.测试 1.简介 Spring Cache 是一个框架,实现了基于注解的缓存功能,只需要简单地加一个注解,就能实现缓存功能. Spring Cache 提供了一层抽象,底层可以切

  • springboot与vue详解实现短信发送流程

    目录 一.前期工作 1.开启邮箱服务 2.导入依赖 3.配置application.yaml文件 二.实现流程 1.导入数据库 2.后端实现 编写实体类 编写工具类ResultVo 编写dao层接口 配置dao层接口的数据库操作 编写service层接口 编写service层的实现方法 实现controller层 Test代码 前端页面的实现 运行截图+sql图 总结 一.前期工作 1.开启邮箱服务 开启邮箱的POP3/SMTP服务(这里以qq邮箱为例,网易等都是一样的) 2.导入依赖 在spr

  • 详解RocketMQ 消费端如何监听消息

    目录 前言 流程地图 源码跟踪 核心模块(消息拉取) 拉取流程 拉取消息处理 当pullStatus为FOUND,消息进行提交消费的请求 消息消费进度提交 总结 前言 上一篇文章中我们主要来看RocketMQ消息消费者是如何启动的, 那他有一个步骤是非常重要的,就是启动消息的监听,通过不断的拉取消息,来实现消息的监听,那具体怎么做,让我们我们跟着源码来学习一下~ 流程地图 源码跟踪 这一块的代码比较多,我自己对关键点的一些整理,这个图我画的不是很OK 核心模块(消息拉取) 入口:this.pul

  • SpringBoot详解如何整合Redis缓存验证码

    目录 1.简介 2.介绍 3.前期配置 3.1.坐标导入 3.2.配置文件 3.3.配置类 4.Java操作Redis 1.简介 Redis is an open source (BSD licensed), in-memory data structure store, used as a database, cache, and message broker. 翻译:Redis 是一个开源的内存中的数据结构存储系统,它可以用作:数据库.缓存和消息中间件. 官网链接:https://redis

  • SpringBoot详解如果通过@Value注解给静态变量注入值

    目录 前序 方案一 方案二 方案三 使用场景 总结 最近做项目的时候,给static变量赋值, 使用 @value注解 ,结果 获取一直为null , 1.spring不允许/不支持把值注入到静态变量中 2.Spring的@Value依赖注入是依赖set方法 3.set方法是普通的对象方法 4.static变量是类的属性,static没有set方法 前序 SpringBoot中使用@Value()只能给普通变量注入值,不能直接给静态变量赋值 例如,application-dev.properti

  • Springboot详解底层启动过程

    目录 SpringApplication构造分析 SpringApplication run分析 SpringApplication构造分析 1.记录 BeanDefinition 源 spring容器刚开始是空的,要去各个源找到beanDefinition,这些源可能是配置类,可能是xml文件.在构造方法里会获取一个主源,也就是引导类,根据引导类去获取beanDefinition. 2.推断应用类型 根据jar包去判断是什么引用类型 3.记录 ApplicationContext 初始化器 对

随机推荐