RocketMQ设计之故障规避机制

NameServer为了简化和客户端通信,发现Broker故障时并不会立即通知客户端。故障规避机制就是用来解决当Broker出现故障,Producer不能及时感知而导致消息发送失败的问题。默认不开启,如果开启,消息发送失败的时候会将失败的Broker暂时排除在队列选择列表外

MQFaultStrategy类的:

public class MQFaultStrategy {
    private final static InternalLogger log = ClientLogger.getLog();
    private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();

    private boolean sendLatencyFaultEnable = false;

    private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    public long[] getNotAvailableDuration() {
        return notAvailableDuration;
    }

    public void setNotAvailableDuration(final long[] notAvailableDuration) {
        this.notAvailableDuration = notAvailableDuration;
    }

    public long[] getLatencyMax() {
        return latencyMax;
    }

    public void setLatencyMax(final long[] latencyMax) {
        this.latencyMax = latencyMax;
    }

    public boolean isSendLatencyFaultEnable() {
        return sendLatencyFaultEnable;
    }

    public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) {
        this.sendLatencyFaultEnable = sendLatencyFaultEnable;
    }

    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        //是否开启故障延迟机制
        if (this.sendLatencyFaultEnable) {
            try {
                int index = tpInfo.getSendWhichQueue().getAndIncrement();
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    //判断Queue是否可用
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                        if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                            return mq;
                    }
                }

                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                    }
                    return mq;
                } else {
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }

            return tpInfo.selectOneMessageQueue();
        }

        //默认轮询
        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }

    public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
        if (this.sendLatencyFaultEnable) {
            long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
            this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
        }
    }

    private long computeNotAvailableDuration(final long currentLatency) {
        for (int i = latencyMax.length - 1; i >= 0; i--) {
            if (currentLatency >= latencyMax[i])
                return this.notAvailableDuration[i];
        }

        return 0;
    }
}

在选择查找路由时,选择消息队列的关键步骤:

  • 先按轮询算法选择一个消息队列
  • 从故障列表判断该消息队列是否可用

LatencyFaultToleranceImpl中判断是否可用:

@Override
public boolean isAvailable(final String name) {
    final FaultItem faultItem = this.faultItemTable.get(name);
    if (faultItem != null) {
        return faultItem.isAvailable();
    }
    return true;
}

public boolean isAvailable() {
            return (System.currentTimeMillis() - startTimestamp) >= 0;
        }
  • 判断是否在故障列表中,不在故障列表中代表可用。
  • 在故障列表中判断当前时间是否大于等于故障规避的开始时间startTimestamp

在消息发送结束后和发送出现异常时调用updateFaultItem()方法来更新故障列表,computeNotAvailableDuration()根据响应时间来计算故障周期时长,响应时间越长故障周期越长。网络异常、Broker异常、客户端异常都是固定响应时长30s,它们故障周期时长为10分钟。消息发送成功或线程中断异常响应时间在100毫秒以内,故障周期时长为0。

LatencyFaultToleranceImpl类的updateFaultItem方法:

@Override
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
    FaultItem old = this.faultItemTable.get(name);
    if (null == old) {
        final FaultItem faultItem = new FaultItem(name);
        faultItem.setCurrentLatency(currentLatency);
        faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);

        //加入故障列表
        old = this.faultItemTable.putIfAbsent(name, faultItem);
        if (old != null) {
            old.setCurrentLatency(currentLatency);
            old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        }
    } else {
        old.setCurrentLatency(currentLatency);
        old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
    }
}

FaultItem存储Broker名称、响应时长、故障规避开始时间,最重要的是故障规避开始时间,用来判断Queue是否可用

到此这篇关于RocketMQ设计之故障规避机制的文章就介绍到这了,更多相关RocketMQ故障规避机制内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • RocketMQ重试机制及消息幂代码实例解析

    这篇文章主要介绍了RocketMQ重试机制及消息幂代码实例解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 一.重试机制 1.由于MQ经常处于复杂的分布式系统中,考虑网络波动,服务宕机,程序异常因素,很有可能出现消息发送或者消费失败的问题.因此,消息的重试就是所有MQ中间件必须考虑到的一个关键点.如果没有消息重试,就可能产生消息丢失的问题,可能对系统产生很大的影响.所以,秉承宁可多发消息,也不可丢失消息的原则,大部分MQ都对消息重试提供了很好

  • RocketMQ之NameServer架构设计及启动关闭流程源码分析

    目录 NameServer 1.架构设计 2.核心类与配置 NamesrvController NamesrvConfig NettyServerConfig RouteInfoManager 3.启动与关闭流程 3.1.步骤一 3.2.步骤二 3.3.步骤三 NameServer 1.架构设计 消息中间件的设计思路一般都是基于主题订阅与发布的机制,RocketMQ也不例外.RocketMQ中,消息生产者(Producer)发送某主题的消息到消息服务器,消息服务器对消息进行持久化存储,而消息消费

  • 微服务架构设计RocketMQ进阶事务消息原理详解

    目录 前言 RocketMQ事务流程概要 RocketMQ事务流程关键 实现 基础配置 引入组件 添加配置 发送半消息 执行本地事务与回查 消费消息 测试 总结 前言 分布式消息选型的时候是否支持事务消息是一个很重要的考量点,而目前只有RocketMQ对事务消息支持的最好.今天我们来唠唠如何实现RocketMQ的事务消息! Apache RocketMQ在4.3.0版中已经支持分布式事务消息,这里RocketMQ采用了2PC的思想来实现了提交事务消息,同时增加一个补偿逻辑来处理二阶段超时或者失败

  • 微服务架构设计RocketMQ基础及环境整合

    目录 概述&选型 单机安装配置 双机主从高可用搭建 启动多个NameServer 和 Broker 重要参数说明 可视化管理平台 SpringBoot整合RocketMQ 引入组件rocketmq-spring-boot-starter 依赖 修改application.yml,添加RocketMQ相关配置 编写消息生产者 MessageProduce 编写消息消费者 MessageConsumer 编写单元测试发送消息 测试 概述&选型 消息队列作为高并发系统的核心组件之一,能够帮助业务

  • RocketMQ设计之故障规避机制

    NameServer为了简化和客户端通信,发现Broker故障时并不会立即通知客户端.故障规避机制就是用来解决当Broker出现故障,Producer不能及时感知而导致消息发送失败的问题.默认不开启,如果开启,消息发送失败的时候会将失败的Broker暂时排除在队列选择列表外 MQFaultStrategy类的: public class MQFaultStrategy {     private final static InternalLogger log = ClientLogger.get

  • RocketMQ设计之主从复制和读写分离

    目录 一.主从复制 二.读写分离 一.主从复制 RocketMQ为了提高消费的高可用性,避免Broker发生单点故障引起Broker上的消息无法及时消费,同时避免单个机器上硬盘坏损出现消费数据丢失. RocketMQ采用Broker数据主从复制机制,当消息发送到Master服务器后会将消息同步到Slave服务器,如果Master服务器宕机,消息消费者还可以继续从Slave拉取消息. 消息从Master服务器复制到Slave服务器上,有两种复制方式:同步复制SYNC_MASTER和异步复制ASYN

  • RocketMQ设计之同步刷盘

    在同步刷盘模式下,当消息写到内存后,会等待数据写到磁盘的CommitLog文件. CommitLog的handleDiskFlush方法: public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {     // Synchronization flush     if (FlushDiskType.SYNC_FLUSH ==

  • RocketMQ设计之异步刷盘

    上一篇RocketMQ设计之同步刷盘 异步刷盘方式:在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大:当内存里的消息量积累到一定程度时,统一触发写磁盘操作,快速写入 RocketMQ默认采用异步刷盘,异步刷盘两种策略:开启缓冲池,不开启缓冲池 CommitLog的handleDiskFlush方法: public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMe

  • RocketMQ消息丢失场景以及解决方法

    既然使用在项目中使用了MQ,那么就不可避免的需要考虑消息丢失问题.在一些涉及到了金钱交易的场景下,消息丢失还是很致命的.那么在RocketMQ中存在哪几种消息丢失的场景呢? 先来一张最简单的消费流程图: 上图中大致包含了这么几种场景: 生产者产生消息发送给RocketMQRocketMQ接收到了消息之后,必然需要存到磁盘中,否则断电或宕机之后会造成数据的丢失消费者从RocketMQ中获取消息消费,消费成功之后,整个流程结束 这三种场景都可能会产生消息的丢失,如下图所示: 场景1中生产者将消息发送

  • 详解RocketMQ中的消费者启动与消费流程分析

    目录 一.简介 1.1 RocketMQ 简介 1.2 工作流程 二.消费者启动流程 2.1 实例化消费者 2.2 设置NameServer和订阅topic过程 2.2.1 添加tag 2.2.2 发送心跳至Broker 2.2.3上传过滤器类至FilterServer 2.3 注册回调实现类 2.4 消费者启动 三.pull/push 模式消费 3.1 pull模式-DefaultMQPullConsumer 3.2 push模式-DefaultMQPushConsumer 3.3 小结 四.

  • RocketMQ消息生产者是如何选择Broker示例详解

    目录 前言 从NameServer查询Topic信息 如何选择Broker 小结 前言 在RocketMQ中为,我们创建消息生产者时,只需要设置NameServer地址,消息就能正确地发送到对应的Broker中,那么RocketMQ消息生产者是如何找到Broker的呢?如果有多个Broker实例,那么消息发送是如何选择发送到哪个Broker的呢? 从NameServer查询Topic信息 通过Debug消息发送send()方法,我们最终可以定位到DefaultMQProducerImpl.sen

  • 设计高可用和高负载的网站系统的几个注意事项

    其实要设计一个高可用.高负载的系统还是有一定的规矩可循的,其手段无外乎向上扩展(Sacle Up 硬件扩展)或者向外扩展(Scale Out 软件扩展),这两种方案在某一阶段时期,会显著改善网站的性能,但不久之后,问题依旧.本文参考网上相关资料,试图提供一个可行的 "有限" 解决方案. 早期 1. 对业务应用进行垂直分割,将不同的业务边界划分出来.程序员常说的 "多层体系" 只是纵向解决了不同编程层次的划分,相对于业务而言,并没有做出什么处理.现在 SOA 大行其道

  • 详解Android应用沙盒机制

    前言 Android使用沙盒来保护用户不受恶意应用的侵害,同时也将应用隔离开来,防止他们互相访问其数据,本文主要对Android应用沙盒中的几种技术做简要的总结. 一.Android应用DAC沙盒 稍微了解Android一点的人都知道,Android上的App并不像Linux上的用户程序那样,启动应用的uid默认就是登录用户的uid,除非你使用sudo或者setuid等机制.而是每个Android应用都对应了一个uid,也就是一个用户,通过Linux系统的DAC机制将应用的数据严格隔离开来. A

  • 深入理解Mysql事务隔离级别与锁机制问题

    概述 数据库一般都会并发执行多个事务,多个事务可能会并发的对相同的一批数据进行增删改查操作,可能导致脏读.脏写.不可重复度和幻读.这些问题的本质都是数据库的多事务并发问题,为了解决事务并发问题,数据库设计了事务隔离机制.锁机制.MVCC多版本并发控制隔离机制,用一整套机制来解决多事务并发问题. 事务及其ACID属性 原子性:操作的不可分割: 一致性:数据的一致性: 隔离性:事务之间互不干扰: 持久性:数据的修改时永久的: 并发事务处理带来的问题 脏写:丢失更新,最后的更新覆盖了由其他事务所做的更

随机推荐