java开发RocketMQ生产者高可用示例详解

目录
  • 引言
  • 1 消息
    • 1.1 topic
    • 1.2 Body
    • 1.3 tag
    • 1.4 key
    • 1.5 延迟级别
  • 2 生产者高可用
    • 2.1 客户端保证生产者高可用
      • 2.1.1 重试机制
      • 2.1.2 客户端容错
    • 2.2 Broker端保证生产者高可用

引言

前边两章说了点基础的,从这章开始,我们挖挖源码。看看RocketMQ是怎么工作的。

首先呢,这个生产者就是送孩子去码头的家长,孩子们呢,就是消息了。

我们看看消息孩子们都长啥样。

1 消息

public class Message implements Serializable {
    private static final long serialVersionUID = 8445773977080406428L;
    //主题名字
    private String topic;
    //消息扩展信息,Tag,keys,延迟级别都存在这里
    private Map<String, String> properties;
    //消息体,字节数组
    private byte[] body;
    //设置消息的key,
    public void setKeys(String keys) {}
    //设置topic
    public void setTopic(String topic) {}
    //延迟级别
    public int setDelayTimeLevel(int level) {}
    //消息过滤的标记
    public void setTags(String tags) {}
    //扩展信息存放在此
    public void putUserProperty(final String name, final String value) {}
}

消息就是孩子们,这些孩子们呢,有各自的特点,也有共性。同一个家长送来的两个孩子可以是去同一个地方的,也可以是去不同的地方的。

1.1 topic

首先呢,每个孩子消息都有一个属性topic,这个我们上文说到了,是一个候船大厅。孩子们进来之后,走到自己指定的候船大厅的指定区域(平时出门坐火车高铁不也是指定的站台乘车么),坐到message queue座位上等,等着出行。

Broker有一个或者多个topic,消息会存放到topic内的message queue内,等待被消费。

1.2 Body

孩子消息,也有一个Body属性,这就是他的能力,他会画画,他会唱歌,他会干啥干啥,就记录在这个Body属性里。等走出去了,体现价值的地方也是这个Body属性。

Body就是消息体,消费者会根据消息体执行对应的操作。

1.3 tag

这个tag我们上节说了,就是一个标记,有的孩子背着画板,相机,有的游船就特意找到这些孩子拉走,完成他们的任务。

可以给消息设置tag属性,消费者可以选择含有特定tag属性的消息进行消费。

1.4 key

key就是每个孩子消息的名字了。要找哪个孩子,喊他名就行。

对发送的消息设置好 Key,以后可以根据这个Key 来查找消息。比如消息异常,消息丢失,进行查找会很方便。

1.5 延迟级别

当然,还有的孩子来就不急着走,来之前就想好了,要恰个饭,得30分钟,所以自己来了会等30分钟后被接走。

设置延迟级别可以规定多久后消息可以被消费。

2 生产者高可用

每个送孩子来的家长都希望能送到候船大厅里,更不希望孩子被搞丢了,这个时候这个候船大厅就需要一些保证机制了。

2.1 客户端保证生产者高可用

2.1.1 重试机制

就是说家长送来了,孩子进到候船大厅之后,没能成功坐到message queue座位上,这个时候工作人员会安排重试,再去看是否有座位坐。重试次数默认是2次,也就是说,消息孩子共有3次找座位坐的机会。

看源码,我特意加了注解,大致可以看懂一些了。

//这里取到了重试的次数
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;
String[] brokersSent = new String[timesTotal];
for (; times < timesTotal; times++) {
    String lastBrokerName = null == mq ? null : mq.getBrokerName();
    //获取消息队列
    MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
    if (mqSelected != null) {
        mq = mqSelected;
        brokersSent[times] = mq.getBrokerName();
        try {
            beginTimestampPrev = System.currentTimeMillis();
            if (times > 0) {
                //Reset topic with namespace during resend.
                msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
            }
            long costTime = beginTimestampPrev - beginTimestampFirst;
            if (timeout < costTime) {
                callTimeout = true;
                break;
            }
            //发送消息
            sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
            ...
        } catch (RemotingException e) {
            ...
            continue;
        } catch (MQClientException e) {
            ...
            continue;
        } catch (MQBrokerException e) {
            ...
            continue;
        } catch (InterruptedException e) {
            //可以看到只有InterruptedException抛出了异常,其他的exception都会继续重试
            throw e;
        }
    } else {
        break;
    }
}

重试代码如上,这个sendDefaultImpl方法中,会尝试发送三次消息,若是都失败,才会抛出对应的错误。

2.1.2 客户端容错

若是有多个Broker候车大厅的时候,服务人员会安排消息孩子选择一个相对不拥挤比较容易进入的来进入。当然那些已经关闭的停电的没有服务能力的,我们是不会进的。

MQ Client会维护一个Broker的发送延迟信息,根据这个信息会选择一个相对延迟较低的Broker来发送消息。会主动剔除哪些已经宕机,不可用或发送延迟级别较高的Broker.

选择Broker就是在选择message queue,对应的代码如下:

这里会先判断延迟容错开关是否开启,这个开关默认是关闭的,若是开启的话,会优先选择延迟较低的Broker。

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    //判断发送延迟容错开关是否开启
    if (this.sendLatencyFaultEnable) {
        try {
            //选择一个延迟上可以接受,并且和上次发送相同的Broker
            int index = tpInfo.getSendWhichQueue().incrementAndGet();
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                //若是Broker的延迟时间可以接受,则返回这个Broker
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
                    return mq;
            }
            //若是第一步没能选中一个Broker,就选择一个延迟较低的Broker
            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            if (writeQueueNums > 0) {
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                    mq.setBrokerName(notBestBroker);
                    mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
                }
                return mq;
            } else {
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }
        //若是前边都没选中一个Broker,就随机选一个Broker
        return tpInfo.selectOneMessageQueue();
    }
    return tpInfo.selectOneMessageQueue(lastBrokerName);
}

但是当延迟容错开关关闭状态的时候,执行的代码如下:

为了均匀分散Broker的压力,会选择与之前不同的Broker

public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
    //若是没有上次的Brokername做参考,就随机选一个
    if (lastBrokerName == null) {
        return selectOneMessageQueue();
    } else {
        //如果有,那么就选一个其他的Broker
        for (int i = 0; i < this.messageQueueList.size(); i++) {
            int index = this.sendWhichQueue.incrementAndGet();
            int pos = Math.abs(index) % this.messageQueueList.size();
            if (pos < 0)
                pos = 0;
            MessageQueue mq = this.messageQueueList.get(pos);
            //这里判断遇上一个使用的Broker不是同一个
            if (!mq.getBrokerName().equals(lastBrokerName)) {
                return mq;
            }
        }
        //若是上边的都没选中,那么就随机选一个
        return selectOneMessageQueue();
    }
}

2.2 Broker端保证生产者高可用

Broker候船大厅为了能确切的接收到消息孩子,至少会有两个厅,一个主厅一个副厅,一般来说孩子都会进入到主厅,然后一顿操作,卡该忙信那机资(影分身之术),然后让分身进入到副厅,这样当主厅停电了,不工作了,副厅的分身只要去完成了任务就ok的。一般来说都是主厅的消息孩子去坐船完成任务。

之后我们会聊到Broker的主从复制,分为同步复制和异步复制,同步复制时指当master 收到消息之后,同步到slaver才算消息发送成功。异步复制是只要master收到消息就算成功。生产中建议至少部署两台master和两台slaver。

下一篇,我们聊聊,消息的发送流程,就是说,一个消息孩子,从进码头的门到坐到message queue座位上,都经历了啥。

以上就是java开发RocketMQ生产者高可用示例详解的详细内容,更多关于java RocketMQ生产者高可用的资料请关注我们其它相关文章!

(0)

相关推荐

  • Springboot详解RocketMQ实现消息发送与接收流程

    springboot+rockermq 实现简单的消息发送与接收 普通消息的发送方式有3种:单向发送.同步发送和异步发送. 下面来介绍下 springboot+rockermq 整合实现 普通消息的发送与接收 创建Springboot项目,添加rockermq 依赖 <!--rocketMq依赖--> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-

  • RocketMQ消息发送流程源码剖析

    目录 正文 读源码 1 调用defaultMQProducerImpl.send() 2 设置过期时间 3 执行defaultMQProducerImpl.sendDefaultImpl()方法 sendDefaultImpl是发送消息的核心方法. 1 两个校验 2 获取topic路由信息 3 计算重试次数 4 执行队列选择方法 5 发送消息 正文 就是说,我们打了个比方,把RocketMQ比作码头上的一个小房子,来送孩子登船的家长比作生产者,拉走孩子们的船夫比作消费者,所以,RocketMQ的

  • java开发线上事故理解RocketMQ异步精髓

    目录 引言 1 业务场景 2 线程池模式 3 本地内存 + 定时任务 4 MQ 模式 5 Agent 服务 + MQ 模式 6 总结 第一层:什么场景下需要异步 第二层:异步的外功心法 第三层:异步的本质 引言 在高并发的场景下,异步是一个极其重要的优化方向. 前段时间,生产环境发生一次事故,笔者认为事故的场景非常具备典型性 . 写这篇文章,笔者想和大家深入探讨该场景的架构优化方案.希望大家读完之后,可以对异步有更深刻的理解. 1 业务场景 老师登录教研平台,会看到课程列表,点击课程后,课程会以

  • Springboot详细讲解RocketMQ实现顺序消息的发送与消费流程

    目录 一.创建Springboot项目添加rockermq依赖 二.配置rocketmq 三.新建一个controller来做消息发送 四.创建消费端监听消息消费消息 五.启动服务测试顺序消息发送与消费 如何实现顺序消息? 需要程序保证发送和消费的是同一个 Queue rocketmq默认发送的消息是进入多个消息队列,然后消费端多线程并发消费,所以默认情况,不是順序消费消息的:有時候,我们需要顺序消费一批消息,比如电商系统 订单创建.支付.完成操作,需要順序执行: RocketMQTemplat

  • Springboot详解RocketMQ实现广播消息流程

    RocketMQ消息模式主要有两种:广播模式.集群模式(负载均衡模式) 广播模式是每个消费者,都会消费消息: 负载均衡模式是每一个消费只会被某一个消费者消费一次: 我们业务上一般用的是负载均衡模式,当然一些特殊场景需要用到广播模式,比如发送一个信息到邮箱,手机,站内提示: 我们可以通过@RocketMQMessageListener的messageModel属性值来设置,MessageModel.BROADCASTING是广播模式,MessageModel.CLUSTERING是默认集群负载均衡

  • SpringBoot集成RocketMQ发送事务消息的原理解析

    目录 简介 原理 具体实现 消费者 消费者 生产者消息监听器 消息事务测试 正常测试 异常测试 代码调整 执行结果 总结 简介 RocketMQ 事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败.RocketMQ 的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致. 原理 RocketMQ事务消息通过异步确保方式,保证事务的最终一致性.设计的思想可以借鉴两个阶段提交事

  • java开发RocketMQ生产者高可用示例详解

    目录 引言 1 消息 1.1 topic 1.2 Body 1.3 tag 1.4 key 1.5 延迟级别 2 生产者高可用 2.1 客户端保证生产者高可用 2.1.1 重试机制 2.1.2 客户端容错 2.2 Broker端保证生产者高可用 引言 前边两章说了点基础的,从这章开始,我们挖挖源码.看看RocketMQ是怎么工作的. 首先呢,这个生产者就是送孩子去码头的家长,孩子们呢,就是消息了. 我们看看消息孩子们都长啥样. 1 消息 public class Message implemen

  • RocketMQ Push 消费模型示例详解

    目录 使用 DefaultMQPushConsumer 消费消息 基于长轮询机制的伪 push 实现 客户端侧发起的长轮询请求 服务端阻塞请求 客户端回调处理 客户端发起请求的底层逻辑 PullCallback 回调 总结 Push 模式是指由 Server 端来控制消息的推送,即当有消息到 Server 之后,会将消息主动投递给 client(Consumer 端). 使用 DefaultMQPushConsumer 消费消息 下面是使用 DefaultMQPushConsumer 消费消息的

  • Java使用FileInputStream流读取文件示例详解

    一.File流概念 JAVA中针对文件的读写操作设置了一系列的流,其中主要有FileInputStream,FileOutputStream,FileReader,FileWriter四种最为常用的流 二.FileInputStream 1)FileInputStream概念  FileInputStream流被称为文件字节输入流,意思指对文件数据以字节的形式进行读取操作如读取图片视频等 2)构造方法 2.1)通过打开与File类对象代表的实际文件的链接来创建FileInputStream流对象

  • Java开发中为什么要使用单例模式详解

    一.什么是单例模式? 单例设计模式(Singleton Design Pattern)理解起来非常简单.一个类只允许创建一个对象(或者实例),那这个类就是一个单例类,这种设计模式就叫作单例设计模式,简称单例模式. 二.实战案例一:处理资源访问冲突 我们先来看第一个例子.在这个例子中,我们自定义实现了一个往文件中打印日志的 Logger 类.具体的代码实现如下所示: public class Logger { private FileWriter writer; public Logger() {

  • Java设计模式之建造者模式的示例详解

    目录 定义 案例 需求 方案一 方案二 对比分析 总结 建造者模式的优势: 注意点 定义 建造者模式(Builder Pattern),又叫生成器模式,是一种对象构建模式 它可以将复杂对象的建造过程抽象出来,使这个抽象过程的不同实现方法可以构造出不同表现的对象.建造者模式是一步一步创建一个复杂的对象,它允许用户只通过指定复杂对象的类型和内容就可 以构建它们,用户不需要知道内部的具体构建细节. 案例 需求 女生每天化妆,假如只需要做发型,香水,衣服,并要求按照发型——>香水——>衣服的顺序进行,

  • Java设计模式之原型模式的示例详解

    目录 定义 案例 需求 方案一 方案二 对比分析 总结 定义 用原型实例指定创建对象的种类,并且通过拷贝这些原型创建新的对象 即实现了一个原型接口,该接口用于创建当前对象的克隆,当直接创建对象的代价比较大时,则采用这种模式 案例 需求 张三要打印100000份照片 方案一 定义照片类 /** * 照片类 * @author:liyajie * @createTime:2022/2/15 11:47 * @version:1.0 */ @Data @AllArgsConstructor publi

  • Java设计模式之桥接模式的示例详解

    目录 定义 案例 需求 方案一 方案二 对比分析 总结 定义 桥梁模式是对象的结构模式.又称为柄体(Handle and Body)模式或接口(Interface)模式.桥梁模式的用意是“将抽象化(Abstraction)与实现化(Implementation)脱耦,使得二者可以独立地变化”. 案例 需求 通过企业微信和qq的方式给员工发送消息 方案一 定义发送消息的接口 /** * 发送消息的接口 * @author:liyajie * @createTime:2022/2/21 10:33

  • Java设计模式之享元模式示例详解

    目录 定义 原理类图 案例 需求 方案:享元模式 分析 总结 定义 享元模式(FlyWeight Pattern),也叫蝇量模式,运用共享技术,有效的支持大量细粒度的对象,享元模式就是池技术的重要实现方式. 原理类图 Flyweight :抽象的享元角色,他是抽象的产品类,同时他会定义出对象的内部状态和外部状态 ConcreteFlyweight :是具体的享元角色,具体的产品类,实现抽象角色,实现具体的业务逻辑 UnsharedConcreteFlyweight :不可共享的角色,这个角色也可

  • Android开发Kotlin实现圆弧计步器示例详解

    目录 效果图 定义控件的样式 自定义StepView 绘制文本坐标 Android获取中线到基线距离 效果图 定义控件的样式 看完效果后,我们先定义控件的样式 <!-- 自定义View的名字 StepView --> <!-- name 属性名称 format 格式 string 文字 color 颜色 dimension 字体大小 integer 数字 reference 资源或者颜色 --> <declare-styleable name="StepView&q

  • Java中枚举类的用法示例详解

    目录 1.引入枚举类 2.实现枚举类 3.枚举类的使用注意事项 4.枚举的常用方法 5.enum细节 1.引入枚举类 Java 枚举是一个特殊的类,一般表示一组常量,比如一年的 4 个季节,一个年的 12 个月份,一个星期的 7 天,方向有东南西北等. Java 枚举类使用 enum 关键字来定义,各个常量使用逗号 , 来分割. 示例: enum Color { RED, GREEN, BLUE; } 2.实现枚举类 接下来我们来看一个一个简单的DEMO示例: /** * java枚举 */ p

随机推荐