RocketMQ设计之异步刷盘

上一篇RocketMQ设计之同步刷盘

异步刷盘方式:在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘操作,快速写入

RocketMQ默认采用异步刷盘,异步刷盘两种策略:开启缓冲池,不开启缓冲池

CommitLog的handleDiskFlush方法:

public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
    // Synchronization flush
    if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
        if (messageExt.isWaitStoreMsgOK()) {
            GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
            service.putRequest(request);
            boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
            if (!flushOK) {
                log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
                    + " client address: " + messageExt.getBornHostString());
                putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
            }
        } else {
            service.wakeup();
        }
    }
    // Asynchronous flush
    else {
        if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            flushCommitLogService.wakeup();
        } else {
            commitLogService.wakeup();
        }
    }
}

不开启缓冲池:默认不开启,刷盘线程FlushRealTimeService会每间隔500毫秒尝试去刷盘。

class FlushRealTimeService extends FlushCommitLogService {
    private long lastFlushTimestamp = 0;
    private long printTimes = 0;

    public void run() {
        CommitLog.log.info(this.getServiceName() + " service started");

        while (!this.isStopped()) {
            boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();

            //每次Flush间隔500毫秒
            int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
            //每次Flush最少4页内存数据(16KB)
            int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();

               //距离上次刷盘时间阈值为10秒
            int flushPhysicQueueThoroughInterval =
                CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();

            boolean printFlushProgress = false;

            // Print flush progress
            long currentTimeMillis = System.currentTimeMillis();
            if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
                this.lastFlushTimestamp = currentTimeMillis;
                flushPhysicQueueLeastPages = 0;
                printFlushProgress = (printTimes++ % 10) == 0;
            }

            try {
                if (flushCommitLogTimed) {
                    Thread.sleep(interval);
                } else {
                    this.waitForRunning(interval);
                }

                if (printFlushProgress) {
                    this.printFlushProgress();
                }

                long begin = System.currentTimeMillis();
                CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
                long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
                if (storeTimestamp > 0) {
                    CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
                }
                long past = System.currentTimeMillis() - begin;
                if (past > 500) {
                    log.info("Flush data to disk costs {} ms", past);
                }
            } catch (Throwable e) {
                CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
                this.printFlushProgress();
            }
        }

        // Normal shutdown, to ensure that all the flush before exit
        boolean result = false;
        for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
            result = CommitLog.this.mappedFileQueue.flush(0);
            CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
        }

        this.printFlushProgress();

        CommitLog.log.info(this.getServiceName() + " service end");
    }

    @Override
    public String getServiceName() {
        return FlushRealTimeService.class.getSimpleName();
    }

    private void printFlushProgress() {
        // CommitLog.log.info("how much disk fall behind memory, "
        // + CommitLog.this.mappedFileQueue.howMuchFallBehind());
    }

    @Override
    public long getJointime() {
        return 1000 * 60 * 5;
    }
}
  • 判断是否超过10秒没刷盘了,如果超过强制刷盘
  • 等待Flush间隔500ms
  • 通过MappedFile刷盘
  • 设置StoreCheckpoint刷盘时间点
  • 超过500ms的刷盘记录日志
  • Broker正常停止前,把内存page中的数据刷盘

开启缓冲池:

class CommitRealTimeService extends FlushCommitLogService {

    private long lastCommitTimestamp = 0;

    @Override
    public String getServiceName() {
        return CommitRealTimeService.class.getSimpleName();
    }

    @Override
    public void run() {
        CommitLog.log.info(this.getServiceName() + " service started");
        while (!this.isStopped()) {
            //每次提交间隔200毫秒
            int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();

            //每次提交最少4页内存数据(16KB)
            int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();

            //距离上次提交时间阈值为200毫秒
            int commitDataThoroughInterval =
                CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();

            long begin = System.currentTimeMillis();
            if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {
                this.lastCommitTimestamp = begin;
                commitDataLeastPages = 0;
            }

            try {
                boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
                long end = System.currentTimeMillis();
                if (!result) {
                    this.lastCommitTimestamp = end; // result = false means some data committed.
                    //now wake up flush thread.
                    flushCommitLogService.wakeup();
                }

                if (end - begin > 500) {
                    log.info("Commit data to file costs {} ms", end - begin);
                }
                this.waitForRunning(interval);
            } catch (Throwable e) {
                CommitLog.log.error(this.getServiceName() + " service has exception. ", e);
            }
        }

        boolean result = false;
        for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
            result = CommitLog.this.mappedFileQueue.commit(0);
            CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
        }
        CommitLog.log.info(this.getServiceName() + " service end");
    }
}

RocketMQ申请一块和CommitLog文件相同大小的堆外内存来做缓冲池,数据会先写入缓冲池,提交线程CommitRealTimeService也每间隔500毫秒尝试提交到文件通道等待刷盘,刷盘最终由FlushRealTimeService来完成,和不开启缓冲池的处理一致。使用缓冲池的目的是多条数据合并写入,从而提高io性能。

  • 判断是否超过200毫秒没提交,需要强制提交
  • 提交到MappedFile,此时还未刷盘
  • 然后唤醒刷盘线程
  • 在Broker正常停止前,提交内存page中的数据

到此这篇关于RocketMQ设计之异步刷盘的文章就介绍到这了,更多相关RocketMQ异步刷盘内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • RocketMQ设计之故障规避机制

    NameServer为了简化和客户端通信,发现Broker故障时并不会立即通知客户端.故障规避机制就是用来解决当Broker出现故障,Producer不能及时感知而导致消息发送失败的问题.默认不开启,如果开启,消息发送失败的时候会将失败的Broker暂时排除在队列选择列表外 MQFaultStrategy类的: public class MQFaultStrategy {     private final static InternalLogger log = ClientLogger.get

  • RocketMQ设计之同步刷盘

    在同步刷盘模式下,当消息写到内存后,会等待数据写到磁盘的CommitLog文件. CommitLog的handleDiskFlush方法: public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {     // Synchronization flush     if (FlushDiskType.SYNC_FLUSH ==

  • RocketMQ设计之主从复制和读写分离

    目录 一.主从复制 二.读写分离 一.主从复制 RocketMQ为了提高消费的高可用性,避免Broker发生单点故障引起Broker上的消息无法及时消费,同时避免单个机器上硬盘坏损出现消费数据丢失. RocketMQ采用Broker数据主从复制机制,当消息发送到Master服务器后会将消息同步到Slave服务器,如果Master服务器宕机,消息消费者还可以继续从Slave拉取消息. 消息从Master服务器复制到Slave服务器上,有两种复制方式:同步复制SYNC_MASTER和异步复制ASYN

  • RabbitMQ,RocketMQ,Kafka 事务性,消息丢失,消息顺序性和消息重复发送的处理策略

    目录 消息队列常见问题处理 分布式事务 什么是分布式事务 常见的分布式事务解决方案 基于MQ实现的分布式事务 本地消息表-最终一致性 MQ事务-最终一致性 RocketMQ中如何处理事务 Kafka中如何处理事务 RabbitMQ中的事务 消息防丢失 生产阶段防止消息丢失 RabbitMQ中的防丢失措施 Kafka中的防丢失措施 RocketMQ中的防丢失措施使用SYNC的发送消息方式,等待broker处理结果 存储阶段 RabbitMQ中的防丢失措施 Kafka中的防丢失措施 RocketMQ

  • Docker rocketmq部署的实现示例

    目录 准备工作 部署过程 初体验 rocketmq 相关问题 帮助文档 最近学习使用 rocketmq,需要搭建 rocketmq 服务端,本文主要记录 rocketmq 搭建过程以及这个过程踩到的一些坑. 准备工作 在搭建之前,我们需要做一些准备工作,这里我们需要使用 docker 搭建服务,所以需要提前安装 docker.此外,由于 rocketmq 需要部署 broker 与 nameserver ,考虑到分开部署比较麻烦,这里将会使用 docker-compose. rocketmq 架

  • RocketMQ设计之异步刷盘

    上一篇RocketMQ设计之同步刷盘 异步刷盘方式:在返回写成功状态时,消息可能只是被写入了内存的PAGECACHE,写操作的返回快,吞吐量大:当内存里的消息量积累到一定程度时,统一触发写磁盘操作,快速写入 RocketMQ默认采用异步刷盘,异步刷盘两种策略:开启缓冲池,不开启缓冲池 CommitLog的handleDiskFlush方法: public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMe

  • MySQL延迟问题和数据刷盘策略流程分析

    一.MySQL复制流程 官方文档流程如下: MySQL延迟问题和数据刷盘策略 1.绝对的延时,相对的同步 2.纯写操作,线上标准配置下,从库压力大于主库,最起码从库有relaylog的写入. 二.MySQL延迟问题分析 1.主库DML请求频繁 原因:主库并发写入数据,而从库为单线程应用日志,很容易造成relaylog堆积,产生延迟. 解决思路:做sharding,打散写请求.考虑升级到MySQL5.7+,开启基于逻辑时钟的并行复制. 2.主库执行大事务 原因:类似主库花费很长时间更新了一张大表,

  • 微服务架构设计RocketMQ基础及环境整合

    目录 概述&选型 单机安装配置 双机主从高可用搭建 启动多个NameServer 和 Broker 重要参数说明 可视化管理平台 SpringBoot整合RocketMQ 引入组件rocketmq-spring-boot-starter 依赖 修改application.yml,添加RocketMQ相关配置 编写消息生产者 MessageProduce 编写消息消费者 MessageConsumer 编写单元测试发送消息 测试 概述&选型 消息队列作为高并发系统的核心组件之一,能够帮助业务

  • RocketMQ消息丢失场景以及解决方法

    既然使用在项目中使用了MQ,那么就不可避免的需要考虑消息丢失问题.在一些涉及到了金钱交易的场景下,消息丢失还是很致命的.那么在RocketMQ中存在哪几种消息丢失的场景呢? 先来一张最简单的消费流程图: 上图中大致包含了这么几种场景: 生产者产生消息发送给RocketMQRocketMQ接收到了消息之后,必然需要存到磁盘中,否则断电或宕机之后会造成数据的丢失消费者从RocketMQ中获取消息消费,消费成功之后,整个流程结束 这三种场景都可能会产生消息的丢失,如下图所示: 场景1中生产者将消息发送

  • java开发RocketMQ消息中间件原理基础详解

    RocketMQ 是什么 Github 上关于 RocketMQ 的介绍: RcoketMQ 是一款低延迟.高可靠.可伸缩.易于使用的消息中间件.具有以下特性: 支持发布/订阅(Pub/Sub)和点对点(P2P)消息模型 在一个队列中可靠的先进先出(FIFO)和严格的顺序传递 支持拉(pull)和推(push)两种消息模式 单一队列百万消息的堆积能力 支持多种消息协议,如 JMS.MQTT 等 分布式高可用的部署架构,满足至少一次消息传递语义 提供 docker 镜像用于隔离测试和云集群部署 提

  • docker安装rocketMQ和安装过程中出现问题的解决

    目录 拉取rocketmq镜像 创建namesrv 创建单个broker节点 rocketMQ-console服务 本文主要介绍安装rocketMQ4.4.0,主要分为四步,分别为: 1.拉取rocketmq镜像. 2.创建namesrv. 3.创建broker节点,我在这里介绍的是创建单个broker的. 4.rocketMQ-console服务,用于管理rocketMQ的管理界面. 拉取rocketmq镜像 docker命令如下 docker pull rocketmqinc/rocketm

  • 使用Docker容器部署rocketmq单机的全过程

    目录 查询镜像 拉取镜像 创建namesrv数据存储路径 构建namesrv容器 创建broker数据存储路径 创建broker配置文件 broker.conf 内容如下: 构建broker容器 构建rocketmq-console-ng 查询镜像 docker search rocketmq NAME DESCRIPTION STARS OFFICIAL AUTOMATED foxiswho/rocketmq rocketmq 69 rocketmqinc/rocketmq Image rep

随机推荐