RocketMQ生产消息与消费消息超详细讲解

目录
  • 1 RocketMQ简介
  • 2 MQ的常见产品
  • 3 环境搭建
  • 4 单生产者单消费者模式
  • 5 单生产者多消费者模式
    • 5.1默认模式(负载均衡)
    • 5.2广播模式
  • 6 多生产者多消费者模式

1 RocketMQ简介

RocketMQ是阿里开源的一款非常优秀中间件产品,脱胎于阿里的另一款队列技术MetaQ,后捐赠给Apache基金会作为一款孵化技术,仅仅经历了一年多的时间就成为Apache基金会的顶级项目。并且它现在已经在阿里内部被广泛的应用,并且经受住了多次双十一的这种极致场景的压力(2017年的双十一,RocketMQ流转的消息量达到了万亿级,峰值TPS达到5600万)

2 MQ的常见产品

ActiveMQ:java语言实现,万级数据吞吐量,处理速度ms级,主从架构,成熟度高

RabbitMQ :erlang语言实现,万级数据吞吐量,处理速度us级,主从架构,

RocketMQ :java语言实现,十万级数据吞吐量,处理速度ms级,分布式架构,功能强,扩展性强

kafka :scala语言实现,十万级数据吞吐量,处理速度ms级,分布式架构,功能较少,应用于大数据较多

3 环境搭建

创建maven工程

引入依赖:

 <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.5.2</version>
</dependency>

4 单生产者单消费者模式

生产者:

//生产者,产生消息
public class Producer {
    public static void main(String[] args) throws Exception{
        //1.创建一个发送消息的对象Producer
        DefaultMQProducer producer=new DefaultMQProducer("group1");
        //2.设定发送的命名服务器地址
        producer.setNamesrvAddr("192.168.23.127:9876");
        //3启动发送的服务
        producer.start();
        //4.1创建要发送的消息对象,指定topic,指定内容body
        Message msg=new Message("topic1","hello rocketmq".getBytes("UTF-8"));
        //4.2发送消息
        SendResult result = producer.send(msg);
        System.out.println("返回结果:"+result);
        //5.关闭连接
        producer.shutdown();
    }
}

消费者:

//消费者,消费消息
public class Consumer {
    public static void main(String[] args) throws Exception{
        //1.创建一个接收消息的对象Consumer
        DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("group1");
        //2.设定接收的命名服务器地址
        consumer.setNamesrvAddr("192.168.23.127:9876");
        //3.设置接收消息对应的topic,对应的sub标签为任意*
        consumer.subscribe("topic1","*");
        //4.开启监听,用于接收消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt msg : list) {
                    //System.out.println("收到消息:"+msg);
                    System.out.println("消息:"+new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功处理, mq 收到这个 标记后相同的消息讲不会再次发给消费者
            }
        });
        //5.启动接收消息的服务
        consumer.start();// 开启多线程 监控消息,持续运行
        System.out.println("接收消息服务已运行");
    }
}

测试:

5 单生产者多消费者模式

5.1默认模式(负载均衡)

生产者:

//生产者,产生消息
public class Producer {
    public static void main(String[] args) throws Exception{
        //1.创建一个发送消息的对象Producer
        DefaultMQProducer producer=new DefaultMQProducer("group1");
        //2.设定发送的命名服务器地址
        producer.setNamesrvAddr("192.168.23.127:9876");
        //3启动发送的服务
        producer.start();
        for (int i = 1; i <= 10; i++) {
            Message msg = new Message("topic1",("生产者2: hello rocketmq "+i).getBytes("UTF-8"));
            SendResult result = producer.send(msg);
            System.out.println("返回结果:"+result);
        }
        //5.关闭连接
        producer.shutdown();
    }
}

消费者:

//消费者,消费消息
public class Consumer {
    public static void main(String[] args) throws Exception{
        //1.创建一个接收消息的对象Consumer
        DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("group1");
        //2.设定接收的命名服务器地址
        consumer.setNamesrvAddr("192.168.23.127:9876");
        //3.设置接收消息对应的topic,对应的sub标签为任意*
        consumer.subscribe("topic1","*");
        //4.开启监听,用于接收消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt msg : list) {
                    //System.out.println("收到消息:"+msg);
                    System.out.println("消息:"+new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功处理, mq 收到这个 标记后相同的消息讲不会再次发给消费者
            }
        });
        //5.启动接收消息的服务
        consumer.start();// 开启多线程 监控消息,持续运行
        System.out.println("接收消息服务已运行");
    }
}

测试:

5.2广播模式

生产者的代码不变,消费者的代码改动如下:

		//设置当前消费者的消费模式(默认模式:负载均衡)
        consumer.setMessageModel(MessageModel.CLUSTERING);
        //设置当前消费者的消费模式为广播模式:所有客户端接收的消息是一样的
        consumer.setMessageModel(MessageModel.BROADCASTING);

具体消费者代码:

//消费者,消费消息
public class Consumer {
    public static void main(String[] args) throws Exception{
        //1.创建一个接收消息的对象Consumer
        DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("group1");
        //2.设定接收的命名服务器地址
        consumer.setNamesrvAddr("192.168.23.127:9876");
        //3.设置接收消息对应的topic,对应的sub标签为任意*
        consumer.subscribe("topic1","*");
        //设置当前消费者的消费模式(默认模式:负载均衡)
        //consumer.setMessageModel(MessageModel.CLUSTERING);
        //设置当前消费者的消费模式为广播模式:所有客户端接收的消息是一样的
        consumer.setMessageModel(MessageModel.BROADCASTING);
        //4.开启监听,用于接收消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt msg : list) {
                    //System.out.println("收到消息:"+msg);
                    System.out.println("消息:"+new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功处理, mq 收到这个 标记后相同的消息讲不会再次发给消费者
            }
        });
        //5.启动接收消息的服务
        consumer.start();// 开启多线程 监控消息,持续运行
        System.out.println("接收消息服务已运行");
    }
}

测试:

广播模式的现象

如果 生产者先发送消息, 后启动消费者, 消息只能被消费一次

如果多个消费者先启动(广播模式),后发消息,才有广播的效果

结论: 必须先启动消费者再启动发送者才有广播的效果

6 多生产者多消费者模式

多生产者产生的消息可以被同一个消费者消费,也可以被多个消费者消费

运行多个生产者,在启动消费者

测试:

到此这篇关于RocketMQ生产消息与消费消息超详细讲解的文章就介绍到这了,更多相关RocketMQ生产消息与消费消息内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • RocketMQ事务消息图文示例讲解

    RocketMQ 也允许我们像mysql 一样发送具有事务特征的消息 MQ 的事务流程(本地代码正常执行) MQ 的消息补偿过程(当本地代码执行失败时) MQ 消息的三种状态 提交状态:允许进入队列,此消息与非事务消息无区别 回滚状态:不允许进入队列,此消息等同于未发送过 中间状态:完成了 half 消息的发送,未对 MQ 进行二次状态确认(未知状态) 注意:事务消息仅与生产者有关,与消费者无关 生产者代码(提交状态.回滚状态): public class Producer { public s

  • RocketMq深入分析讲解两种削峰方式

    目录 何时需要削峰 通过消息队列的削峰方法有两种 消费延时控流 总结 何时需要削峰 当上游调用下游服务速率高于下游服务接口QPS时,那么如果不对调用速率进行控制,那么会发生很多失败请求 通过消息队列的削峰方法有两种 控制消费者消费速率和生产者投放延时消息,本质都是控制消费速度 通过消费者参数控制消费速度 先分析那些参数对控制消费速度有作用 1.PullInterval: 设置消费端,拉取mq消息的间隔时间. 注意:该时间算起时间是rocketMq消费者从broker消息后算起.经过PullInt

  • 图文并茂讲解RocketMQ消息类别

    目录 1.同步消息 2.异步消息 3.单向消息 1.同步消息 即时性较强,重要的消息,且必须有回执的消息,例如短信,通知(转账成功) 生产者: public class Producer { public static void main(String[] args) throws Exception{ DefaultMQProducer producer=new DefaultMQProducer("group1"); producer.setNamesrvAddr("19

  • 一文彻底掌握RocketMQ 的存储模型

    目录 RocketMQ简介 1 整体概览 2 数据文件 3 消费文件 4 索引文件 5 写到最后 RocketMQ简介 RocketMQ有Producer.Consumer.NameSrv.Broker四个部分.其中Broker用于存储消息,维护消息队列和订阅关系,是RocketMQ四个部分中最重要的一个部分,并且RocketMQ的高性能就是依赖于Broker模块的底层存储模型实现的.所以搞清楚Broker的存储模型是学习RocketMQ最重要的一步. RocketMQ 优异的性能表现,必然绕不

  • RocketMQ生产消息与消费消息超详细讲解

    目录 1 RocketMQ简介 2 MQ的常见产品 3 环境搭建 4 单生产者单消费者模式 5 单生产者多消费者模式 5.1默认模式(负载均衡) 5.2广播模式 6 多生产者多消费者模式 1 RocketMQ简介 RocketMQ是阿里开源的一款非常优秀中间件产品,脱胎于阿里的另一款队列技术MetaQ,后捐赠给Apache基金会作为一款孵化技术,仅仅经历了一年多的时间就成为Apache基金会的顶级项目.并且它现在已经在阿里内部被广泛的应用,并且经受住了多次双十一的这种极致场景的压力(2017年的

  • RocketMQ延迟消息超详细讲解

    目录 一.什么是延时消息 二.延时消息等级 三.延时消息使用场景 四.延时消息示例 五.延时消息实现原理 一.什么是延时消息 当消息写入到Broker后,不能立刻被消费者消费,需要等待指定的时长后才可被消费处理的消息,称为延时消息. 二.延时消息等级 RocketMQ延时消息的延迟时长不支持随意时长的延迟,是通过特定的延迟等级来指定的.默认支持18个等级的延迟消息,延时等级定义在RocketMQ服务端的MessageStoreConfig类中的如下变量中: // MessageStoreConf

  • 超详细讲解Java异常

    目录 一.Java异常架构与异常关键字 Java异常简介 Java异常架构 1.Throwable 2.Error(错误) 3.Exception(异常) 4.受检异常与非受检异常 Java异常关键字 二.Java异常处理 声明异常 抛出异常 捕获异常 如何选择异常类型 常见异常处理方式 1.直接抛出异常 2.封装异常再抛出 3.捕获异常 4.自定义异常 5.try-catch-finally 6.try-with-resource 三.Java异常常见面试题 1.Error 和 Excepti

  • Java 超详细讲解设计模式之中的抽象工厂模式

    目录 抽象工厂模式 1.什么是抽象工厂 2.抽象工厂模式的优缺点 3.抽象工厂模式的结构与实现 4.抽象工厂方法模式代码实现 5.抽象工厂模式的应用场景 6.抽象工厂模式的扩展 抽象工厂模式 前面文章介绍的工厂方法模式中考虑的是一类产品的生产,比如案例中的百事可乐工厂只能生产百事可乐,可口可乐工厂只能生产可口可乐,也就是说:工厂方法模式只考虑生产同等级的产品. 1.什么是抽象工厂 在现实生活中许多工厂是综合型的工厂,能生产多种类)的产品,就拿案例里面的可乐来说,在节日的时候可能会有圣诞版的可乐,

  • Java 超详细讲解核心类Spring JdbcTemplate

    目录 JdbcTemplate概述 JdbcTemplate开发步骤 JdbcTemplate快速入门 Spring产生JdbcTemplate对象 JdbcTemplate的常用操作 修改操作 删除和查询全部操作 查询单个数据操作 本章小结 JdbcTemplate概述 它是spring框架中提供的一个对象,是对原始繁琐的Jdbc API对象的简单封装.spring框架为我们提供了很多的操作 模板类.例如:操作关系型数据的JdbcTemplate和HibernateTemplate,操作nos

  • Java SpringMVC数据响应超详细讲解

    目录 1)页面跳转   2)回写数据 3)配置注解驱动 4)知识要点 1)页面跳转   直接返回字符串:此种方式会将返回的字符串与视图解析器的前后缀拼接后跳转.  返回带有前缀的字符串: 转发: forward:/WEB-INF/views/index.jsp 重定向: redirect:/index.jsp 通过ModelAndView对象返回 @RequestMapping("/quick2") public ModelAndView quickMethod2(){ ModelAn

  • Java超详细讲解WebMvcConfigurer拦截器

    目录 addInterceptors拦截器 addViewControllers页面跳转 addResourceHandlers静态资源 configureViewResolvers视图解析器 addCorsMappings跨域 configureMessageConverters信息转换器 WebMvcConfigurer接口常用的方法: /* 拦截器配置 */ void addInterceptors(InterceptorRegistry var1); /* 视图跳转控制器 */ void

  • php超详细讲解命名管道

    目录 进程间为什么要通信 进程如何实现通信 常见进程通信方式 管道概念 命名管道实现 posix_mkfifo函数 无血缘进程间通信 进程间为什么要通信 进程间通信的目的: 数据传输:一个 进程需要将它的数据 发送给另一个进程. 通知事件:一个进程需要向另一个或一组进程 发送消息,通知它(它们)发生了 某种事件(如进程终止时要通知父进程). 资源共享:多个进程之间 共享同样的资源 .为了做到这一点,需要内核提供互斥和同步机制. 进程控制:有些进程 希望完全控制另一个进程的执行 (如 Debug

  • Netty与NIO超详细讲解

    目录 Linux下的五种I/O模型 阻塞IO的流程 IO复用 信号驱动I/O 异步IO NIO I0多路复用 NIO核心组件 使用Java原生API实现NIO操作 Redis为什么支持高并发 Linux下的五种I/O模型 1)阻塞I/O(blocking I/O) 2)非阻塞I/O (nonblocking I/O) 3) I/O复用(select 和poll) (I/O multiplexing) 4)信号驱动I/O (signal driven I/O (SIGIO)) 5)异步I/O (a

  • Java TCP协议通信超详细讲解

    目录 什么是tcp 服务端 客户端 服务端与客户端代码实现实例 什么是tcp Tcp通信有两个特点分别是面向连接,具有可靠性. 面向连接:指的是客户端与服务端之间的连接,在通信之前会有三次握手的机制来确保连接的可靠性. 可靠性:tcp在确保他的可靠性上做了许多的功夫,这个可靠性体现在下面两方面: tcp有状态:tcp会精确的纪录哪些数据是发送了的,哪些是没有被发送的,他保证数据包是按序到达的,不允许存在半点差错 tcp是可以控制的:如果存在丢包或者网络不好的时候,会根据具体情况对数据包进行发送速

随机推荐