图文并茂讲解RocketMQ消息类别

目录
  • 1、同步消息
  • 2、异步消息
  • 3、单向消息

1、同步消息

即时性较强,重要的消息,且必须有回执的消息,例如短信,通知(转账成功)

生产者:

public class Producer {
    public static void main(String[] args) throws Exception{
        DefaultMQProducer producer=new DefaultMQProducer("group1");
        producer.setNamesrvAddr("192.168.23.127:9876");
        producer.start();
        for (int i = 1; i <= 5; i++) {
            Message msg = new Message("topic2",("同步消息:hello rocketmq "+i).getBytes("UTF-8"));
            //同步消息发送
            SendResult result = producer.send(msg);
            System.out.println("返回结果:"+result);
        }
        producer.shutdown();
    }
}

消费者:

public class Consumer {
    public static void main(String[] args) throws Exception{
        DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("group1");
        consumer.setNamesrvAddr("192.168.23.127:9876");
        consumer.subscribe("topic2","*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt msg : list) {
                    //System.out.println("收到消息:"+msg);
                    System.out.println("消息:"+new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功处理, mq 收到这个 标记后相同的消息讲不会再次发给消费者
            }
        });
        consumer.start();// 开启多线程 监控消息,持续运行
        System.out.println("接收消息服务已运行");
    }
}

测试:

2、异步消息

即时性较弱,但需要有回执的消息,例如订单中的某些信息

生产者:

public class Producer {
    public static void main(String[] args) throws Exception{
        DefaultMQProducer producer=new DefaultMQProducer("group1");
        producer.setNamesrvAddr("192.168.23.127:9876");
        producer.start();
        for (int i = 1; i <= 5; i++) {
            //异步消息发送
            Message msg = new Message("topic2",("异步消息:hello rocketmq "+i).getBytes("UTF-8"));
            producer.send(msg, new SendCallback() {
                //表示成功返回结果
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println(sendResult);
                }
                //表示发送消息失败
                @Override
                public void onException(Throwable throwable) {
                    System.out.println(throwable);
                }
            });
        }
        //添加一个休眠操作,确保异步消息返回后能够输出
        TimeUnit.SECONDS.sleep(10);
        producer.shutdown();
    }
}

消费者:

public class Consumer {
    public static void main(String[] args) throws Exception{
        DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("group1");
        consumer.setNamesrvAddr("192.168.23.127:9876");
        consumer.subscribe("topic2","*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt msg : list) {
                    //System.out.println("收到消息:"+msg);
                    System.out.println("消息:"+new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功处理, mq 收到这个 标记后相同的消息讲不会再次发给消费者
            }
        });
        consumer.start();// 开启多线程 监控消息,持续运行
        System.out.println("接收消息服务已运行");
    }
}

测试:

3、单向消息

不需要有回执的消息,例如日志类消息

生产者:

public class Producer {
    public static void main(String[] args) throws Exception{
        DefaultMQProducer producer=new DefaultMQProducer("group1");
        producer.setNamesrvAddr("192.168.23.127:9876");
        producer.start();
        for (int i = 1; i <= 5; i++) {
            //单向消息
            Message msg = new Message("topic2",("单向消息:hello rocketmq "+i).getBytes("UTF-8"));
            producer.sendOneway(msg);
        }
        //添加一个休眠操作,确保异步消息返回后能够输出
        TimeUnit.SECONDS.sleep(10);
        producer.shutdown();
    }
}

消费者代码同上

测试:

总结 同步消息

SendResult result = producer.send(msg);

异步消息(回调处理结果必须在生产者进程结束前执行,否则回调无法正确执行)

		producer.send(msg, new SendCallback() {
                //表示成功返回结果
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println(sendResult);
                }
                //表示发送消息失败
                @Override
                public void onException(Throwable throwable) {
                    System.out.println(throwable);
                }
            });

单向消息

producer.sendOneway(msg);

到此这篇关于图文并茂讲解RocketMQ消息类别的文章就介绍到这了,更多相关RocketMQ消息类别内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • 一文彻底掌握RocketMQ 的存储模型

    目录 RocketMQ简介 1 整体概览 2 数据文件 3 消费文件 4 索引文件 5 写到最后 RocketMQ简介 RocketMQ有Producer.Consumer.NameSrv.Broker四个部分.其中Broker用于存储消息,维护消息队列和订阅关系,是RocketMQ四个部分中最重要的一个部分,并且RocketMQ的高性能就是依赖于Broker模块的底层存储模型实现的.所以搞清楚Broker的存储模型是学习RocketMQ最重要的一步. RocketMQ 优异的性能表现,必然绕不

  • RocketMQ生产消息与消费消息超详细讲解

    目录 1 RocketMQ简介 2 MQ的常见产品 3 环境搭建 4 单生产者单消费者模式 5 单生产者多消费者模式 5.1默认模式(负载均衡) 5.2广播模式 6 多生产者多消费者模式 1 RocketMQ简介 RocketMQ是阿里开源的一款非常优秀中间件产品,脱胎于阿里的另一款队列技术MetaQ,后捐赠给Apache基金会作为一款孵化技术,仅仅经历了一年多的时间就成为Apache基金会的顶级项目.并且它现在已经在阿里内部被广泛的应用,并且经受住了多次双十一的这种极致场景的压力(2017年的

  • RocketMQ事务消息图文示例讲解

    RocketMQ 也允许我们像mysql 一样发送具有事务特征的消息 MQ 的事务流程(本地代码正常执行) MQ 的消息补偿过程(当本地代码执行失败时) MQ 消息的三种状态 提交状态:允许进入队列,此消息与非事务消息无区别 回滚状态:不允许进入队列,此消息等同于未发送过 中间状态:完成了 half 消息的发送,未对 MQ 进行二次状态确认(未知状态) 注意:事务消息仅与生产者有关,与消费者无关 生产者代码(提交状态.回滚状态): public class Producer { public s

  • RocketMq深入分析讲解两种削峰方式

    目录 何时需要削峰 通过消息队列的削峰方法有两种 消费延时控流 总结 何时需要削峰 当上游调用下游服务速率高于下游服务接口QPS时,那么如果不对调用速率进行控制,那么会发生很多失败请求 通过消息队列的削峰方法有两种 控制消费者消费速率和生产者投放延时消息,本质都是控制消费速度 通过消费者参数控制消费速度 先分析那些参数对控制消费速度有作用 1.PullInterval: 设置消费端,拉取mq消息的间隔时间. 注意:该时间算起时间是rocketMq消费者从broker消息后算起.经过PullInt

  • 图文并茂讲解RocketMQ消息类别

    目录 1.同步消息 2.异步消息 3.单向消息 1.同步消息 即时性较强,重要的消息,且必须有回执的消息,例如短信,通知(转账成功) 生产者: public class Producer { public static void main(String[] args) throws Exception{ DefaultMQProducer producer=new DefaultMQProducer("group1"); producer.setNamesrvAddr("19

  • Springboot详细讲解RocketMQ实现顺序消息的发送与消费流程

    目录 一.创建Springboot项目添加rockermq依赖 二.配置rocketmq 三.新建一个controller来做消息发送 四.创建消费端监听消息消费消息 五.启动服务测试顺序消息发送与消费 如何实现顺序消息? 需要程序保证发送和消费的是同一个 Queue rocketmq默认发送的消息是进入多个消息队列,然后消费端多线程并发消费,所以默认情况,不是順序消费消息的:有時候,我们需要顺序消费一批消息,比如电商系统 订单创建.支付.完成操作,需要順序执行: RocketMQTemplat

  • RocketMQ消息丢失场景以及解决方法

    既然使用在项目中使用了MQ,那么就不可避免的需要考虑消息丢失问题.在一些涉及到了金钱交易的场景下,消息丢失还是很致命的.那么在RocketMQ中存在哪几种消息丢失的场景呢? 先来一张最简单的消费流程图: 上图中大致包含了这么几种场景: 生产者产生消息发送给RocketMQRocketMQ接收到了消息之后,必然需要存到磁盘中,否则断电或宕机之后会造成数据的丢失消费者从RocketMQ中获取消息消费,消费成功之后,整个流程结束 这三种场景都可能会产生消息的丢失,如下图所示: 场景1中生产者将消息发送

  • Java 图文并茂讲解两种找二叉树的最近公共祖先的方法

    目录 思路一:先假设这棵树是二叉搜索树 思路二:假设该树是用孩子双亲表示法 思路一:先假设这棵树是二叉搜索树 首先我们补充说明一下什么是二叉搜索树: 在二叉搜索树中,对于每一个节点来说,他的左子树中的值都比他小,右子树的中的值都比他大.所以二叉搜索树的中序遍历是一组有序的数据. 对于上述这棵树,假设要求 p q 的最近公共祖先. 那么它有以下情况: 对于普通的二叉树来说,也无非就这几种情况:pq都在左,pq都在右,pq一左一右,pq有一个是根节点. 所以分别递归的去左子树和右子树中找 p q 节

  • C语言图文并茂讲解分支语句用法

    目录 一.if 语句分析 二.switch 语句分析 三.小结 一.if 语句分析 if 语句用于根据条件选择执行语句 else 不能独立存在且总是与它最近的 if 相匹配 else 语句后可以接连其他 if 语句 if 语句中零值比较的注意点 bool 型变量应该直接出现于条件中,不要进行比较 变量和 0 值比较时,0 值应该出现在比较符号左边(这条规则可以拓展为任意字面量与变量比较时,字面量应该放在左边,变量放在右边,这样即使手误写成了 = ,编译器也能发现) float 型变量不能直接进行

  • C++图文并茂讲解类型转换函数

    目录 一.类型转换函数(上) 1.再论类型转换 2.问题 3.再论构造函数 4.另一个视角 5.编译器的行为 6.小结(上) 二.类型转换函数(下) 1.类型转换 2.编译器的行为 3.注意事项 4.小结(下) 一.类型转换函数(上) 1.再论类型转换 标准数据类型之间会进行隐式的类型安全转换 转换规则如下: 下面来看一个有趣的隐式类型转换: #include <iostream> #include <string> using namespace std; int main()

  • C++图文并茂讲解继承

    目录 一.生活中的例子 二.惊艳的继承 三.继承的意义 四.小结 一.生活中的例子 组合关系∶整体与部分的关系 下面看一个组合关系的描述代码: #include <iostream> #include <string> using namespace std; class Memory { public: Memory() { cout << "Memory()" << endl; } ~Memory() { cout <<

  • SpringBoot图文并茂讲解依赖管理的特性

    目录 1.父依赖parent介绍 2.修改默认版本号 3.starter场景启动器 1.父依赖parent介绍 pom文件中含有父依赖 <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.3.4.RELEASE</version> </pa

  • SpringBoot图文并茂讲解登录拦截器

    目录 1.相关概念 1.实现效果 2.实现步骤 2.代码实现 1.配置文件 2.java代码 3.前端代码 3.运行测试 1.相关概念 1.实现效果 当没有输入正确的账号密码登录成功时, 除了登录页,其他页面都无法访问(静态资源要放行) 2.实现步骤 编写一个拦截器实现HandlerInterceptor接口 拦截器注册到容器中(实现WebMvcConfigurer的addInterceptors()) 指定拦截规则(注意,如果是拦截所有,静态资源也会被拦截) 2.代码实现 1.配置文件 pom

  • MyBatis图文并茂讲解注解开发一对一查询

    目录 MyBatis的注解实现复杂映射开发 一对一查询 一对一查询的模型 一对一查询的语句 创建PersonMapper接口 使用注解配置Mapper 测试类 一对一配置总结 MyBatis的注解实现复杂映射开发 实现复杂关系映射之前我们可以在映射文件中通过配置来实现,使用注解开发后,我们可以使用@Results注解,@Result注解,@One注解,@Many注解组合完成复杂关系的配置 一对一查询 一对一查询的模型 一对一查询的需求:查询一个用户信息,与此同时查询出该用户对应的身份证信息 一对

随机推荐