RocketMQ普通消息实战演练详解

目录
  • 引言
  • 普通消息同步发送
  • 普通消息异步发送
  • 普通消息单向发送
  • 集群消费模式
  • 广播消费模式

引言

之前研究了RocketMQ的源码,在这里将各种消息发送与消费的demo进行举例,方便以后使用的时候CV。

相关的配置,安装和启动在这篇文章有相关讲解  https://www.jb51.net/article/260237.htm

普通消息同步发送

同步消息是指发送出消息后,同步等待,直到接收到Broker发送成功的响应才会继续发送下一个消息。这个方式可以确保消息发送到Broker成功,一些重要的消息可以使用此方式,比如重要的通知。

public static void main(String[] args) throws Exception {
    //实例化消息生产者对象
    DefaultMQProducer producer = new DefaultMQProducer("group_luke");
    //设置NameSever地址
    producer.setNamesrvAddr("127.0.0.1:9876");
    //启动Producer实例
    producer.start();
    for (int i = 0; i < 10; i++) {
        Message msg = new Message("topic_luke", "tag", ("这是第"+i+"条消息。").getBytes(StandardCharsets.UTF_8));
        //同步发送方式
        SendResult send = producer.send(msg);
        //确认返回
        System.out.println(send);
    }
    //关闭producer
    producer.shutdown();
}

普通消息异步发送

异步消息发送方在发送了一条消息后,不等接收方发回响应,接着进行第二条消息发送。发送方通过回调接口的方式接收服务器响应,并对响应结果进行处理。

public static void main(String[] args) throws Exception {
    //实例化消息生产者对象
    DefaultMQProducer producer = new DefaultMQProducer("group_luke");
    //设置NameSever地址
    producer.setNamesrvAddr("127.0.0.1:9876");
    //启动Producer实例
    producer.start();
    for (int i = 0; i < 10; i++) {
        Message msg = new Message("topic_luke", "tag", ("这是第"+i+"条消息。").getBytes(StandardCharsets.UTF_8));
        //SendCallback会接收异步返回结果的回调
        producer.send(msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println(sendResult);
            }
            @Override
            public void onException(Throwable throwable) {
                throwable.printStackTrace();
            }
        });
    }
    //若是过早关闭producer,会抛出The producer service state not OK, SHUTDOWN_ALREADY的错
    Thread.sleep(10000);
    //关闭producer
    producer.shutdown();
}

普通消息单向发送

单项发送不关心发送的结果,只发送请求不等待应答。发送消息耗时极短。

public static void main(String[] args) throws Exception {
    //实例化消息生产者对象
    DefaultMQProducer producer = new DefaultMQProducer("group_luke");
    //设置NameSever地址
    producer.setNamesrvAddr("127.0.0.1:9876");
    //启动Producer实例
    producer.start();
    for (int i = 0; i < 10; i++) {
        Message msg = new Message("topic_luke", "tag", ("这是第"+i+"条消息。").getBytes(StandardCharsets.UTF_8));
        //同步发送方式
        producer.sendOneway(msg);
    }
    //关闭producer
    producer.shutdown();
}

集群消费模式

消费者采用负载均衡的方式消费消息,同一个Group下的多个Consumer共同消费Queue里的Message,每个Consumer处理的消息不同。

一个Consumer Group中的各个Consumer实例分共同消费消息,即一条消息只会投递到一个Group下面的一个实例,并且只消费一遍。

例如某个Topic有3个队列,其中一个Consumer Group 有 3 个实例,那么每个实例只消费其中的1个队列。集群消费模式是消费者默认的消费方式。

public static void main(String[] args) throws Exception {
    //实例化消息消费者
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_luke");
    //指定nameserver地址
    consumer.setNamesrvAddr("127.0.0.1:9876");
    //订阅topic,"*"表示所有tag
    consumer.subscribe("topic_luke","*");
    consumer.setMessageModel(MessageModel.CLUSTERING);
    // 注册回调实现类来处理从broker拉取回来的消息
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @SneakyThrows
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            for (MessageExt msg : msgs) {
                System.out.println(new String(msg.getBody()));
            }
            // 标记该消息已经被成功消费
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    // 启动消费者实例
    consumer.start();
    System.out.printf("Consumer Started.%n");
}

广播消费模式

广播消费模式中把消息对一个Group下的各个Consumer实例都投递一遍。也就是说消息也会被 Group 中的每个Consumer都消费一次。

实际上,是一个消费组下的每个消费者实例都获取到了topic下面的每个Message Queue去拉取消费。所以消息会投递到每个消费者实例。

public static void main(String[] args) throws Exception {
    //实例化消息消费者
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_luke");
    //指定nameserver地址
    consumer.setNamesrvAddr("127.0.0.1:9876");
    //订阅topic,"*"表示所有tag
    consumer.subscribe("topic_luke","*");
    consumer.setMessageModel(MessageModel.BROADCASTING);
    // 注册回调实现类来处理从broker拉取回来的消息
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @SneakyThrows
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            for (MessageExt msg : msgs) {
                System.out.println(new String(msg.getBody()));
            }
            // 标记该消息已经被成功消费
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    // 启动消费者实例
    consumer.start();
    System.out.printf("Consumer Started.%n");
}

以上就是RocketMQ普通消息实战演练详解的详细内容,更多关于RocketMQ普通消息的资料请关注我们其它相关文章!

(0)

相关推荐

  • Springboot详解RocketMQ实现广播消息流程

    RocketMQ消息模式主要有两种:广播模式.集群模式(负载均衡模式) 广播模式是每个消费者,都会消费消息: 负载均衡模式是每一个消费只会被某一个消费者消费一次: 我们业务上一般用的是负载均衡模式,当然一些特殊场景需要用到广播模式,比如发送一个信息到邮箱,手机,站内提示: 我们可以通过@RocketMQMessageListener的messageModel属性值来设置,MessageModel.BROADCASTING是广播模式,MessageModel.CLUSTERING是默认集群负载均衡

  • RocketMQ消息发送流程源码剖析

    目录 正文 读源码 1 调用defaultMQProducerImpl.send() 2 设置过期时间 3 执行defaultMQProducerImpl.sendDefaultImpl()方法 sendDefaultImpl是发送消息的核心方法. 1 两个校验 2 获取topic路由信息 3 计算重试次数 4 执行队列选择方法 5 发送消息 正文 就是说,我们打了个比方,把RocketMQ比作码头上的一个小房子,来送孩子登船的家长比作生产者,拉走孩子们的船夫比作消费者,所以,RocketMQ的

  • java rocketmq--消息的产生(普通消息)

    前言 与消息发送紧密相关的几行代码: 1. DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); 2. producer.start(); 3. Message msg = new Message(...) 4. SendResult sendResult = producer.send(msg); 5. producer.shutdown(); 那这几行代码执行时,背后都做了什么? 一. 首先

  • Springboot详解RocketMQ实现消息发送与接收流程

    springboot+rockermq 实现简单的消息发送与接收 普通消息的发送方式有3种:单向发送.同步发送和异步发送. 下面来介绍下 springboot+rockermq 整合实现 普通消息的发送与接收 创建Springboot项目,添加rockermq 依赖 <!--rocketMq依赖--> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-

  • SpringBoot集成RocketMQ发送事务消息的原理解析

    目录 简介 原理 具体实现 消费者 消费者 生产者消息监听器 消息事务测试 正常测试 异常测试 代码调整 执行结果 总结 简介 RocketMQ 事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败.RocketMQ 的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致. 原理 RocketMQ事务消息通过异步确保方式,保证事务的最终一致性.设计的思想可以借鉴两个阶段提交事

  • RocketMQ特性Broker存储事务消息实现

    目录 引言 TransactionalMessageService 处理事务消息 第一处: 第二处: 引言 在Broker中,事务消息的初始化是通过BrokerController.initialTransaction()方法执行的. private void initialTransaction() { this.transactionalMessageService = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_SERVICE

  • Springboot详细讲解RocketMQ实现顺序消息的发送与消费流程

    目录 一.创建Springboot项目添加rockermq依赖 二.配置rocketmq 三.新建一个controller来做消息发送 四.创建消费端监听消息消费消息 五.启动服务测试顺序消息发送与消费 如何实现顺序消息? 需要程序保证发送和消费的是同一个 Queue rocketmq默认发送的消息是进入多个消息队列,然后消费端多线程并发消费,所以默认情况,不是順序消费消息的:有時候,我们需要顺序消费一批消息,比如电商系统 订单创建.支付.完成操作,需要順序执行: RocketMQTemplat

  • RocketMQ普通消息实战演练详解

    目录 引言 普通消息同步发送 普通消息异步发送 普通消息单向发送 集群消费模式 广播消费模式 引言 之前研究了RocketMQ的源码,在这里将各种消息发送与消费的demo进行举例,方便以后使用的时候CV. 相关的配置,安装和启动在这篇文章有相关讲解  https://www.jb51.net/article/260237.htm 普通消息同步发送 同步消息是指发送出消息后,同步等待,直到接收到Broker发送成功的响应才会继续发送下一个消息.这个方式可以确保消息发送到Broker成功,一些重要的

  • 微服务架构设计RocketMQ进阶事务消息原理详解

    目录 前言 RocketMQ事务流程概要 RocketMQ事务流程关键 实现 基础配置 引入组件 添加配置 发送半消息 执行本地事务与回查 消费消息 测试 总结 前言 分布式消息选型的时候是否支持事务消息是一个很重要的考量点,而目前只有RocketMQ对事务消息支持的最好.今天我们来唠唠如何实现RocketMQ的事务消息! Apache RocketMQ在4.3.0版中已经支持分布式事务消息,这里RocketMQ采用了2PC的思想来实现了提交事务消息,同时增加一个补偿逻辑来处理二阶段超时或者失败

  • SpringBoot整合rockerMQ消息队列详解

    目录 Springboot整合RockerMQ 使用总结 消费模式 生产者组和消费者组 生产者投递消息的三种方式 如何保证消息不丢失 顺序消息 分布式事务 Springboot整合RockerMQ 1.maven依赖 <dependencies> <!-- springboot-web组件 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>

  • JMS 之 Active MQ 的消息传输(详解)

    本文使用Active MQ5.6 一.消息协商器(Message Broker) broke:消息的交换器,就是对消息进行管理的容器.ActiveMQ 可以创建多个 Broker,客户端与ActiveMQ交互,实际上都是与ActiveMQ中的Broker交互,Broker配置在${MQ_HOME}\conf\activemq.xml. 二.连接器(Connectors)(一).传输连接器 (transportConnectors) transportConnectors 连接器:就是建立brok

  • hmac模块生成加入了密钥的消息摘要详解

    hmac模块 hmac模块用于生成HMAC码.这个HMAC码可以用于验证消息的完整性,其原理也很简单,就是一种加入了密钥的消息摘要,相比起MAC更加安全.JWT(JSON Web Token)中第三部分的消息摘要就是使用了HMAC. HMAC(Hash-based Message Authentication Code) 先大致介绍一下HMAC吧.HMAC是一种消息摘要算法,是一种特殊的MAC(消息认证码),内部使用别的摘要算法进行摘要的计算(比如MD5).相比MAC,HMAC在生成摘要的时候加

  • wxpython多线程防假死与线程间传递消息实例详解

    wxpython中启用线程的方法,将GUI和功能的执行分开. 网上关于python多线程防假死与线程传递消息是几年前的,这里由于wxpython和threading模块已经更新最新,因此给出最新修改代码,能在2017年最新版的python和模块中运行. 原来的publisher()和callafter都无法使用. 修改后的代码. import time import wx from threading import Thread from wx.lib.pubsub import pub cla

  • Android开发App启动流程与消息机制详解

    目录 引言 1.第一步了解 ThreadLocal 2.App的启动流程 3.Activity中创建Handler 引言 相信很多人对这个问题不陌生,但是大家回答的都比较简单,如谈到app启动流程有人就会是app的生命周期去了,谈到消息机制有人就会说looper循环消息进行分发,如果是面试可能面试官不会满意,今天我们搞一篇完善的源码解析来进行阐述上面的问题 1.第一步了解 ThreadLocal 什么是ThreadLocal呢,专业的来讲,ThreadLocal 是一个线程内部的数据存储类,通过

  • java开发RocketMQ消息中间件原理基础详解

    RocketMQ 是什么 Github 上关于 RocketMQ 的介绍: RcoketMQ 是一款低延迟.高可靠.可伸缩.易于使用的消息中间件.具有以下特性: 支持发布/订阅(Pub/Sub)和点对点(P2P)消息模型 在一个队列中可靠的先进先出(FIFO)和严格的顺序传递 支持拉(pull)和推(push)两种消息模式 单一队列百万消息的堆积能力 支持多种消息协议,如 JMS.MQTT 等 分布式高可用的部署架构,满足至少一次消息传递语义 提供 docker 镜像用于隔离测试和云集群部署 提

  • Java RabbitMQ消息队列详解常见问题

    目录 消息堆积 保证消息不丢失 死信队列 延迟队列 RabbitMQ消息幂等问题 RabbitMQ消息自动重试机制 合理的选择重试机制 消费者开启手动ack模式 rabbitMQ如何解决消息幂等问题 RabbitMQ解决分布式事务问题 基于RabbitMQ解决分布式事务的思路 消息堆积 消息堆积的产生场景: 生产者产生的消息速度大于消费者消费的速度.解决:增加消费者的数量或速度. 没有消费者进行消费的时候.解决:死信队列.设置消息有效期.相当于对我们的消息设置有效期,在规定的时间内如果没有消费的

  • 基于RocketMQ推拉模式详解

    消费者客户端有两种方式从消息中间件获取消息并消费.严格意义上来讲,RocketMQ并没有实现PUSH模式,而是对拉模式进行一层包装,名字虽然是 Push 开头,实际在实现时,使用 Pull 方式实现. 通过 Pull 不断轮询 Broker 获取消息.当不存在新消息时,Broker 会挂起请求,直到有新消息产生,取消挂起,返回新消息. 1.概述 1.1.PULL方式 由消费者客户端主动向消息中间件(MQ消息服务器代理)拉取消息:采用Pull方式,如何设置Pull消息的拉取频率需要重点去考虑,举个

随机推荐