RocketMQ设计之主从复制和读写分离

目录
  • 一、主从复制
  • 二、读写分离

一、主从复制

RocketMQ为了提高消费的高可用性,避免Broker发生单点故障引起Broker上的消息无法及时消费,同时避免单个机器上硬盘坏损出现消费数据丢失。

RocketMQ采用Broker数据主从复制机制,当消息发送到Master服务器后会将消息同步到Slave服务器,如果Master服务器宕机,消息消费者还可以继续从Slave拉取消息。

消息从Master服务器复制到Slave服务器上,有两种复制方式:同步复制SYNC_MASTER和异步复制ASYNC_MASTER

通过配置文件conf/broker.conf文件配置:

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH

对brokerRole参数进行设置:

同步复制:Master和Slave都写成功后才返回客户端写成功的状态。

  • 优点:Master服务器出现故障,Slave服务器上有全部数据的备份,很容易恢复到Master服务器。
  • 缺点:由于多了一个同步等待的步骤,增加数据写入延迟,降低系统吞吐量。

异步复制:仅Master服务器写成功即可返回给客户端写成功的状态。

  • 优点:没有同步等待的步骤,低延迟,高吞吐。
  • 缺点:如果Master服务器出现故障,有些数据可能未写入Slave服务器,未同步的数据可能丢失

实际应用中,需要结合业务场景,合理设置刷盘方式和主从复制方式。不建议使用同步刷盘方式,因为它频繁触发写磁盘操作,性能下降很明显。**通常把MasterSlave设置为异步刷盘,同步复制,保证数据不丢失。**这样即使一台服务器出故障,仍然可以保证数据不丢失。

二、读写分离

读写分离机制是高性能、高可用架构中常见的设计,例如Mysql实现读写分离机制,Client只能从Master服务器写数据,可以从Master服务器和Slave服务器都读数据。

RocketMQ的Consumer在拉取消息时,Broker会判断Master服务器的消息堆积量来决定Consumer是否从Slave服务器拉取消息消费。默认一开始从Master服务器拉群消息,如果Master服务器的消息堆积超过物理内存40%,则会返回给Consumer的消息结果并告知Consumer,下次从其他Slave服务器上拉取消息。

RocketMQ 有属于自己的一套读写分离逻辑,会判断主服务器的消息堆积量来决定消费者是否向从服务器拉取消息消费。

Consumer在向 Broker 发送消息拉取请求时,会根据筛选出来的消息队列,判定是从Master,还是从Slave拉取消息,默认是Master。

Broker 接收到消息消费者拉取请求,在获取本地堆积的消息量后,会计算服务器的消息堆积量是否大于物理内存的一定值,如果是,则标记下次从 Slave服务器拉取,计算 Slave服务器的 Broker Id,并响应给消费者。

Consumer在接收到 Broker的响应后,会把消息队列与建议下一次拉取节点的 Broker Id 关联起来,并缓存在内存中,以便下次拉取消息时,确定从哪个节点发送请求。

public class GetMessageResult {

    private final List<SelectMappedBufferResult> messageMapedList =
        new ArrayList<SelectMappedBufferResult>(100);
    private final List<ByteBuffer> messageBufferList = new ArrayList<ByteBuffer>(100);
    private GetMessageStatus status;
    private long nextBeginOffset;
    private long minOffset;
    private long maxOffset;
    private int bufferTotalSize = 0;
    // 标识是否通过Slave拉拉取消息
    private boolean suggestPullingFromSlave = false;
    private int msgCount4Commercial = 0;
}

// 针对消息堆积量过大会切换到Slave进行查询。
// maxOffsetPy 为当前最大物理偏移量,maxPhyOffsetPulling 为本次消息拉取最大物理偏移量,他们的差即可表示消息堆积量。
// TOTAL_PHYSICAL_MEMORY_SIZE 表示当前系统物理内存,accessMessageInMemoryMaxRatio 的默认值为 40,
// 以上逻辑即可算出当前消息堆积量是否大于物理内存的 40%,如果大于则将 suggestPullingFromSlave 设置为 true。

long diff = maxOffsetPy - maxPhyOffsetPulling;
long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE
    * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));
getResult.setSuggestPullingFromSlave(diff > memory);
  • 决定消费者是否向从服务器拉取消息消费的值存在 GetMessageResult 类中。
  • suggestPullingFromSlave的默认值为 false,即默认消费者不会消费从服务器,但它会在消费者发送消息拉取请求时,动态改变该值,Broker 接收、处理消费者拉取消息请求。
  • 针对本MessageQueue消息堆积量过大会切换到Slave进行查询,maxOffsetPy 为当前最大物理偏移量,maxPhyOffsetPulling 为本次消息拉取最大物理偏移量,他们的差即可表示消息堆积量,当前消息堆积量是否大于物理内存的 40%就会切换到Slave进行查询。
public class PullMessageResponseHeader implements CommandCustomHeader {
    // suggestWhichBrokerId标识从哪个broker进行查询
    private Long suggestWhichBrokerId;
    private Long nextBeginOffset;
    private Long minOffset;
    private Long maxOffset;
}

public class PullMessageProcessor implements NettyRequestProcessor {

    private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
        throws RemotingCommandException {
        RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
        final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
        final PullMessageRequestHeader requestHeader =
            (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);

        response.setOpaque(request.getOpaque());

        final GetMessageResult getMessageResult =
            this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
                requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);

        if (getMessageResult != null) {
            response.setRemark(getMessageResult.getStatus().name());
            responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());
            responseHeader.setMinOffset(getMessageResult.getMinOffset());
            responseHeader.setMaxOffset(getMessageResult.getMaxOffset());

            // 建议从slave消费消息
            if (getMessageResult.isSuggestPullingFromSlave()) {
                // 从slave查询
                responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
            } else {
                // 从master查询
                responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
            }

            switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) {
                case ASYNC_MASTER:
                case SYNC_MASTER:
                    break;
                case SLAVE:
                    // 针对SLAVE需要判断是否可读,不可读的情况下读MASTER
                    if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
                        response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);
                        responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
                    }
                    break;
            }

            if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) {
                // consume too slow ,redirect to another machine
                if (getMessageResult.isSuggestPullingFromSlave()) {
                    responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());
                }
                // consume ok
                else {
                    responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());
                }
            } else {
                responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);
            }
        }

        return response;
    }
}

PullMessageResponseHeadersuggestWhichBrokerId标识某个MessageQueue的消息从具体的brokerId进行查询。
针对Slave不可读的情况会设置为从MASTER_ID进行查询。

public class PullAPIWrapper {
    private final InternalLogger log = ClientLogger.getLog();
    private final MQClientInstance mQClientFactory;
    private final String consumerGroup;
    private final boolean unitMode;
    private ConcurrentMap<MessageQueue, AtomicLong/* brokerId */> pullFromWhichNodeTable =
        new ConcurrentHashMap<MessageQueue, AtomicLong>(32);
    private volatile boolean connectBrokerByUser = false;
    private volatile long defaultBrokerId = MixAll.MASTER_ID;
    private Random random = new Random(System.currentTimeMillis());
    private ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();

    public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
        final SubscriptionData subscriptionData) {
        PullResultExt pullResultExt = (PullResultExt) pullResult;

        // 处理MessageQueue对应拉取的brokerId
        this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());

        // 省略相关代码

        pullResultExt.setMessageBinary(null);

        return pullResult;
    }

    public void updatePullFromWhichNode(final MessageQueue mq, final long brokerId) {
        // 保存在pullFromWhichNodeTable对象中
        AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
        if (null == suggest) {
            this.pullFromWhichNodeTable.put(mq, new AtomicLong(brokerId));
        } else {
            suggest.set(brokerId);
        }
    }
}

Consumer收到拉取响应回来的数据后,会将下次建议拉取的 brokerId缓存起来。

public class PullAPIWrapper {
    private final InternalLogger log = ClientLogger.getLog();
    private final MQClientInstance mQClientFactory;
    private final String consumerGroup;
    private final boolean unitMode;
    private ConcurrentMap<MessageQueue, AtomicLong/* brokerId */> pullFromWhichNodeTable =
        new ConcurrentHashMap<MessageQueue, AtomicLong>(32);
    private volatile boolean connectBrokerByUser = false;
    private volatile long defaultBrokerId = MixAll.MASTER_ID;
    private Random random = new Random(System.currentTimeMillis());
    private ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();

    public PullResult pullKernelImpl(
        final MessageQueue mq,
        final String subExpression,
        final String expressionType,
        final long subVersion,
        final long offset,
        final int maxNums,
        final int sysFlag,
        final long commitOffset,
        final long brokerSuspendMaxTimeMillis,
        final long timeoutMillis,
        final CommunicationMode communicationMode,
        final PullCallback pullCallback
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {

        // 查找MessageQueue应该从brokerName的哪个节点查询
        FindBrokerResult findBrokerResult =
            this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
                this.recalculatePullFromWhichNode(mq), false);

        if (null == findBrokerResult) {
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
            findBrokerResult =
                this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
                    this.recalculatePullFromWhichNode(mq), false);
        }

        if (findBrokerResult != null) {
            {
                // check version
                if (!ExpressionType.isTagType(expressionType)
                    && findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {
                    throw new MQClientException("The broker[" + mq.getBrokerName() + ", "
                        + findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);
                }
            }
            int sysFlagInner = sysFlag;

            if (findBrokerResult.isSlave()) {
                sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
            }

            PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
            requestHeader.setConsumerGroup(this.consumerGroup);
            requestHeader.setTopic(mq.getTopic());
            requestHeader.setQueueId(mq.getQueueId());
            requestHeader.setQueueOffset(offset);
            requestHeader.setMaxMsgNums(maxNums);
            requestHeader.setSysFlag(sysFlagInner);
            requestHeader.setCommitOffset(commitOffset);
            requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
            requestHeader.setSubscription(subExpression);
            requestHeader.setSubVersion(subVersion);
            requestHeader.setExpressionType(expressionType);

            String brokerAddr = findBrokerResult.getBrokerAddr();
            if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
                brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
            }

            PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
                brokerAddr,
                requestHeader,
                timeoutMillis,
                communicationMode,
                pullCallback);

            return pullResult;
        }

        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }

    public long recalculatePullFromWhichNode(final MessageQueue mq) {
        if (this.isConnectBrokerByUser()) {
            return this.defaultBrokerId;
        }

        AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);
        if (suggest != null) {
            return suggest.get();
        }

        return MixAll.MASTER_ID;
    }
}

Consumer拉取消息的时候会从 pullFromWhichNodeTable 中取出拉取 brokerId确定去具体的broker进行查询。

到此这篇关于RocketMQ设计之主从复制和读写分离的文章就介绍到这了,更多相关RocketMQ从复制和读写分离内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • RocketMQ设计之故障规避机制

    NameServer为了简化和客户端通信,发现Broker故障时并不会立即通知客户端.故障规避机制就是用来解决当Broker出现故障,Producer不能及时感知而导致消息发送失败的问题.默认不开启,如果开启,消息发送失败的时候会将失败的Broker暂时排除在队列选择列表外 MQFaultStrategy类的: public class MQFaultStrategy {     private final static InternalLogger log = ClientLogger.get

  • RocketMQ设计之异步刷盘

    上一篇RocketMQ设计之同步刷盘 异步刷盘方式:在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大:当内存里的消息量积累到一定程度时,统一触发写磁盘操作,快速写入 RocketMQ默认采用异步刷盘,异步刷盘两种策略:开启缓冲池,不开启缓冲池 CommitLog的handleDiskFlush方法: public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMe

  • RabbitMQ,RocketMQ,Kafka 事务性,消息丢失,消息顺序性和消息重复发送的处理策略

    目录 消息队列常见问题处理 分布式事务 什么是分布式事务 常见的分布式事务解决方案 基于MQ实现的分布式事务 本地消息表-最终一致性 MQ事务-最终一致性 RocketMQ中如何处理事务 Kafka中如何处理事务 RabbitMQ中的事务 消息防丢失 生产阶段防止消息丢失 RabbitMQ中的防丢失措施 Kafka中的防丢失措施 RocketMQ中的防丢失措施使用SYNC的发送消息方式,等待broker处理结果 存储阶段 RabbitMQ中的防丢失措施 Kafka中的防丢失措施 RocketMQ

  • RocketMQ设计之同步刷盘

    在同步刷盘模式下,当消息写到内存后,会等待数据写到磁盘的CommitLog文件. CommitLog的handleDiskFlush方法: public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {     // Synchronization flush     if (FlushDiskType.SYNC_FLUSH ==

  • Docker rocketmq部署的实现示例

    目录 准备工作 部署过程 初体验 rocketmq 相关问题 帮助文档 最近学习使用 rocketmq,需要搭建 rocketmq 服务端,本文主要记录 rocketmq 搭建过程以及这个过程踩到的一些坑. 准备工作 在搭建之前,我们需要做一些准备工作,这里我们需要使用 docker 搭建服务,所以需要提前安装 docker.此外,由于 rocketmq 需要部署 broker 与 nameserver ,考虑到分开部署比较麻烦,这里将会使用 docker-compose. rocketmq 架

  • RocketMQ设计之主从复制和读写分离

    目录 一.主从复制 二.读写分离 一.主从复制 RocketMQ为了提高消费的高可用性,避免Broker发生单点故障引起Broker上的消息无法及时消费,同时避免单个机器上硬盘坏损出现消费数据丢失. RocketMQ采用Broker数据主从复制机制,当消息发送到Master服务器后会将消息同步到Slave服务器,如果Master服务器宕机,消息消费者还可以继续从Slave拉取消息. 消息从Master服务器复制到Slave服务器上,有两种复制方式:同步复制SYNC_MASTER和异步复制ASYN

  • CentOS服务器平台搭建mysql主从复制与读写分离的方法

    本文实例讲述了CentOS服务器搭建mysql主从复制与读写分离的方法.分享给大家供大家参考,具体如下: mysql 主从复制的优点: ① 如果主服务器出现问题, 可以快速切换到从服务器提供的服务,保证高可用性 ② 可以在从服务器上执行查询操作, 降低主服务器的访问压力 ③ 可以在从服务器上执行备份, 以避免备份期间影响主服务器的服务 注意事项: ① server-id必须唯一,一般使用ip的后三位 ② 从库Slave_IO_Running:NO 可能原因:帐号无权限操作 ③ Can't exe

  • Mysql主从复制与读写分离图文详解

    文章思维导图 为什么使用主从复制.读写分离 主从复制.读写分离一般是一起使用的.目的很简单,就是为了提高数据库的并发性能. 你想,假设是单机,读写都在一台MySQL上面完成,性能肯定不高. 如果有三台MySQL,一台mater只负责写操作,两台salve只负责读操作,性能不就能大大提高了吗? 所以主从复制.读写分离就是为了数据库能支持更大的并发. 随着业务量的扩展.如果是单机部署的MySQL,会导致I/O频率过高. 采用主从复制.读写分离可以提高数据库的可用性. 主从复制的原理 ①当Master

  • 详解MySQL主从复制及读写分离

    前言 在企业实际应用中,成熟的业务通常数据量都比较大,而单台MySQL服务器在安全性.高可用性和高并发方面都无法满足实际的需求,我们可以在多台MySQL服务器(Master-Slave)部署 主从复制来实现同步数据,再通过 读写分离来提升数据库的并发负载能力.有点类似于rsync,但是不同的是rsync是对磁盘文件做备份,而mysql主从复制是对数据库中的数据.语句做备份. 一.相关概述 主从复制:主数据库(Master)发送更新事件到从数据库(Slave),从数据库读取更新记录,并执行更新记录

  • 一篇文章看懂MySQL主从复制与读写分离

    目录 引言 一.MySQL主从复制 1.MySQL的复制类型 2.MySQL主从复制的原理 3.MySQL主从复制延迟 二.MySQL读写分离 1.常见的 MySQL 读写分离分 2.MySQL 读写分离原理 三.MySQL主从复制与读写分离配置步骤 1.搭建环境 2.解决需要 3.准备阶段(关闭防火墙及控制访问机制) 4.搭建MySQL主从复制 5.搭建MySQL读写分离步骤 6.测试测试读写分离 总结 引言 企业中的业务通常数据量都比较大,而单台数据库在数据存储.安全性和高并发方面都无法满足

  • Mysql数据库的主从复制与读写分离精讲教程

    目录 前言 一.MySQL主从复制 1.支持的复制类型 2.主从复制的工作过程是基于日志 3.请求方式 4.主从复制的原理 5.MySQL集群和主从复制分别适合在什么场景下使用 6.为什么使用主从复制.读写分离 7.用途及条件 8.mysql主从复制存在的问题 9.MySQL主从复制延迟 二.主从复制的形式 三.读写分离 1.原理 2.为什么要读写分离呢? 3.什么时候要读写分离? 5.目前较为常见的MySQL读写分离 四.案例实施 1.案例环境 2.实验思路(解决需求) 3.准备 4.搭建My

  • MySQL数据库主从复制与读写分离

    目录 一.主从复制 主从复制三线程 主从复制的过程: 主从复制的策略: 主从复制高延迟 二.读写分离 读写分离概念 读写分离原因与场景 总结 一.主从复制        主从复制:在实际的生产中,为了解决Mysql的单点故障以及提高MySQL的整体服务性能,一般都会采用主从复制.即:对数据库中的数据.语句做备份. 主从复制三线程        Mysql的主从复制中主要有三个线程:master(binlog dump thread).slave(I/O thread .SQL thread),M

  • 详解MySQL的主从复制、读写分离、备份恢复

    一.MySQL主从复制 1.简介 我们为什么要用主从复制? 主从复制目的: 可以做数据库的实时备份,保证数据的完整性: 可做读写分离,主服务器只管写,从服务器只管读,这样可以提升整体性能. 原理图: 从上图可以看出,同步是靠log文件同步读写完成的. 2.更改配置文件 两天机器都操作,确保 server-id 要不同,通常主ID要小于从ID.一定注意. # 3306和3307分别代表2台机器 # 打开log-bin,并使server-id不一样 #vim /data/3306/my.cnf lo

  • MySQL5.6 Replication主从复制(读写分离) 配置完整版

    MySQL5.6主从复制(读写分离)教程 1.MySQL5.6开始主从复制有两种方式: 基于日志(binlog): 基于GTID(全局事务标示符). 需要注意的是:GTID方式不支持临时表!所以如果你的业务系统要用到临时表的话就不要考虑这种方式了,至少目前最新版本MySQL5.6.12的GTID复制还是不支持临时表的. 所以本教程主要是告诉大家如何通过日志(binlog)方式做主从复制! 2.MySQL官方提供的MySQL Replication教程: http://dev.mysql.com/

  • 详解MySQL主从复制读写分离搭建

    MySQL主从设置 MySQL主从复制,读写分离的设置非常简单: 修改配置my.cnf文件 master 和 slave设置的差不多: [mysqld] log-bin=mysql-bin server-id=222 log-bin=mysql-bin的意思是:启用二进制日志. server-id=222的意思是设置了服务器的唯一ID,默认是1,一般取IP最后一段,可以写成别的,只要不和其他mysql服务器重复就好. 这里,有的MySQL默认的my.cnf文件引用了/etc/mysql/conf

随机推荐