RocketMQ producer容错机制源码解析

目录
  • 1. 前言
  • 2. 失败重试
  • 3. 延迟故障
    • 3.1 最普通的选择策略
    • 3.2 延迟故障的实现

1. 前言

本文主要是介绍一下RocketMQ消息生产者在发送消息的时候发送失败的问题处理?这里有两个点,一个是关于消息的处理,一个是关于broker的处理,比如说发送消息到broker-a的broker失败了,我们可能下次就不想发送到这个broker-a,这就涉及到一个选择broker的问题,也就是选择MessageQueue的问题。

2. 失败重试

其实失败重试我们在介绍RocketMQ消息生产者发送消息的时候介绍过了,其实同步发送与异步发送都会失败重试的,比如说我发送一个消息,然后超时了,这时候在MQProducer层就会进行控制重试,默认是重试2次的,加上你发送那次,一共是发送3次,如果重试完还是有问题的话,这个时候就会抛出异常了。

我们来看下这一块的代码实现( DefaultMQProducerImpl 类sendDefaultImpl方法):

这块其实就是用for循环实现的,其实不光RocketMQ,分布式远程调用框架Dubbo的失败重试也是用for循环实现的。

3. 延迟故障

我们都知道,在RocketMQ中一个topic其实是有多个MessageQueue这么一个概念的,然后这些MessageQueue可能对应着不同的broker name,比如说id是0和1的MessageQueue 对应的broker name是 broker-a ,然后id是2和3的MessageQueue对应的broker name 是broker-b

我们发送消息的时候,其实涉及到发送给哪个MessageQueue这么一个问题,当然我们可以在发送消息的时候指定这个MessageQueue,如果你不指定的话,RocketMQ就会根据MQFaultStrategy 这么一个策略类给选择出来一个MessageQueue。

我们先来看下是在哪里选择的,其实就是在我们重试的循环中: org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendDefaultImpl

...
// 重试发送
for (; times < timesTotal; times++) {
    String lastBrokerName = null == mq ? null : mq.getBrokerName();
    // todo 选择message queue
    MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
    ...

我们可以看到,它会把topicPublishInfo 与 lastBrokerName 作为参数传进去,topicPublishInfo 里面其实就是那一堆MessageQueue, 然后这个lastBrokerName 是上次我们选择的那个broker name , 这个接着我们来看下这个selectOneMessageQueue实现:

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    // todo
    return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
}

可以看到它调用了MQFaultStrategy 这个类的selectOneMessageQueue 方法,我们接着进去:

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    // 发送延迟故障启用,默认为false
    if (this.sendLatencyFaultEnable) {
        try {
            // 获取一个index
            int index = tpInfo.getSendWhichQueue().getAndIncrement();
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                // 选取的这个broker是可用的 直接返回
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
                    return mq;
            }
            // 到这里 找了一圈 还是没有找到可用的broker
            // todo 选择 距离可用时间最近的
            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            if (writeQueueNums > 0) {
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                    mq.setBrokerName(notBestBroker);
                    mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                }
                return mq;
            } else {
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }
        return tpInfo.selectOneMessageQueue();
    }
    // todo
    return tpInfo.selectOneMessageQueue(lastBrokerName);
}

这种延迟故障策略其实是由sendLatencyFaultEnable来控制的,它默认是关闭的。

3.1 最普通的选择策略

我们先来看下最普通的选择策略,可以看到调用了TopicPublishInfo 的selectOneMessageQueue方法:

public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
    // 消息第一个发送的时候 还没有重试 也没有上一个brokerName
    if (lastBrokerName == null) {
        return selectOneMessageQueue();
    } else {
        // 这个 出现在重试的时候
        for (int i = 0; i < this.messageQueueList.size(); i++) {
            int index = this.sendWhichQueue.getAndIncrement();
            int pos = Math.abs(index) % this.messageQueueList.size();
            if (pos < 0)
                pos = 0;
            MessageQueue mq = this.messageQueueList.get(pos);
            // 避开 上次发送的brokerName
            if (!mq.getBrokerName().equals(lastBrokerName)) {
                return mq;
            }
        }
        // todo 到最后 没有避开  只能随机选一个
        return selectOneMessageQueue();
    }
}

它这里里面分成了2部分,一个是没有 这个lastBroker的,也就是这个这个消息还没有被重试过,这是第一次发送这个消息,这个时候它的lastBrokerName就是null,然后他就会直接走selectOneMessageQueue 这个无参方法。

public MessageQueue selectOneMessageQueue() {
    // 相当于 某个线程轮询
    int index = this.sendWhichQueue.getAndIncrement();
    int pos = Math.abs(index) % this.messageQueueList.size();
    if (pos < 0)
        pos = 0;
    return this.messageQueueList.get(pos);
}

先是获取这个index ,然后使用index % MessageQueue集合的大小获得一个MessageQueue集合值的一个下标(索引),这个index 其实某个线程内自增1的,这样就形成了某个线程内轮询的效果。这个样子的话,同步发送其实就是单线程的轮询,异步发送就是多个线程并发发送,然后某个线程内轮询,我们看下他这个单个线程自增1效果是怎样实现的。

public class ThreadLocalIndex {
    private final ThreadLocal<Integer> threadLocalIndex = new ThreadLocal<Integer>();
    private final Random random = new Random();
    public int getAndIncrement() {
        Integer index = this.threadLocalIndex.get();
        // 如果不存在就创建  然后设置到threadLocalIndex中
        if (null == index) {
            index = Math.abs(random.nextInt());
            this.threadLocalIndex.set(index);
        }
        index = Math.abs(index + 1);
        this.threadLocalIndex.set(index);
        return index;
    }
}

可以看到这个sendWhichQueue 是用ThreadLocal实现的,然后这个样子就可以一个线程一个index,而且不会出现线程安全问题。

好了这里我们就把这个消息第一次发送时候MessageQueue看完了,然后我们再来看下它其他重试的时候是怎样选择的,也就是lastBrokerName不是null的时候:

public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
    // 消息第一个发送的时候 还没有重试 也没有上一个brokerName
    if (lastBrokerName == null) {
        return selectOneMessageQueue();
    } else {
        // 这个 出现在重试的时候
        for (int i = 0; i < this.messageQueueList.size(); i++) {
            int index = this.sendWhichQueue.getAndIncrement();
            int pos = Math.abs(index) % this.messageQueueList.size();
            if (pos < 0)
                pos = 0;
            MessageQueue mq = this.messageQueueList.get(pos);
            // 避开 上次发送的brokerName
            if (!mq.getBrokerName().equals(lastBrokerName)) {
                return mq;
            }
        }
        // todo 到最后 没有避开  只能随机选一个
        return selectOneMessageQueue();
    }
}

这里其实就是选择一个不是lastBrokerName 的MessageQueue,可以看到它是循环 MessageQueue 集合大小数个,这样可能把所有的MessageQueue都看一遍,注意 这个循环只是起到选多少次的作用,具体的选择还是要走某线程轮询的那一套,到最后是在是选不出来了,也就是没有这一堆MessageQueue都是在lastBrokerName上的,只能调用selectOneMessageQueue轮询选一个了。

到这我们就把最普通的选择一个MessageQueue介绍完了。

3.2 延迟故障的实现

下面我们再来介绍下那个延迟故障的实现,这个其实就是根据你这个broker 的响应延迟时间的大小,来影响下次选择这个broker的权重,他不是绝对的,因为根据它这个规则是在找不出来的话,他就会使用那套普通选择算法来找个MessageQueue。

它是这样一个原理:

  • 在每次发送之后都收集一下它这次的一个响应延迟,比如我10点1分1秒200毫秒给broker-a了一个消息,然后到了10点1分1秒900毫秒的时候才收到broker-a 的一个sendResult也就是响应,这个时候他就是700ms的延迟,它会跟你就这个300ms的延迟找到一个时间范围,他就认为你这个broker-a 这个broker 在某个时间段内,比如说30s内是不可用的。然后下次选择的时候,他在第一轮会找那些可用的broker,找不到的话,就找那些上次不是这个broker的,还是找不到的话,他就绝望了,用最普通的方式,也就是上面说的那种轮询算法找一个MessageQueue出来。

接下来我们先来看下它的收集延迟的部分,是这个样子的,还是在这个失败重试里面,然后它会在响应后或者异常后面都加一行代码来收集这些延迟:

...
// todo 进行发送
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
endTimestamp = System.currentTimeMillis();
// todo isolation 参数为false(看一下异常情况)
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
...

这是正常响应后的,注意它的isolation 参数,也就是隔离 是false,在看下异常的

...
catch (RemotingException e) {
    endTimestamp = System.currentTimeMillis();
    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
    log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
    log.warn(msg.toString());
    exception = e;
    continue;
}
...

他这个isolation 参数就是true ,也就是需要隔离的意思。

public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
    // todo
    this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation);
}

可以看到是调用了mqFaultStrategy 的updateFaultItem 方法:

public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
    // 是否开启延迟故障容错
    if (this.sendLatencyFaultEnable) {
        // todo 计算不可用持续时间
        long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
        // todo 存储
        this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
    }
}

先是判断是否开启了这个延迟故障的这么一个配置,默认是不启动的,但是你可以自己启动set下就可以了setSendLatencyFaultEnable(true)

DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.setSendLatencyFaultEnable(true);

首先是计算这个它认为broker不可用的这么一个时间,参数就是你那个响应延迟,熔断的话就配置30000毫秒, 否则的话就是正常的那个响应时间

/**
 * 计算不可用持续时间
 * @param currentLatency 当前延迟
 */
private long computeNotAvailableDuration(final long currentLatency) {
    // latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    // notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
    // 倒着遍历
    for (int i = latencyMax.length - 1; i >= 0; i--) {
        // 如果延迟大于某个时间,就返回对应服务不可用时间,可以看出来,响应延迟100ms以下是没有问题的
        if (currentLatency >= latencyMax[i])
            return this.notAvailableDuration[i];
    }
    return 0;
}

他这个计算规则是这个样子的,他有两个数组,一个是响应延迟的,一个是不可使用的时间,两个排列都是从小到大的顺序,倒着先找响应延迟,如果你这个延迟大于某个时间,就找对应下标的不可使用的时间,比如说响应延迟700ms,这时候他就会找到30000ms不可使用时间。

计算完这个不可使用时间后接着调用了latencyFaultTolerance的updateFaultItem方法,这个方法其实就是用来存储的:

public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
    // 从缓存中获取
    FaultItem old = this.faultItemTable.get(name);
    // 缓存没有的情况
    if (null == old) {
        final FaultItem faultItem = new FaultItem(name);
        // 设置延迟
        faultItem.setCurrentLatency(currentLatency);
        // 设置启用时间
        faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        // 设置faultItemTable 中
        old = this.faultItemTable.putIfAbsent(name, faultItem);
        // 如果已经有了,拿到 老的进行更新
        if (old != null) {
            old.setCurrentLatency(currentLatency);
            old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        }
    } else {
        // 缓存中已经有了,直接拿老的进行更新
        old.setCurrentLatency(currentLatency);
        old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
    }
}

他有个faultItemTable 这个缓存,记录着 每个broker的FaultItem的项,这个FaultItem就是保存它能够使用的一个时间(当前时间戳+不可使用时间),其实这个方法就是做更新或者插入操作。

好了到这我们就把它这个收集响应延迟指标与计算可用时间这快就解析完了,再回头看下那个选择MessageQueue的方法:

可以看到它先是找那种可用的,然后不是上一个broker的那个,如果好几轮下来没有找到的话就选择一个

public String pickOneAtLeast() {
    // 将map中里面的放到tmpList 中
    final Enumeration<FaultItem> elements = this.faultItemTable.elements();
    List<FaultItem> tmpList = new LinkedList<FaultItem>();
    while (elements.hasMoreElements()) {
        final FaultItem faultItem = elements.nextElement();
        tmpList.add(faultItem);
    }
    // 如果不是null
    if (!tmpList.isEmpty()) {
        // 洗牌算法
        Collections.shuffle(tmpList);
        // 排序
        Collections.sort(tmpList);
        final int half = tmpList.size() / 2;
        // 没有 2台机器
        if (half <= 0) {
            // 选择第一个
            return tmpList.get(0).getName();
        } else {
            // 有2台机器及以上,某个线程内随机选排在前半段的broker
            final int i = this.whichItemWorst.getAndIncrement() % half;
            return tmpList.get(i).getName();
        }
    }
    return null;
}

先是排序,然后将所有的broker/2 ,如果是小于等于0的话,说明就2个broker以下,选第一个,如果是2台以上,就轮询选一个

先来看下排序规则:

/**
 * 失败条目(规避规则条目)
 */
class FaultItem implements Comparable<FaultItem> {
    // 条目唯一键,这里是brokerName
    private final String name;
    // todo currentLatency 和startTimestamp  被volatile修饰
    // 本次消息发送的延迟时间
    private volatile long currentLatency;
    // 故障规避的开始时间
    private volatile long startTimestamp;
    public FaultItem(final String name) {
        this.name = name;
    }
    @Override
    public int compareTo(final FaultItem other) {
        // 将能提供服务的放前面
        if (this.isAvailable() != other.isAvailable()) {
            if (this.isAvailable())
                return -1;
            if (other.isAvailable())
                return 1;
        }
        // 找延迟低的 放前面
        if (this.currentLatency < other.currentLatency)
            return -1;
        else if (this.currentLatency > other.currentLatency) {
            return 1;
        }
        // 找最近能提供服务的  放前面
        if (this.startTimestamp < other.startTimestamp)
            return -1;
        else if (this.startTimestamp > other.startTimestamp) {
            return 1;
        }
        return 0;
    }

它是把能提供服务的放前面,然后没有,就找那种延迟低的放前面,也没有的话就找最近能提供服务的放前头。 找到这个broker 之后然后根据这个broker name 获取写队列的个数,其实你这个写队列个数有几个,然后你这个broker对应的MessageQueue就有几个,如果write size >0的话,然后这个broker 不是null,就找一个mq,然后设置上它的broker name 与queue id

如果write<=0,直接移除这个broker对应FaultItem,最后实在是找不到就按照上面那种普通方法来找了。

好了,到这我们延迟故障也介绍完成了。

参考文章

RocketMQ4.8注释github地址

RocketMQ源码分析专栏

以上就是RocketMQ producer容错机制源码解析的详细内容,更多关于RocketMQ producer容错机制的资料请关注我们其它相关文章!

(0)

相关推荐

  • RocketMQ broker 消息投递流程处理PULL_MESSAGE请求解析

    目录 RocketMq消息处理 1. 处理PULL_MESSAGE请求 2. 获取消息 3. 挂起请求:PullRequestHoldService#suspendPullRequest 3.1 处理挂起请求的线程:PullRequestHoldService 3.2 唤醒请求:PullMessageProcessor#executeRequestWhenWakeup 3.3 消息分发中唤醒consumer请求 总结 RocketMq消息处理 RocketMq消息处理整个流程如下: 本系列Roc

  • RocketMQ broker文件清理源码解析

    目录 1. broker 清理文件介绍 1.1 哪些文件需要清理 1.2 RocketMQ文件清理的机制 2. 源码解析 2.1 清理commitlog 2.2 ConsumeQueue 清理 2.3 indexFile 清理 3. 总结 1. broker 清理文件介绍 本系列RocketMQ4.8注释github地址,希望对大家有所帮助,要是觉得可以的话麻烦给点一下Star哈 1.1 哪些文件需要清理 首先我们需要介绍下在RocketMQ中哪些文件需要清理,其实可以想一想,在RocketMQ

  • RocketMQ ConsumeQueue与IndexFile实时更新机制源码解析

    目录 前言 ConsumeQueue详解 IndexFile详解 IndexHeader slots槽位 indexes索引数据 实时更新ConsumeQueue与IndexFile源码分析 CommitLogDispatcherBuildConsumeQueue源码分析 CommitLogDispatcherBuildIndex源码分析 IndexFile如何解决Hash冲突 总结 前言 前面我们介绍了消息是如何存储的,消息是如何刷盘的,讲的都是CommitLog是如何存储和刷盘的.虽然Com

  • RocketMQ producer同步发送单向发送源码解析

    目录 RocketMQ生产者发送消息分为三种模式 1. 同步发送 1.1 DefaultMQProducerImpl#sendDefaultImpl 1.2 DefaultMQProducerImpl#sendKernelImpl 1.3 MQClientAPIImpl#sendMessage 1.4 MQClientAPIImpl#sendMessageSync 1.5 NettyRemotingClient#invokeSync 2. 单向发送 2.1 DefaultMQProducerIm

  • 详解Redis 缓存删除机制(源码解析)

    删除的范围 过期的 key 在内存满了的情况下,如果继续执行 set 等命令,且所有 key 都没有过期,那么会按照缓存淘汰策略选中的 key 过期删除 redis 中设置了过期时间的 key 会单独存储一份 typedef struct redisDb { dict *dict; // 所有的键值对 dict *expires; //设置了过期时间的键值对 // ... } redisDb; 设置有效期 Redis 中有 4 个命令可以给 key 设置过期时间,分别是 expire pexpi

  • React事件机制源码解析

    React v17里事件机制有了比较大的改动,想来和v16差别还是比较大的. 本文浅析的React版本为17.0.1,使用ReactDOM.render创建应用,不含优先级相关. 原理简述 React中事件分为委托事件(DelegatedEvent)和不需要委托事件(NonDelegatedEvent),委托事件在fiberRoot创建的时候,就会在root节点的DOM元素上绑定几乎所有事件的处理函数,而不需要委托事件只会将处理函数绑定在DOM元素本身. 同时,React将事件分为3种类型--d

  • Kubernetes controller manager运行机制源码解析

    目录 Run StartControllers ReplicaSet ReplicaSetController syncReplicaSet Summary Run 确立目标 理解 kube-controller-manager 的运行机制 从主函数找到run函数,代码较长,这里精简了一下 func Run(c *config.CompletedConfig, stopCh <-chan struct{}) error { // configz 模块,在kube-scheduler分析中已经了解

  • Spring AOP实现声明式事务机制源码解析

    目录 一.声明式全局事务 二.源码 三.小结: 一.声明式全局事务 在Seata示例工程中,能看到@GlobalTransactional,如下方法示例: @GlobalTransactional public boolean purchase(long accountId, long stockId, long quantity) { String xid = RootContext.getXID(); LOGGER.info("New Transaction Begins: " +

  • jquery事件绑定解绑机制源码解析

    引子 为什么Jquery能实现不传回调函数也能解绑事件?如下: $("p").on("click",function(){ alert("The paragraph was clicked."); }); $("#box1").off("click"); 事件绑定解绑机制 调用on函数的时候,将生成一份事件数据,结构如下: { type: type, origType: origType, data: da

  • RocketMQ源码解析topic创建机制详解

    目录 1. RocketMQ Topic创建机制 2. 自动Topic 3. 手动创建--预先创建 通过界面控制台创建 1. RocketMQ Topic创建机制 以下源码基于Rocket MQ 4.7.0 RocketMQ Topic创建机制分为两种:一种自动创建,一种手动创建.可以通过设置broker的配置文件来禁用或者允许自动创建.默认是开启的允许自动创建 autoCreateTopicEnable=true/false 下面会结合源码来深度分析一下自动创建和手动创建的过程. 2. 自动T

  • Flink状态和容错源码解析

    目录 引言 概述 State Keyed State 状态实例管理及数据存储 HeapKeyedStateBackend RocksDBKeyedStateBackend OperatorState 上层封装 总结 引言 计算模型 DataStream基础框架 事件时间和窗口 状态和容错 部署&调度 存储体系 底层支撑 Flink中提供了State(状态)这个概念来保存中间计算结果和缓存数据,按照不同的场景,Flink提供了多种不同类型的State,同时为了实现Exactly once的语义,F

  • Tomcat的类加载机制流程及源码解析

    目录 前言 1.Tomcat 的类加载器结构图: 2.Tomcat 的类加载流程说明: 3.源码解析: 4.为什么tomcat要实现自己的类加载机制: 前言 在前面 Java虚拟机:对象创建过程与类加载机制.双亲委派模型 文章中,我们介绍了 JVM 的类加载机制以及双亲委派模型,双亲委派模型的类加载过程主要分为以下几个步骤: (1)初始化 ClassLoader 时需要指定自己的 parent 是谁 (2)先检查类是否已经被加载过,如果类已经被加载了,直接返回 (3)若没有加载则调用父加载器 p

  • Mysql锁内部实现机制之C源码解析

    目录 概述 行锁结构 表锁结构 事务中锁的描述 概述 虽然现在关系型数据库越来越相似,但其背后的实现机制可能大相径庭.实际使用方面,因为SQL语法规范的存在使得我们熟悉多种关系型数据库并非难事,但是有多少种数据库可能就有多少种锁的实现方法. Microsoft Sql Server2005之前只提供页锁,直到2005版本才开始支持乐观并发.悲观并发,乐观模式下允许实现行级别锁,在Sql Server的设计中锁是一种稀缺资源,锁的数量越多,开销就越大,为了避免因为锁的数量快速攀升导致性能断崖式下跌

  • Android消息循环机制源码深入理解

    Android消息循环机制源码 前言: 搞Android的不懂Handler消息循环机制,都不好意思说自己是Android工程师.面试的时候一般也都会问这个知识点,但是我相信大多数码农肯定是没有看过相关源码的,顶多也就是网上搜搜,看看别人的文章介绍.学姐不想把那个万能的关系图拿出来讨论. 近来找了一些关于android线程间通信的资料,整理学习了一下,并制作了一个简单的例子. andriod提供了 Handler 和 Looper 来满足线程间的通信.例如一个子线程从网络上下载了一副图片,当它下

随机推荐