RocketMQ生产者如何规避故障Broker方式详解

目录
  • 前言
  • 收集故障Broker
  • 选择Broker
  • 小结

前言

在消息发送过程中,生产者从NameServer中获取到了指定Topic对应的Broker信息,在同步发送消息的代码中,如果消息发送失败,生产者默认是会重试两次的。那么Broker有问题的情况下,无论重试多少次都是没有意义的,消息生产者是如何规避这些故障Broker的呢?

收集故障Broker

我们在所有的发送消息源码中都可以找到这样一段代码,可在DefaultMQProducerImpl类中查找:

this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);

无论是发送成功还是失败,RocketMQ生产者客户端都会做这一步操作:

// 发送成功的话,isolation传false,失败isolation传true
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
    if (this.sendLatencyFaultEnable) {
        long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
        this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
    }
}
private long computeNotAvailableDuration(final long currentLatency) {
    for (int i = latencyMax.length - 1; i >= 0; i--) {
        if (currentLatency >= latencyMax[i])
            return this.notAvailableDuration[i];
    }
    return 0;
}
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

如果Broker产生故障,那么会创建一个FaultItem对象记录故障的Broker,并把结果放进故障规避表faultItemTable中,数据格式如下:

"broker-a": {
  // broker名称
  "name": "broker-a",
  "currentLatency": 发送消息消耗的时间,毫秒值之差,
  // 解除规避的时间,绝对时间
  "startTimestamp": 时间戳毫秒值
},
"broker-b": {
  // broker名称
  "name": "broker-b",
  "currentLatency": 发送消息消耗的时间,毫秒值之差,
  // 解除规避的时间,绝对时间
  "startTimestamp": 时间戳毫秒值
}

发送成功的Broker设置的故障规避时间为0,发送失败的Broker将被设置为规避30秒;

选择Broker

MQFaultStrategy.selectOneMessageQueue()方法中,我们分三部分来分析如何选择Broker。

  • 轮询选择一个可用的Broker
// 轮询的基本套路,一个自增变量
int index = tpInfo.getSendWhichQueue().incrementAndGet();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
     // 通过对队列数量取模,获取选定的Broker所在的位置
     int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
     if (pos < 0)
         pos = 0;
     MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
     // 判断Broker是否在规避时间内,如果不在规避时间内,就选择这个Broker,否则继续循环直至所有Broker都在规避时间内
     if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
         return mq;
}

1.轮询的基本套路都是通过一个自增变量来对所有的Broker数量取模,这样就可以命中一个Broker;

2.针对命中的Broker判断是否在规避时间范围内,不在规避时间内就可以返回;否则只能进入第二个方案;

  • 选择一个相对延迟低的Broker
// 把所有规避列表中的Broker按延迟高低排序,并从延迟低的Broker中选择一个
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
// 判断该Broker是否允许写消息
if (writeQueueNums > 0) {
    final MessageQueue mq = tpInfo.selectOneMessageQueue();
    if (notBestBroker != null) {
        mq.setBrokerName(notBestBroker);
        mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
    }
    // 返回选中的Broker
    return mq;
}

1.从规避列表中找到延时比较低的Broker;

2.判断该Broker是否允许写消息,允许写消息的话就直接返回,否则再进入下一个方案;

  • 默认的选择
return tpInfo.selectOneMessageQueue();

最后直接轮询一个Broker直接返回:

    public MessageQueue selectOneMessageQueue() {
        int index = this.sendWhichQueue.incrementAndGet();
        int pos = Math.abs(index) % this.messageQueueList.size();
        if (pos < 0)
            pos = 0;
        return this.messageQueueList.get(pos);
    }

该方案是默认方案,没有开启故障规避配置的话,所有Broker的选择都是使用的该方案;

小结

RocketMQ通过设置故障规避表的方式,把所有的Broker的延迟数据都保留在故障规避表中,根据该列表制定了以下几种策略:

1.优先选择不在规避时间范围内的Broker

2.如果所有Broker都在规避时间内,优先选择延迟低的Broker

3.如果依然没有选中合适的Broker,那么就直接挑一个Broker来用;

以上就是RocketMQ生产者如何规避故障Broker方式详解的详细内容,更多关于RocketMQ生产者规避故障Broker的资料请关注我们其它相关文章!

(0)

相关推荐

  • RocketMQ broker文件清理源码解析

    目录 1. broker 清理文件介绍 1.1 哪些文件需要清理 1.2 RocketMQ文件清理的机制 2. 源码解析 2.1 清理commitlog 2.2 ConsumeQueue 清理 2.3 indexFile 清理 3. 总结 1. broker 清理文件介绍 本系列RocketMQ4.8注释github地址,希望对大家有所帮助,要是觉得可以的话麻烦给点一下Star哈 1.1 哪些文件需要清理 首先我们需要介绍下在RocketMQ中哪些文件需要清理,其实可以想一想,在RocketMQ

  • RocketMQ 源码分析Broker消息刷盘服务

    目录 前言 刷盘服务源码分析 CommitRealTimeService刷盘源码分析 FlushRealTimeService刷盘源码分析 GroupCommitService刷盘源码分析 总结 前言 上篇文章我们介绍了消息刷盘的四种方式,本篇文章我们来介绍Broker是如何实现这四种刷盘方式. 刷盘服务源码分析 Broker中的四种刷盘分别是由CommitRealTimeService,FlushRealTimeService,GroupCommitService将消息从内存中刷到磁盘上的.在

  • RocketMQ消息生产者是如何选择Broker示例详解

    目录 前言 从NameServer查询Topic信息 如何选择Broker 小结 前言 在RocketMQ中为,我们创建消息生产者时,只需要设置NameServer地址,消息就能正确地发送到对应的Broker中,那么RocketMQ消息生产者是如何找到Broker的呢?如果有多个Broker实例,那么消息发送是如何选择发送到哪个Broker的呢? 从NameServer查询Topic信息 通过Debug消息发送send()方法,我们最终可以定位到DefaultMQProducerImpl.sen

  • RocketMQ Broker如何保存消息源码解析

    目录 前言 消息存储格式总览 CommitLog介绍 MappedFile详解 消息存储格式介绍 DefaultMessageStore介绍 消息存储源码分析 发送消息存储流程 消息预处理阶段 消息保存阶段 消息保存结果处理阶段 总结 前言 前面我们介绍了RocketMQ是如何接收消息的,下面我们来介绍Broker是如何保存消息的. 消息存储格式总览 Broker消息存储主要包括CommitLog,ConsumerQueue和Index三个部分. CommitLog CommitLog主要用于消

  • RocketMQ broker 消息投递流程处理PULL_MESSAGE请求解析

    目录 RocketMq消息处理 1. 处理PULL_MESSAGE请求 2. 获取消息 3. 挂起请求:PullRequestHoldService#suspendPullRequest 3.1 处理挂起请求的线程:PullRequestHoldService 3.2 唤醒请求:PullMessageProcessor#executeRequestWhenWakeup 3.3 消息分发中唤醒consumer请求 总结 RocketMq消息处理 RocketMq消息处理整个流程如下: 本系列Roc

  • RocketMq 消息重试机制及死信队列详解

    目录 生产者消息重试 消费者消息重试 并发消费 顺序消费 并发消费和顺序消费区别 死信队列 实践出真知 公共部分创建 测试并发消费 并发消费状态 测试顺序消费 顺序消费状态 测试死信队列 死信队列特性 生产者消息重试 消息队列中的消息消费时并不能保证总是成功的,那失败的消息该怎么进行消息补偿呢?这就用到今天的主角消息重试和死信队列了. 有时因为网路等原因生产者也可能发送消息失败,也会进行消息重试,生产者消息重试比较简单,在springboot中只要在配置文件中配置一下就可以了. # 异步消息发送

  • springboot 注册服务注册中心(zk)的两种方式详解

    在使用springboot进行开发的过程中,我们经常需要处理这样的场景:在服务启动的时候,需要向服务注册中心(例如zk)注册服务状态,以便当服务状态改变的时候,可以故障摘除和负载均衡. 我遇到过两种注册的途径: 1.在Spring的webapplication启动完成后,直接进行注册: 2.在servlet容器启动完成后,通过listener进行注册. 本文通过一个demo讲述一下这两种注册方式,使用的是传统的向zk注册的方案. 1.Spring webapplication启动完成后注册 先上

  • Java多线程通信实现方式详解

    这篇文章主要介绍了Java多线程通信实现方式详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 线程通信的方式: 1.共享变量 线程间通信可以通过发送信号,发送信号的一个简单方式是在共享对象的变量里设置信号值.线程A在一个同步块里设置boolean型成员变量hasDataToProcess为true,线程B也在同步代码块里读取hasDataToProcess这个成员变量.这个简单的例子使用了一个持有信号的对象,并提供了set和get方法. pu

  • RocketMQ源码解析topic创建机制详解

    目录 1. RocketMQ Topic创建机制 2. 自动Topic 3. 手动创建--预先创建 通过界面控制台创建 1. RocketMQ Topic创建机制 以下源码基于Rocket MQ 4.7.0 RocketMQ Topic创建机制分为两种:一种自动创建,一种手动创建.可以通过设置broker的配置文件来禁用或者允许自动创建.默认是开启的允许自动创建 autoCreateTopicEnable=true/false 下面会结合源码来深度分析一下自动创建和手动创建的过程. 2. 自动T

  • Spark三种属性配置方式详解

    随着Spark项目的逐渐成熟, 越来越多的可配置参数被添加到Spark中来.在Spark中提供了三个地方用于配置: 1.Spark properties:这个可以控制应用程序的绝大部分属性.并且可以通过 SparkConf对象或者Java 系统属性进行设置: 2.环境变量(Environment variables):这个可以分别对每台机器进行相应的设置,比如IP.这个可以在每台机器的$SPARK_HOME/ conf/spark-env.sh脚本中进行设置: 3.日志:所有的日志相关的属性可以

  • JS定义类的六种方式详解

    在前端开发中,经常需要定义JS类.那么在JavaScript中,定义类的方式有几种,分别是什么呢?本文就JS定义类的六中方式说明如下(案例说明): 1.工厂方式 function Car(){ var ocar = new Object; ocar.color = "blue"; ocar.doors = 4; ocar.showColor = function(){ document.write(this.color) }; return ocar; } var car1 = Car

  • 关于react-router的几种配置方式详解

    本文介绍关于react-router的几种配置方式详解,分享给大家,具体如下: 路由的概念 路由的作用就是将url和函数进行映射,在单页面应用中路由是必不可少的部分,路由配置就是一组指令,用来告诉router如何匹配url,以及对应的函数映射,即执行对应的代码. react-router 每一门JS框架都会有自己定制的router框架,react-router就是react开发应用御用的路由框架,目前它的最新的官方版本为4.1.2.本文给大家介绍的是react-router相比于其他router

  • classloader类加载器_基于java类的加载方式详解

    基础概念 Classloader 类加载器,用来加载 Java 类到 Java 虚拟机中.与普通程序不同的是.Java程序(class文件)并不是本地的可执行程序.当运行Java程序时,首先运行JVM(Java虚拟机),然后再把Java class加载到JVM里头运行,负责加载Java class的这部分就叫做Class Loader. JVM本身包含了一个ClassLoader称为Bootstrap ClassLoader,和JVM一样,BootstrapClassLoader是用本地代码实现

  • hibernate4基本配置方式详解

    可编程的配置方式-1 如果在配置cfg.xml的时候,不想在里面配置hbm.xml怎么办呢?可在程序里使用可编程的配置方式,也就是使用程序来指定在cfg.xml里面的配置信息,不推荐这种方式.如下: Configuration cfg= new Configuration() .addResource("Item.hbm.xml") .addResource("Bid.hbm.xml"); 一个替代方法(有时是更好选择)是,指定被映射的类,让Hibernate帮你寻

随机推荐