RocketMQ 源码分析Broker消息刷盘服务
目录
- 前言
- 刷盘服务源码分析
- CommitRealTimeService刷盘源码分析
- FlushRealTimeService刷盘源码分析
- GroupCommitService刷盘源码分析
- 总结
前言
上篇文章我们介绍了消息刷盘的四种方式,本篇文章我们来介绍Broker是如何实现这四种刷盘方式。
刷盘服务源码分析
Broker中的四种刷盘分别是由CommitRealTimeService,FlushRealTimeService,GroupCommitService将消息从内存中刷到磁盘上的。在了解刷盘这三个刷盘服务之前,我们先来了解MappedFile中下面几个属性
public class MappedFile extends ReferenceResource { // 当前写文件位置,即数据被写入MappedFile的最新指针,可能存在ByteBuffer中,没有提交 protected final AtomicInteger wrotePosition = new AtomicInteger(0); // 数据被写入文件的最新指针(只是被写入文件映射,不一定被刷盘) protected final AtomicInteger committedPosition = new AtomicInteger(0); // 刷盘位置,该指针之前的数据都持久化存储到磁盘中 private final AtomicInteger flushedPosition = new AtomicInteger(0); // 文件大小,默认是1042*1024*4(4GB) protected int fileSize; // 起始偏移量,MappedFile创建时从文件名中解析 private long fileFromOffset; }
上面几个属性在MappedFile中的位置如下图所示
上面几个位置关系: flushedPosition ≤ commitedPosition ≤ wrotePosition
CommitRealTimeService刷盘源码分析
CommitRealTimeService类的作用就是将上图中红色的消息(也就是committedPosition -> wrotePosition之间的消息)从直接内存ByteBuffer提交到FileChannel,提交完成并不带表刷盘完成,还需要将FileChannel将数据刷到硬盘中,才正式刷盘完成。CommitRealTimeService核心代码逻辑是在run()中,在run()中是包含一个死循环,死循环中每个200ms提交一次消息,每次最少提交4页的消息,每页大小是4kb,也就是说只有wrotePosition - committedPosition ≥ 4*4kb
,消息才会被提交。
// org.apache.rocketmq.store.CommitLog.CommitRealTimeService#run public void run() { // 死循环 while (!this.isStopped()) { // 消息提交时间间隔,默认200ms int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog(); // 最少提交页数,默认是4 int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages(); try { // 提交消息 boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages); // 等待200ms this.waitForRunning(interval); } catch (Throwable e) { CommitLog.log.error(this.getServiceName() + " service has exception. ", e); } } }
上面mappedFileQueue#commit
提交最终会调用MappedFile#commit0
,commit0代码逻辑如下,将直接内存ByteBuffer中的数据拷贝到fileChannel中。
// org.apache.rocketmq.store.MappedFile#commit0 protected void commit0() { // 写指针 int writePos = this.wrotePosition.get(); // 最后提交指针 int lastCommittedPosition = this.committedPosition.get(); // byteBuffer的数据提交到FileChannel if (writePos - lastCommittedPosition > 0) { try { ByteBuffer byteBuffer = writeBuffer.slice(); byteBuffer.position(lastCommittedPosition); byteBuffer.limit(writePos); this.fileChannel.position(lastCommittedPosition); this.fileChannel.write(byteBuffer); this.committedPosition.set(writePos); } catch (Throwable e) { log.error("Error occurred when commit data to FileChannel.", e); } } }
FlushRealTimeService刷盘源码分析
FlushRealTimeService的代码与CommitRealTimeService类似,核心代码也带run()中,run()中也是一个死循环,每隔500ms调用mappedFileQueue#flush
刷盘。
// org.apache.rocketmq.store.CommitLog.FlushRealTimeService#run public void run() { while (!this.isStopped()) { // 定时刷盘时间间隔,默认500ms int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog(); // 一次刷盘页数,默认是4页 int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages(); try { if (flushCommitLogTimed) { // sleep 500ms Thread.sleep(interval); } else { this.waitForRunning(interval); } // 消息刷盘 CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages); } catch (Throwable e) { this.printFlushProgress(); } } }
mappedFileQueue#flush
刷盘最终调用了MappedFile#flush
,代码如下所示,可以看到如果MappedFile中有直接内存写缓存,则会调用fileChannel.force(false)
刷盘,如果没有写缓存,则消息直接提交到MappedFile的内存映射文件mappedByteBuffer中,因此调用mappedByteBuffer.force()
刷盘。
// org.apache.rocketmq.store.MappedFile#flush public int flush(final int flushLeastPages) { if (this.isAbleToFlush(flushLeastPages)) { if (this.hold()) { int value = getReadPosition(); try { // 如果使用了堆外内存,那么通过fileChannel强制刷盘,这是异步堆外内存的逻辑 if (writeBuffer != null || this.fileChannel.position() != 0) { this.fileChannel.force(false); } else { // 如果没有使用堆外内存,那么通过fileChannel强制刷盘,这是同步或者异步刷盘走的逻辑 this.mappedByteBuffer.force(); } } catch (Throwable e) { log.error("Error occurred when force data to disk.", e); } // 设置刷盘位置为写入位置 this.flushedPosition.set(value); // 减少对该MappedFile的引用次数 this.release(); } else { log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get()); this.flushedPosition.set(getReadPosition()); } } return this.getFlushedPosition(); }
GroupCommitService刷盘源码分析
同步刷盘GroupCommitService代码与上述代码类似,都继承了ServiceThread
,它的核心逻辑在GroupCommitService#run
,在run()中也是一个死循环,每隔10ms调用一次doCommit()
,虽然这个方法的名字叫doCommit,实际底层也与FlushRealTimeService相同,都是调用的mappedFileQueue#flush
,将mappedByteBuffer
中的数据刷入磁盘。
// org.apache.rocketmq.store.CommitLog.GroupCommitService#run public void run() { // 死循环 while (!this.isStopped()) { try { // 间隔10ms this.waitForRunning(10); this.doCommit(); } catch (Exception e) { CommitLog.log.warn(this.getServiceName() + " service has exception. ", e); } } }
看到这里大家可能会有疑问,为什么同步刷盘也是定时刷盘,这与异步刷盘有什么区别呢?实际上这里有着相当精妙的设计,在上篇文章中我们了解到同步刷盘包括等待消息保存与不等待消息保存。
如果不等待消息保存,则调用了ServiceThread#wakeup
方法。
public void wakeup() { if (hasNotified.compareAndSet(false, true)) { waitPoint.countDown(); } }
ServiceThread状态如下所示,如果刷盘线程在10ms等待中,hasNotified属性值为false,hastNotified更新成功,刷盘线程被唤醒,立即停止等待。如果刷盘线程正在执行中,hasNotified更新失败,刷盘线程唤醒失败。只能等待下一次被唤醒或者下一次时间间隔后再次刷盘。
如果是要等待刷盘成功后才返回结果,就要利用到GroupCommitService属性中两个刷盘请求容器
- requestWrite
同步刷盘请求暂存容器
- requestsRead
处理中的刷盘请求容器
class GroupCommitService extends FlushCommitLogService { // 同步刷盘请求暂存容器 private volatile LinkedList<GroupCommitRequest> requestsWrite = new LinkedList<GroupCommitRequest>(); // 每次处理刷盘的request容器 private volatile LinkedList<GroupCommitRequest> requestsRead = new LinkedList<GroupCommitRequest>(); }
提交刷盘请求首先会被放入到requestsWrite容器中,然后再唤醒刷盘线程。
// org.apache.rocketmq.store.CommitLog.GroupCommitService#putRequest public synchronized void putRequest(final GroupCommitRequest request) { lock.lock(); try { // 写请求 this.requestsWrite.add(request); } finally { lock.unlock(); } // 唤醒当前线程 this.wakeup(); }
刷盘线程被唤醒或者线程结束等待时都会调用onWaitEnd()
方法,交换请求暂存容器和刷盘request容器
// org.apache.rocketmq.store.CommitLog.GroupCommitService#onWaitEnd @Override protected void onWaitEnd() { this.swapRequests(); } // org.apache.rocketmq.store.CommitLog.GroupCommitService#swapRequests // 交换请求暂存容器和刷盘request容器 private void swapRequests() { lock.lock(); try { LinkedList<GroupCommitRequest> tmp = this.requestsWrite; this.requestsWrite = this.requestsRead; this.requestsRead = tmp; } finally { lock.unlock(); } }
线程被唤醒后会调用doCommit(),从下面代码可以发现,不管requestsRead是否包含要处理的刷盘请求,实际都是通过调用mappedFileQueue#flush
执行刷盘。
- 如果requestsRead中包含刷盘请求
则有可能需要调用mappedFileQueue#flush
,确保当前请求的消息能够被刷盘,并返回刷盘结果给客户端,如果包含请求,最多会调用两次刷盘方法,确保消息能够正确刷盘。
由于文件是固定大小,有可能刷盘位置在上一个MappedFile中,当前消息请求在最新的MappedFile中,刷盘两次,确保当前消息能够被刷入硬盘中
- 如果requestsRead中不包含刷盘请求
处理请求容器中包含request,直接调用MappedFileQueue#flush
,如果当前消息不在flushPosition所在的mappedFile中,则本次刷盘有可能并不会将当前消息持久化到磁盘中,需要等待下次刷盘。
// org.apache.rocketmq.store.CommitLog.GroupCommitService#doCommit private void doCommit() { // 如果处理Request不空 if (!this.requestsRead.isEmpty()) { // 遍历处理Request for (GroupCommitRequest req : this.requestsRead) { // 如果刷盘指针大于刷盘请求中需要刷盘的offSet boolean flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); // 消息刷盘 for (int i = 0; i < 2 && !flushOK; i++) { CommitLog.this.mappedFileQueue.flush(0); flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset(); } // 唤醒客户端 req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT); } } else { // 如果消息不等待刷盘成功就返回,则不会提交刷盘请求,调用这个方法 CommitLog.this.mappedFileQueue.flush(0); } }
总结
本次我们了解了RocketMQ中四种刷盘策略对应的刷盘服务
- 同步刷盘-等待消息保存到磁盘
- 同步刷盘-不等待消息保存到磁盘上
上面两个同步刷盘都是由GroupCommitService实现的,由GroupCommitService将MappedByteBuffer消息刷盘到磁盘上
- 异步刷盘-开启堆外缓存
如果开启了堆外缓存,刷盘时会先由CommitRealTimeService将消息从Bytebuffer拷贝到FileChannel,FlushRealTimeService再将消息从FileChannel刷到磁盘上
- 异步刷盘-不开启堆外缓存
这种方式也是默认的刷盘方式,由FlushRealTimeService将MappedByteBuffer消息刷盘到磁盘上
以上就是RocketMQ 源码分析Broker消息刷盘服务的详细内容,更多关于RocketMQ Broker刷盘服务的资料请关注我们其它相关文章!