RocketMQ 源码分析Broker消息刷盘服务

目录
  • 前言
  • 刷盘服务源码分析
    • CommitRealTimeService刷盘源码分析
    • FlushRealTimeService刷盘源码分析
    • GroupCommitService刷盘源码分析
  • 总结

前言

上篇文章我们介绍了消息刷盘的四种方式,本篇文章我们来介绍Broker是如何实现这四种刷盘方式。

刷盘服务源码分析

Broker中的四种刷盘分别是由CommitRealTimeService,FlushRealTimeService,GroupCommitService将消息从内存中刷到磁盘上的。在了解刷盘这三个刷盘服务之前,我们先来了解MappedFile中下面几个属性

public class MappedFile extends ReferenceResource {
    // 当前写文件位置,即数据被写入MappedFile的最新指针,可能存在ByteBuffer中,没有提交
    protected final AtomicInteger wrotePosition = new AtomicInteger(0);
    // 数据被写入文件的最新指针(只是被写入文件映射,不一定被刷盘)
    protected final AtomicInteger committedPosition = new AtomicInteger(0);
    // 刷盘位置,该指针之前的数据都持久化存储到磁盘中
    private final AtomicInteger flushedPosition = new AtomicInteger(0);
    // 文件大小,默认是1042*1024*4(4GB)
    protected int fileSize;
    // 起始偏移量,MappedFile创建时从文件名中解析
    private long fileFromOffset;
}

上面几个属性在MappedFile中的位置如下图所示

上面几个位置关系: flushedPosition ≤ commitedPosition ≤ wrotePosition

CommitRealTimeService刷盘源码分析

CommitRealTimeService类的作用就是将上图中红色的消息(也就是committedPosition -> wrotePosition之间的消息)从直接内存ByteBuffer提交到FileChannel,提交完成并不带表刷盘完成,还需要将FileChannel将数据刷到硬盘中,才正式刷盘完成。CommitRealTimeService核心代码逻辑是在run()中,在run()中是包含一个死循环,死循环中每个200ms提交一次消息,每次最少提交4页的消息,每页大小是4kb,也就是说只有wrotePosition - committedPosition ≥ 4*4kb,消息才会被提交。

// org.apache.rocketmq.store.CommitLog.CommitRealTimeService#run
public void run() {
	// 死循环
  while (!this.isStopped()) {
      // 消息提交时间间隔,默认200ms
      int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();
      // 最少提交页数,默认是4
      int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();
      try {
        	// 提交消息
          boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
        	// 等待200ms
          this.waitForRunning(interval);
      } catch (Throwable e) {
          CommitLog.log.error(this.getServiceName() + " service has exception. ", e);
      }
  }
}

上面mappedFileQueue#commit提交最终会调用MappedFile#commit0,commit0代码逻辑如下,将直接内存ByteBuffer中的数据拷贝到fileChannel中。

// org.apache.rocketmq.store.MappedFile#commit0
protected void commit0() {
    // 写指针
    int writePos = this.wrotePosition.get();
    // 最后提交指针
    int lastCommittedPosition = this.committedPosition.get();
    // byteBuffer的数据提交到FileChannel
    if (writePos - lastCommittedPosition > 0) {
        try {
            ByteBuffer byteBuffer = writeBuffer.slice();
            byteBuffer.position(lastCommittedPosition);
            byteBuffer.limit(writePos);
            this.fileChannel.position(lastCommittedPosition);
            this.fileChannel.write(byteBuffer);
            this.committedPosition.set(writePos);
        } catch (Throwable e) {
            log.error("Error occurred when commit data to FileChannel.", e);
        }
    }
}

FlushRealTimeService刷盘源码分析

FlushRealTimeService的代码与CommitRealTimeService类似,核心代码也带run()中,run()中也是一个死循环,每隔500ms调用mappedFileQueue#flush刷盘。

// org.apache.rocketmq.store.CommitLog.FlushRealTimeService#run
public void run() {
    while (!this.isStopped()) {
        // 定时刷盘时间间隔,默认500ms
        int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
        // 一次刷盘页数,默认是4页
        int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
        try {
            if (flushCommitLogTimed) {
                // sleep 500ms
                Thread.sleep(interval);
            } else {
                this.waitForRunning(interval);
            }
						// 消息刷盘
            CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
        } catch (Throwable e) {
            this.printFlushProgress();
        }
    }
}

mappedFileQueue#flush刷盘最终调用了MappedFile#flush,代码如下所示,可以看到如果MappedFile中有直接内存写缓存,则会调用fileChannel.force(false)刷盘,如果没有写缓存,则消息直接提交到MappedFile的内存映射文件mappedByteBuffer中,因此调用mappedByteBuffer.force()刷盘。

// org.apache.rocketmq.store.MappedFile#flush
public int flush(final int flushLeastPages) {
    if (this.isAbleToFlush(flushLeastPages)) {
        if (this.hold()) {
            int value = getReadPosition();
            try {
                // 如果使用了堆外内存,那么通过fileChannel强制刷盘,这是异步堆外内存的逻辑
                if (writeBuffer != null || this.fileChannel.position() != 0) {
                    this.fileChannel.force(false);
                } else {
                    // 如果没有使用堆外内存,那么通过fileChannel强制刷盘,这是同步或者异步刷盘走的逻辑
                    this.mappedByteBuffer.force();
                }
            } catch (Throwable e) {
                log.error("Error occurred when force data to disk.", e);
            }
            // 设置刷盘位置为写入位置
            this.flushedPosition.set(value);
            // 减少对该MappedFile的引用次数
            this.release();
        } else {
            log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
            this.flushedPosition.set(getReadPosition());
        }
    }
    return this.getFlushedPosition();
}

GroupCommitService刷盘源码分析

同步刷盘GroupCommitService代码与上述代码类似,都继承了ServiceThread,它的核心逻辑在GroupCommitService#run,在run()中也是一个死循环,每隔10ms调用一次doCommit(),虽然这个方法的名字叫doCommit,实际底层也与FlushRealTimeService相同,都是调用的mappedFileQueue#flush,将mappedByteBuffer中的数据刷入磁盘。

// org.apache.rocketmq.store.CommitLog.GroupCommitService#run
public void run() {
    // 死循环
    while (!this.isStopped()) {
        try {
            // 间隔10ms
            this.waitForRunning(10);
            this.doCommit();
        } catch (Exception e) {
            CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }
}

看到这里大家可能会有疑问,为什么同步刷盘也是定时刷盘,这与异步刷盘有什么区别呢?实际上这里有着相当精妙的设计,在上篇文章中我们了解到同步刷盘包括等待消息保存与不等待消息保存。

如果不等待消息保存,则调用了ServiceThread#wakeup方法。

public void wakeup() {
    if (hasNotified.compareAndSet(false, true)) {
        waitPoint.countDown();
    }
}

ServiceThread状态如下所示,如果刷盘线程在10ms等待中,hasNotified属性值为false,hastNotified更新成功,刷盘线程被唤醒,立即停止等待。如果刷盘线程正在执行中,hasNotified更新失败,刷盘线程唤醒失败。只能等待下一次被唤醒或者下一次时间间隔后再次刷盘。

如果是要等待刷盘成功后才返回结果,就要利用到GroupCommitService属性中两个刷盘请求容器

  • requestWrite

同步刷盘请求暂存容器

  • requestsRead

处理中的刷盘请求容器

class GroupCommitService extends FlushCommitLogService {
    // 同步刷盘请求暂存容器
    private volatile LinkedList<GroupCommitRequest> requestsWrite = new LinkedList<GroupCommitRequest>();
    // 每次处理刷盘的request容器
    private volatile LinkedList<GroupCommitRequest> requestsRead = new LinkedList<GroupCommitRequest>();
}

提交刷盘请求首先会被放入到requestsWrite容器中,然后再唤醒刷盘线程。

// org.apache.rocketmq.store.CommitLog.GroupCommitService#putRequest
public synchronized void putRequest(final GroupCommitRequest request) {
    lock.lock();
    try {
        // 写请求
        this.requestsWrite.add(request);
    } finally {
        lock.unlock();
    }
    // 唤醒当前线程
    this.wakeup();
}

刷盘线程被唤醒或者线程结束等待时都会调用onWaitEnd()方法,交换请求暂存容器和刷盘request容器

// org.apache.rocketmq.store.CommitLog.GroupCommitService#onWaitEnd
@Override
protected void onWaitEnd() {
    this.swapRequests();
}
// org.apache.rocketmq.store.CommitLog.GroupCommitService#swapRequests
// 交换请求暂存容器和刷盘request容器
private void swapRequests() {
    lock.lock();
    try {
        LinkedList<GroupCommitRequest> tmp = this.requestsWrite;
        this.requestsWrite = this.requestsRead;
        this.requestsRead = tmp;
    } finally {
        lock.unlock();
    }
}

线程被唤醒后会调用doCommit(),从下面代码可以发现,不管requestsRead是否包含要处理的刷盘请求,实际都是通过调用mappedFileQueue#flush执行刷盘。

  • 如果requestsRead中包含刷盘请求

则有可能需要调用mappedFileQueue#flush,确保当前请求的消息能够被刷盘,并返回刷盘结果给客户端,如果包含请求,最多会调用两次刷盘方法,确保消息能够正确刷盘。

由于文件是固定大小,有可能刷盘位置在上一个MappedFile中,当前消息请求在最新的MappedFile中,刷盘两次,确保当前消息能够被刷入硬盘中

  • 如果requestsRead中不包含刷盘请求

处理请求容器中包含request,直接调用MappedFileQueue#flush,如果当前消息不在flushPosition所在的mappedFile中,则本次刷盘有可能并不会将当前消息持久化到磁盘中,需要等待下次刷盘。

// org.apache.rocketmq.store.CommitLog.GroupCommitService#doCommit
private void doCommit() {
    // 如果处理Request不空
    if (!this.requestsRead.isEmpty()) {
        // 遍历处理Request
        for (GroupCommitRequest req : this.requestsRead) {
            // 如果刷盘指针大于刷盘请求中需要刷盘的offSet
            boolean flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
            // 消息刷盘
            for (int i = 0; i < 2 && !flushOK; i++) {
                CommitLog.this.mappedFileQueue.flush(0);
                flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
            }
            // 唤醒客户端
            req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT);
        }
    } else {
        // 如果消息不等待刷盘成功就返回,则不会提交刷盘请求,调用这个方法
        CommitLog.this.mappedFileQueue.flush(0);
    }
}

总结

本次我们了解了RocketMQ中四种刷盘策略对应的刷盘服务

  • 同步刷盘-等待消息保存到磁盘
  • 同步刷盘-不等待消息保存到磁盘上

上面两个同步刷盘都是由GroupCommitService实现的,由GroupCommitService将MappedByteBuffer消息刷盘到磁盘上

  • 异步刷盘-开启堆外缓存

如果开启了堆外缓存,刷盘时会先由CommitRealTimeService将消息从Bytebuffer拷贝到FileChannel,FlushRealTimeService再将消息从FileChannel刷到磁盘上

  • 异步刷盘-不开启堆外缓存

这种方式也是默认的刷盘方式,由FlushRealTimeService将MappedByteBuffer消息刷盘到磁盘上

以上就是RocketMQ 源码分析Broker消息刷盘服务的详细内容,更多关于RocketMQ Broker刷盘服务的资料请关注我们其它相关文章!

(0)

相关推荐

  • RocketMQ Broker如何保存消息源码解析

    目录 前言 消息存储格式总览 CommitLog介绍 MappedFile详解 消息存储格式介绍 DefaultMessageStore介绍 消息存储源码分析 发送消息存储流程 消息预处理阶段 消息保存阶段 消息保存结果处理阶段 总结 前言 前面我们介绍了RocketMQ是如何接收消息的,下面我们来介绍Broker是如何保存消息的. 消息存储格式总览 Broker消息存储主要包括CommitLog,ConsumerQueue和Index三个部分. CommitLog CommitLog主要用于消

  • RocketMQ ConsumeQueue与IndexFile实时更新机制源码解析

    目录 前言 ConsumeQueue详解 IndexFile详解 IndexHeader slots槽位 indexes索引数据 实时更新ConsumeQueue与IndexFile源码分析 CommitLogDispatcherBuildConsumeQueue源码分析 CommitLogDispatcherBuildIndex源码分析 IndexFile如何解决Hash冲突 总结 前言 前面我们介绍了消息是如何存储的,消息是如何刷盘的,讲的都是CommitLog是如何存储和刷盘的.虽然Com

  • RocketMQ源码解析topic创建机制详解

    目录 1. RocketMQ Topic创建机制 2. 自动Topic 3. 手动创建--预先创建 通过界面控制台创建 1. RocketMQ Topic创建机制 以下源码基于Rocket MQ 4.7.0 RocketMQ Topic创建机制分为两种:一种自动创建,一种手动创建.可以通过设置broker的配置文件来禁用或者允许自动创建.默认是开启的允许自动创建 autoCreateTopicEnable=true/false 下面会结合源码来深度分析一下自动创建和手动创建的过程. 2. 自动T

  • java使用websocket,并且获取HttpSession 源码分析(推荐)

    一:本文使用范围 此文不仅仅局限于spring boot,普通的spring工程,甚至是servlet工程,都是一样的,只不过配置一些监听器的方法不同而已. 本文经过作者实践,确认完美运行. 二:Spring boot使用websocket 2.1:依赖包 websocket本身是servlet容器所提供的服务,所以需要在web容器中运行,像我们所使用的tomcat,当然,spring boot中已经内嵌了tomcat. websocket遵循了javaee规范,所以需要引入javaee的包 <

  • jQuery插件-jRating评分插件源码分析及使用方法

    该插件被广泛应用于各种需要评分的页面当中,今天作为学习,把源码拿出来分析一下,顺便学习其使用方法. 一.插件使用一览. 复制代码 代码如下: <div> <div>第一个例子</div> <div id="16_1" class="myRating"></div> </div> 复制代码 代码如下: <link href="Script/jRating/jRating.jquer

  • PipedWriter和PipedReader源码分析_动力节点Java学院整理

    PipedWriter和PipedReader源码分析 1. PipedWriter 源码(基于jdk1.7.40)  package java.io; public class PipedWriter extends Writer { // 与PipedWriter通信的PipedReader对象 private PipedReader sink; // PipedWriter的关闭标记 private boolean closed = false; // 构造函数,指定配对的PipedRea

  • Android AsyncTask源码分析

    Android中只能在主线程中进行UI操作,如果是其它子线程,需要借助异步消息处理机制Handler.除此之外,还有个非常方便的AsyncTask类,这个类内部封装了Handler和线程池.本文先简要介绍AsyncTask的用法,然后分析具体实现. 基本用法 AsyncTask是一个抽象类,我们需要创建子类去继承它,并且重写一些方法.AsyncTask接受三个泛型参数: Params: 指定传给任务执行时的参数的类型 Progress: 指定后台任务执行时将任务进度返回给UI线程的参数类型 Re

  • SpringMVC源码解析之消息转换器HttpMessageConverter

    摘要 SpringMVC使用消息转换器实现请求报文和对象.对象和响应报文之间的自动转换 在SpringMVC中,可以使用@RequestBody和@ResponseBody两个注解,分别完成请求报文到对象和对象到响应报文的转换,底层这种灵活的消息转换机制,就是Spring3.x中新引入的HttpMessageConverter即消息转换器机制. #Http请求的抽象 还是回到请求-响应,也就是解析请求体,然后返回响应报文这个最基本的Http请求过程中来.我们知道,在servlet标准中,可以用j

  • Android实例HandlerThread源码分析

    HandlerThread 简介: 我们知道Thread线程是一次性消费品,当Thread线程执行完一个耗时的任务之后,线程就会被自动销毁了.如果此时我又有一 个耗时任务需要执行,我们不得不重新创建线程去执行该耗时任务.然而,这样就存在一个性能问题:多次创建和销毁线程是很耗 系统资源的.为了解这种问题,我们可以自己构建一个循环线程Looper Thread,当有耗时任务投放到该循环线程中时,线程执行耗 时任务,执行完之后循环线程处于等待状态,直到下一个新的耗时任务被投放进来.这样一来就避免了多次

  • Vue 源码分析之 Observer实现过程

    导语: 本文是对 Vue 官方文档深入响应式原理(https://cn.vuejs.org/v2/guide/reactivity.html)的理解,并通过源码还原实现过程. 响应式原理可分为两步,依赖收集的过程与触发-重新渲染的过程.依赖收集的过程,有三个很重要的类,分别是 Watcher.Dep.Observer.本文主要解读 Observer . 这篇文章讲解上篇文章没有覆盖到的 Observer 部分的内容,还是先看官网这张图: Observer 最主要的作用就是实现了上图中touch

  • 花样使用Handler与源码分析

    前几天在跟公司大佬讨论一个问题时,看到他使用Handler的一种方式,旁边的同事在说:以前不是这么用的啊.这个问题引发了我的好奇,虽然当时翻清楚道理了,但是还是想给大家分享一下. Handler在之前也说到过他的使用以及源码分析,而且相信大家都知道如何使用它,最常见的使用方法恐怕就是下面这种了: Handler handler = new Handler(){ @Override public void handleMessage(Message msg) { super.handleMessa

  • Redis源码分析之set 和 sorted set 使用

    目录 set 和 sorted set 前言 set 常见命令 set 的使用场景 看下源码实现 sorted set 常见的命令 使用场景 分析下源码实现 总结 参考 set 和 sorted set 前言 前面在几个文章聊到了 list,string,hash 等结构的实现,这次来聊一下 set 和 sorted set 的细节. set Redis 的 Set 是 String 类型的无序集合,集合成员是唯一的. 底层实现主要用到了两种数据结构 hashtable 和 inset(整数集合

随机推荐