RocketMQ broker文件清理源码解析

目录
  • 1. broker 清理文件介绍
    • 1.1 哪些文件需要清理
    • 1.2 RocketMQ文件清理的机制
  • 2. 源码解析
    • 2.1 清理commitlog
    • 2.2 ConsumeQueue 清理
    • 2.3 indexFile 清理
  • 3. 总结

1. broker 清理文件介绍

本系列RocketMQ4.8注释github地址,希望对大家有所帮助,要是觉得可以的话麻烦给点一下Star哈

1.1 哪些文件需要清理

首先我们需要介绍下在RocketMQ中哪些文件需要清理,其实可以想一想,在RocketMQ中哪些文件是一直在往里面写入东西的,最容易想到的就是commitlog 了,因为在一个broker 进程中,所有的普通消息,事务消息,系统消息啥的都往这个commitlog中写,随着时间的越来越长,然后commitlog就会越积攒越多,肯定会有磁盘放不下的那一天,而且我们消息消费完成后,那些被消费完成后的消息其实作用就很小了,可能会有这么一个场景,比如说我线上出现了某个问题,我想看下关于这个问题的消息有没有被消费到,可能你会用到这个消息,但是这种问题一般就是比较紧急的,最近实效的,之前那些消息其实作用就基本没有了,所以就需要清理掉之前的消息。其实不光commitlog需要清理,还需要清理一下ConsumeQueueindexFile , 因为你commitlog里面的消息都被清理了,ConsumeQueueindexFile 再保存着之前的一些数据,就是纯粹浪费空间了。

所以说 broker 文件清理主要是清理commitlog , ConsumeQueue , indexFile

1.2 RocketMQ文件清理的机制

我们介绍下RocketMQ文件清理的机制,RocketMQ默认是清理72小时之前的消息,然后它有几个触发条件, 默认是凌晨4点触发清理, 除非你你这个磁盘空间占用到75% 以上了。在清理commitlog 的时候,并不是一条消息一条消息的清理,拿到所有的MappedFile(抛去现在还在用着的,也就是最后一个) ,然后比对每个MappedFile的最后一条消息的时间,如果是72小时之前的就把MappedFile对应的文件删除了,销毁对应MappedFile,这种情况的话只要你MappedFile 最后一条消息还在存活实效内的话,它就不会清理你这个MappedFile,就算你这个MappedFile 靠前的消息过期了。但是有一种情况它不管你消息超没超过72小时,直接就是删,那就是磁盘空间不足的时候,也就是占了85%以上了,就会立即清理。

清理完成commitlog 之后,就会拿到commitlog中最小的offset ,然后去ConsumeQueueindexFile中把小于offset 的记录删除掉。清理ConsumeQueue 的时候也是遍历MappedFile ,然后它的最后一条消息(unit)小于commitlog中最小的offset 的话,就说明这个MappedFile都小于offset ,因为他们是顺序追加写的,这个MappedFile 就会清理掉,如果你MappedFile 最后一个unit不是小于offset 的话,这个MappedFile 就不删了。

2. 源码解析

我们来看下源码是怎样实现的: 在broker 存储器DefaultMessageStore 启动(start)的时候,会添加几个任务调度,其中有一个就是文件清理的:

private void addScheduleTask() {
    // todo 清理过期文件 每隔10s
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            // todo
            DefaultMessageStore.this.cleanFilesPeriodically();
        }
    }, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);
    ...
}

默认是10s执行一次,可以看到它调用了DefaultMessageStorecleanFilesPeriodically方法:

private void cleanFilesPeriodically() {
    // todo 清除CommitLog文件
    this.cleanCommitLogService.run();
    // todo 清除ConsumeQueue文件
    this.cleanConsumeQueueService.run();
}

2.1 清理commitlog

我们先来看下关于commitlog的清理工作:

public void run() {
    try {
        // todo 删除过期文件
        this.deleteExpiredFiles();
        this.redeleteHangedFile();
    } catch (Throwable e) {
        DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
    }
}

我们看下deleteExpiredFiles 方法的实现:

private void deleteExpiredFiles() {
    int deleteCount = 0;
    // 文件保留时间,如果超过了该时间,则认为是过期文件,可以被删除
    long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
    // 删除物理文件的间隔时间,在一次清除过程中,可能需要被删除的文件不止一个,该值指定两次删除文件的间隔时间
    int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
    // 在清除过期文件时,如
    //果该文件被其他线程占用(引用次数大于0,比如读取消息),此时会
    //阻止此次删除任务,同时在第一次试图删除该文件时记录当前时间
    //戳,destroyMapedFileIntervalForcibly表示第一次拒绝删除之后能
    //保留文件的最大时间,在此时间内,同样可以被拒绝删除,超过该时
    //间后,会将引用次数设置为负数,文件将被强制删除
    int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
    // 指定删除文件的时间点,RocketMQ通过deleteWhen设置每天在
    //固定时间执行一次删除过期文件操作,默认凌晨4点
    boolean timeup = this.isTimeToDelete();
    // todo 检查磁盘空间是否充足,如果磁盘空间不充足,则返回true,表示应该触发过期文件删除操作
    boolean spacefull = this.isSpaceToDelete();
    // 预留手工触发机制,可以通过调用excuteDeleteFilesManualy
    //方法手工触发删除过期文件的操作,目前RocketMQ暂未封装手工触发
    //文件删除的命令
    boolean manualDelete = this.manualDeleteFileSeveralTimes > 0;
    if (timeup || spacefull || manualDelete) {
        if (manualDelete)
            this.manualDeleteFileSeveralTimes--;
        boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;
        log.info("begin to delete before {} hours file. timeup: {} spacefull: {} manualDeleteFileSeveralTimes: {} cleanAtOnce: {}",
            fileReservedTime,
            timeup,
            spacefull,
            manualDeleteFileSeveralTimes,
            cleanAtOnce);
        fileReservedTime *= 60 * 60 * 1000;
        // todo 文件的销毁和删除
        deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,
            destroyMapedFileIntervalForcibly, cleanAtOnce);
        if (deleteCount > 0) {
        } else if (spacefull) {
            log.warn("disk space will be full soon, but delete file failed.");
        }
    }
}

开始几个参数,一个是文件保留实效默认是72小时,你可以使用fileReservedTime来配置,一个是删除文件的间隔100ms,再就是强行销毁MappedFile120s(这个为啥要强行销毁,因为它还害怕还有地方用着这个MappedFile,它有个专门的引用计数器,比如说我还有地方要读它的消息,这个时候计数器就是+1的)。

接着就是判断到没到删除的那个时间,它默认是凌晨4点才能删除

private boolean isTimeToDelete() {
    // 什么时候删除,默认是凌晨4点 -> 04
    String when = DefaultMessageStore.this.getMessageStoreConfig().getDeleteWhen();
    // 判断是不是到点了 就是判断的当前小时 是不是等于 默认的删除时间
    if (UtilAll.isItTimeToDo(when)) {
        DefaultMessageStore.log.info("it's time to reclaim disk space, " + when);
        return true;
    }
    return false;
}

再接着就是看看空间是不是充足,看看磁盘空间使用占比是什么样子的:

private boolean isSpaceToDelete() {
    // 表示CommitLog文件、ConsumeQueue文件所在磁盘分区的最大使用量,如果超过该值,则需要立即清除过期文件
    double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0;
    // 表示是否需要立即执行清除过期文件的操作
    cleanImmediately = false;
    {
        // 当前CommitLog目录所在的磁盘分区的磁盘使用率,通过File#getTotalSpace方法获取文件所在磁盘分区的总容量,
        //通过File#getFreeSpace方法获取文件所在磁盘分区的剩余容量
        double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(getStorePathPhysic());
        // diskSpaceWarningLevelRatio:默认0.90。如果磁盘分区使用率超过该阈值,将设置磁盘为不可写,此时会拒绝写入新消息
        // 如果当前磁盘分区使用率大于diskSpaceWarningLevelRatio,应该立即启动过期文件删除操作
        if (physicRatio > diskSpaceWarningLevelRatio) {
            // 设置 磁盘不可写
            boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
            if (diskok) {
                DefaultMessageStore.log.error("physic disk maybe full soon " + physicRatio + ", so mark disk full");
            }
            cleanImmediately = true;
        //diskSpaceCleanForciblyRatio:默认0.85 如果磁盘分区使用超过该阈值,建议立即执行过期文件删除,但不会拒绝写入新消息
        // 如果当前磁盘分区使用率大于diskSpaceCleanForciblyRatio,建议立即执行过期文件清除
        } else if (physicRatio > diskSpaceCleanForciblyRatio) {
            cleanImmediately = true;
        } else {
            // 设置 磁盘可以写入
            boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
            if (!diskok) {
                DefaultMessageStore.log.info("physic disk space OK " + physicRatio + ", so mark disk ok");
            }
        }
        // 如果当前磁盘使用率小于diskMaxUsedSpaceRatio,则返回false,表示磁盘使用率正常,
        // 否则返回true,需要执行删除过期文件
        if (physicRatio < 0 || physicRatio > ratio) {
            DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, " + physicRatio);
            return true;
        }
    }
    /**
     * 对consumeQueue 做同样的判断
     */
    ...
    return false;
}

这里其实不光是判断 commitlog的存储区域,后面还有段判断ConsumeQueue的存储区域的,然后与这块逻辑一样,就没有放上。这里就是获取默认的最大使用占比 就是75% ,接着就是看看commitlog 存储的那地方使用了多少了,如果是使用90% 了,就设置runningFlag 说磁盘满了,立即清理设置成true,这个参数设置成true之后,就不会管你消息有没有超过72小时,如果你使用了85% 以上了,也是设置立即清理,如果超过75% 返回true。好了,磁盘占用空间这块我们就看完了。

接着看上面deleteExpiredFiles方法实现,还有一个手动清除的,这块我没有找到哪里有用到的,如果后续找到,会补充上, 判断 到了清理的点 或者是磁盘空间满了 或者是手动删除了,满足一个条件就ok了,如果是立即清除是个true,它这里这个cleanAtOnce 变量就是true了,因为前面那个强制清理是默认开启的。

接着计算了一下fileReservedTime 就是将小时转成了毫秒,为了后面好比对,最后就是调用commitlog的deleteExpiredFile 方法清理了:

/**
 * 删除过期的文件
 * @param expiredTime 过期时间 默认72小时
 * @param deleteFilesInterval 删除文件的间隔 100ms
 * @param intervalForcibly  强制删除 1000 * 120
 * @param cleanImmediately 是不是要一次性清理了
 * @return
 */
public int deleteExpiredFile(
    final long expiredTime,
    final int deleteFilesInterval,
    final long intervalForcibly,
    final boolean cleanImmediately
) {
    // todo
    return this.mappedFileQueue.deleteExpiredFileByTime(expiredTime, deleteFilesInterval, intervalForcibly, cleanImmediately);
}

可以看到commitlog 对象调用mappedFileQueuedeleteExpiredFileByTime 方法来处理的,这个mappedFileQueue 就是管理了一堆MappedFile:

/**
 * 删除文件
 *
 * 从倒数第二个文件开始遍历,计算文件的最大存活时间,即文件的最后一次更新时间+文件存活时间(默认
 * 72小时),如果当前时间大于文件的最大存活时间或需要强制删除文
 * 件(当磁盘使用超过设定的阈值)时,执行MappedFile#destory方
 * 法,清除MappedFile占有的相关资源,如果执行成功,将该文件加入
 * 待删除文件列表中,最后统一执行File#delete方法将文件从物理磁盘
 * 中删除。
 */
public int deleteExpiredFileByTime(final long expiredTime,
    final int deleteFilesInterval,
    final long intervalForcibly,
    final boolean cleanImmediately) {
    // 拿到mappedFile的引用
    Object[] mfs = this.copyMappedFiles(0);
    if (null == mfs)
        return 0;
    int mfsLength = mfs.length - 1;
    int deleteCount = 0;
    List<MappedFile> files = new ArrayList<MappedFile>();
    if (null != mfs) {
        for (int i = 0; i < mfsLength; i++) {
            MappedFile mappedFile = (MappedFile) mfs[i];
            // 计算文件的最大存活时间,即文件的最后一次更新时间+文件存活时间(默认72小时)
            long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;
            // 如果当前时间大于文件的最大存活时间 或 需要强制删除文件(当磁盘使用超过设定的阈值)时
            if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) {
                // todo 执行destroy方法
                if (mappedFile.destroy(intervalForcibly)) {
                    files.add(mappedFile);
                    deleteCount++;
                    // 一批 最多删除10 个
                    if (files.size() >= DELETE_FILES_BATCH_MAX) {
                        break;
                    }
                    // 删除间隔
                    if (deleteFilesInterval > 0 && (i + 1) < mfsLength) {
                        try {
                            Thread.sleep(deleteFilesInterval);
                        } catch (InterruptedException e) {
                        }
                    }
                } else {
                    break;
                }
            } else {
                //avoid deleting files in the middle
                break;
            }
        }
    }
    // todo 统一执行File#delete方法将文件从物理磁盘中删除
    deleteExpiredFile(files);
    return deleteCount;
}

这里首先是拿到所有MappedFile的引用,然后就是遍历了,可以看到它这个length是-1的,也就是最后一个MappedFile 是遍历不到的,这个是肯定的,因为最后一个MappedFile肯定是在用着的,如果你来个强制清理,一下清理了,就没法提供服务了。

遍历的时候,拿到对应MappedFile 里面最后一条消息,看看它的写入时间是不是已经过了这个过期时间了,或者直接强制删除,就会执行MappedFile的销毁方法,而且带着销毁时间:

/**
 * 销毁方法
 * @param intervalForcibly 表示拒绝被销毁的最大存活时间
 * @return
 */
public boolean destroy(final long intervalForcibly) {
    // todo
    this.shutdown(intervalForcibly);
    // 清理结束
    if (this.isCleanupOver()) {
        try {
            // 关闭文件通道,
            this.fileChannel.close();
            log.info("close file channel " + this.fileName + " OK");
            long beginTime = System.currentTimeMillis();
            // 删除物理文件
            boolean result = this.file.delete();
            log.info("delete file[REF:" + this.getRefCount() + "] " + this.fileName
                + (result ? " OK, " : " Failed, ") + "W:" + this.getWrotePosition() + " M:"
                + this.getFlushedPosition() + ", "
                + UtilAll.computeElapsedTimeMilliseconds(beginTime));
        } catch (Exception e) {
            log.warn("close file channel " + this.fileName + " Failed. ", e);
        }
        return true;
    } else {
        log.warn("destroy mapped file[REF:" + this.getRefCount() + "] " + this.fileName
            + " Failed. cleanupOver: " + this.cleanupOver);
    }
    return false;
}
public void shutdown(final long intervalForcibly) {
    // 关闭MappedFile
    if (this.available) {
        this.available = false;
        // 初次关闭的时间戳
        this.firstShutdownTimestamp = System.currentTimeMillis();
        // todo 尝试释放资源
        this.release();
    } else if (this.getRefCount() > 0) {
        if ((System.currentTimeMillis() - this.firstShutdownTimestamp) >= intervalForcibly) {
            this.refCount.set(-1000 - this.getRefCount());
            this.release();
        }
    }
}

这里就不详细说了,其实就是shutdown,然后过了120s后强制把引用清了,之后就是关闭channel,删除对应文件。

接着往下说,就是销毁成功了,会记录删除数量,判断删了多少了,一批是最多删10个的,这块应该是怕影响性能的,你一直删的的话,这东西很消耗磁盘性能,容易影响其他写入,读取功能,如果你销毁失败,直接就停了。最后就是将删除的这些MappedFileMappedFileQueue中删除掉。再回到commitlog clean servicerun方法:

public void run() {
    try {
        // todo 删除过期文件
        this.deleteExpiredFiles();
        // todo
        this.redeleteHangedFile();
    } catch (Throwable e) {
        DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
    }
}

我们deleteExpiredFiles 方法已经介绍完了,然后再来看看第二个方法是干嘛的,这个其实就是判断第一个MappedFile 还可不可用了,如果不可用的话,就删了,这块有可能是上面 deleteExpiredFiles 方法MappedFile销毁失败,然后设置了不可用,但是没有清理掉,所以这块再来善后下:

private void redeleteHangedFile() {
    // redeleteHangedFileInterval间隔 默认1000*120
    int interval = DefaultMessageStore.this.getMessageStoreConfig().getRedeleteHangedFileInterval();
    // 当前时间戳
    long currentTimestamp = System.currentTimeMillis();
    if ((currentTimestamp - this.lastRedeleteTimestamp) > interval) {
        this.lastRedeleteTimestamp = currentTimestamp;
        // 获取强制销毁Mapped文件间隔
        int destroyMapedFileIntervalForcibly =
            DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
        // todo 重新删除第一个MappedFile
        if (DefaultMessageStore.this.commitLog.retryDeleteFirstFile(destroyMapedFileIntervalForcibly)) {
        }
    }
}
public boolean retryDeleteFirstFile(final long intervalForcibly) {
    // 获取到 第一个mappedFile
    MappedFile mappedFile = this.getFirstMappedFile();
    if (mappedFile != null) {
        // 不可用
        if (!mappedFile.isAvailable()) {
            log.warn("the mappedFile was destroyed once, but still alive, " + mappedFile.getFileName());
            // 销毁
            boolean result = mappedFile.destroy(intervalForcibly);
            if (result) {
                log.info("the mappedFile re delete OK, " + mappedFile.getFileName());
                List<MappedFile> tmpFiles = new ArrayList<MappedFile>();
                tmpFiles.add(mappedFile);
                this.deleteExpiredFile(tmpFiles);
            } else {
                log.warn("the mappedFile re delete failed, " + mappedFile.getFileName());
            }
            return result;
        }
    }
    return false;
}

这块就是看第一个MappedFile 还可不可用,不可用的话,就销毁掉。好了commitlog 文件清理源码就解析完成了。接下来看下这个ConsumeQueue与indexFile的清理。

2.2 ConsumeQueue 清理

private void cleanFilesPeriodically() {
    // todo 清除CommitLog文件
    this.cleanCommitLogService.run();
    // todo 清除ConsumeQueue文件
    this.cleanConsumeQueueService.run();
}

DefaultMessageStore.CleanConsumeQueueService#run:

public void run() {
    try {
        // 删除 过期的file
        this.deleteExpiredFiles();
    } catch (Throwable e) {
        DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
    }
}

接下来DefaultMessageStore.CleanConsumeQueueService#deleteExpiredFiles:

private void deleteExpiredFiles() {
    // 删除间隔 100
    int deleteLogicsFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteConsumeQueueFilesInterval();
    // 获取 commitLog 的最小offset
    long minOffset = DefaultMessageStore.this.commitLog.getMinOffset();
    if (minOffset > this.lastPhysicalMinOffset) {
        // 上次 清理 到哪了
        this.lastPhysicalMinOffset = minOffset;
        ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;
        // 遍历删除
        for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) {
            for (ConsumeQueue logic : maps.values()) {
                // 进行删除
                int deleteCount = logic.deleteExpiredFile(minOffset);
                // 间隔
                if (deleteCount > 0 && deleteLogicsFilesInterval > 0) {
                    try {
                        Thread.sleep(deleteLogicsFilesInterval);
                    } catch (InterruptedException ignored) {
                    }
                }
            }
        }
        // todo 删除 过期的 indexFile
        DefaultMessageStore.this.indexService.deleteExpiredFile(minOffset);
    }
}

首先是获取删除间隔,然后拿到commitlog中最小的那个offset ,接着就是判断上次清理位置与最小offset 比较,如果offset 大于它上次清理的位置的话,就说明 它得把最小offset之前的清理掉。先是记录最后一次清理的offset是最小offset , 接着就是遍历所有的ConsumeQueue ,调用每个ConsumeQueuedeleteExpiredFile 方法来清理,我们来看下这个方法:

public int deleteExpiredFile(long offset) {
    // 进行销毁 然后得到销毁个数
    int cnt = this.mappedFileQueue.deleteExpiredFileByOffset(offset, CQ_STORE_UNIT_SIZE);
    // 纠正最小偏移量
    this.correctMinOffset(offset);
    return cnt;
}

CQ_STORE_UNIT_SIZE 这个就是每个unit 占20个字节,见。

/**
 * 删除过期的file
 * @param offset 最小offset
 * @param unitSize 大小为20字节
 * @return
 */
public int deleteExpiredFileByOffset(long offset, int unitSize) {
    Object[] mfs = this.copyMappedFiles(0);
    List<MappedFile> files = new ArrayList<MappedFile>();
    int deleteCount = 0;
    if (null != mfs) {
        int mfsLength = mfs.length - 1;
        for (int i = 0; i < mfsLength; i++) {
            boolean destroy;
            MappedFile mappedFile = (MappedFile) mfs[i];
            // 最后一个单元位置到这个MappedFile结束,其实就是获取最后一个单元
            SelectMappedBufferResult result = mappedFile.selectMappedBuffer(this.mappedFileSize - unitSize);
            if (result != null) {
                // 获取最大的offset
                long maxOffsetInLogicQueue = result.getByteBuffer().getLong();
                result.release();
                // 判断是否销毁 如果小于offset 就要销毁
                destroy = maxOffsetInLogicQueue < offset;
                if (destroy) {
                    log.info("physic min offset " + offset + ", logics in current mappedFile max offset "
                        + maxOffsetInLogicQueue + ", delete it");
                }
            } else if (!mappedFile.isAvailable()) { // Handle hanged file.
                log.warn("Found a hanged consume queue file, attempting to delete it.");
                destroy = true;
            } else {
                log.warn("this being not executed forever.");
                break;
            }
            // 进行销毁
            if (destroy && mappedFile.destroy(1000 * 60)) {
                files.add(mappedFile);
                deleteCount++;
            } else {
                break;
            }
        }
    }
    // 删除引用
    deleteExpiredFile(files);
    return deleteCount;
}

它的删除跟commitlog 的差不多,只不过commitlog 是根据时间来判断的,它是根据commitlog 的offset 来判断的,判断要不要删除这个MappedFile,如果这个MappedFile最后一个unit 存储的offset 小于 commitlog 最小的offset 的话就要销毁了。接着就是销毁,超时时间是1分钟,最后是删除引用。

2.3 indexFile 清理

最后我们来看下 indexFile的清理工作: DefaultMessageStore.CleanConsumeQueueService#deleteExpiredFiles:

private void deleteExpiredFiles() {
    // 删除间隔 100
    int deleteLogicsFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteConsumeQueueFilesInterval();
    // 获取 commitLog 的最小offset
    long minOffset = DefaultMessageStore.this.commitLog.getMinOffset();
    if (minOffset > this.lastPhysicalMinOffset) {
        ...
        // todo 删除 过期的 indexFile
        DefaultMessageStore.this.indexService.deleteExpiredFile(minOffset);
    }
}
/**
 * 删除 过期文件
 * @param offset 最小的offset 小于这个offset都要删除
 */
public void deleteExpiredFile(long offset) {
    Object[] files = null;
    try {
        // 获取读锁
        this.readWriteLock.readLock().lock();
        if (this.indexFileList.isEmpty()) {
            return;
        }
        // 获取第一个indexFile 的一个offset
        long endPhyOffset = this.indexFileList.get(0).getEndPhyOffset();
        if (endPhyOffset < offset) {
            files = this.indexFileList.toArray();
        }
    } catch (Exception e) {
        log.error("destroy exception", e);
    } finally {
        this.readWriteLock.readLock().unlock();
    }
    if (files != null) {
        // 找到需要删除的indexFile
        List<IndexFile> fileList = new ArrayList<IndexFile>();
        for (int i = 0; i < (files.length - 1); i++) {
            IndexFile f = (IndexFile) files[i];
            if (f.getEndPhyOffset() < offset) {
                fileList.add(f);
            } else {
                break;
            }
        }
        // 删除
        this.deleteExpiredFile(fileList);
    }
}

可以看到,先是拿第一个indexFile 看看有没有小于commitlog 最小offset 的情况发生,这里也是拿的indexFile最后一个offset 做的对比,因为这块也是按照offset大小 前后顺序处理的,最后一个的offest 肯定是这个indexFile中最大的了,如果第一个indexFile满足了的话,就会拿到所有引用,然后遍历找出符合条件的indexFile, 调用deleteExpiredFile方法遍历销毁:

private void deleteExpiredFile(List<IndexFile> files) {
    if (!files.isEmpty()) {
        try {
            this.readWriteLock.writeLock().lock();
            for (IndexFile file : files) {
                // 销毁
                boolean destroyed = file.destroy(3000);
                // 从index 集合中移除
                destroyed = destroyed && this.indexFileList.remove(file);
                if (!destroyed) {
                    log.error("deleteExpiredFile remove failed.");
                    break;
                }
            }
        } catch (Exception e) {
            log.error("deleteExpiredFile has exception.", e);
        } finally {
            this.readWriteLock.writeLock().unlock();
        }
    }
}

这里就是遍历销毁,然后移除对这个indexFile管理。

3. 总结

本文主要是介绍了RocketMQ broker 消息清理机制,介绍了主要清理哪些文件 :commitlog ,ConsumeQueue,indexFile

接着就是介绍了什么时候触发清理,比如说凌晨4点 ,磁盘没满85% 以上的话,就是清理72小时之前的,如果是满了85%就除了还在用着的那个先清10个看看, 还有就是磁盘使用空间75% 以上也是会触发的, 低于85 % 清理72小时之前的,高于85% 先清理10个文件看看,这是commitlog的清理机制,关于ConsumeQueue与indexFile的话,就是与commitlog中最小的那个offset 有关了,小于commitlog中最小offset 的那些还是要清理掉的。 最后就是分别解析了一下commitlog 文件清理,ConsumeQueue 文件清理与indexFile 文件清理。

参考

RocketMQ源码分析专栏

以上就是RocketMQ broker文件清理源码解析的详细内容,更多关于RocketMQ broker文件清理的资料请关注我们其它相关文章!

(0)

相关推荐

  • RocketMQ消息生产者是如何选择Broker示例详解

    目录 前言 从NameServer查询Topic信息 如何选择Broker 小结 前言 在RocketMQ中为,我们创建消息生产者时,只需要设置NameServer地址,消息就能正确地发送到对应的Broker中,那么RocketMQ消息生产者是如何找到Broker的呢?如果有多个Broker实例,那么消息发送是如何选择发送到哪个Broker的呢? 从NameServer查询Topic信息 通过Debug消息发送send()方法,我们最终可以定位到DefaultMQProducerImpl.sen

  • RocketMQ 源码分析Broker消息刷盘服务

    目录 前言 刷盘服务源码分析 CommitRealTimeService刷盘源码分析 FlushRealTimeService刷盘源码分析 GroupCommitService刷盘源码分析 总结 前言 上篇文章我们介绍了消息刷盘的四种方式,本篇文章我们来介绍Broker是如何实现这四种刷盘方式. 刷盘服务源码分析 Broker中的四种刷盘分别是由CommitRealTimeService,FlushRealTimeService,GroupCommitService将消息从内存中刷到磁盘上的.在

  • RocketMQ Broker如何保存消息源码解析

    目录 前言 消息存储格式总览 CommitLog介绍 MappedFile详解 消息存储格式介绍 DefaultMessageStore介绍 消息存储源码分析 发送消息存储流程 消息预处理阶段 消息保存阶段 消息保存结果处理阶段 总结 前言 前面我们介绍了RocketMQ是如何接收消息的,下面我们来介绍Broker是如何保存消息的. 消息存储格式总览 Broker消息存储主要包括CommitLog,ConsumerQueue和Index三个部分. CommitLog CommitLog主要用于消

  • RocketMQ生产者如何规避故障Broker方式详解

    目录 前言 收集故障Broker 选择Broker 小结 前言 在消息发送过程中,生产者从NameServer中获取到了指定Topic对应的Broker信息,在同步发送消息的代码中,如果消息发送失败,生产者默认是会重试两次的.那么Broker有问题的情况下,无论重试多少次都是没有意义的,消息生产者是如何规避这些故障Broker的呢? 收集故障Broker 我们在所有的发送消息源码中都可以找到这样一段代码,可在DefaultMQProducerImpl类中查找: this.updateFaultI

  • RocketMQ broker 消息投递流程处理PULL_MESSAGE请求解析

    目录 RocketMq消息处理 1. 处理PULL_MESSAGE请求 2. 获取消息 3. 挂起请求:PullRequestHoldService#suspendPullRequest 3.1 处理挂起请求的线程:PullRequestHoldService 3.2 唤醒请求:PullMessageProcessor#executeRequestWhenWakeup 3.3 消息分发中唤醒consumer请求 总结 RocketMq消息处理 RocketMq消息处理整个流程如下: 本系列Roc

  • 浅谈ASP.NET Core静态文件处理源码探究

    前言 静态文件(如 HTML.CSS.图像和 JavaScript)等是Web程序的重要组成部分.传统的ASP.NET项目一般都是部署在IIS上,IIS是一个功能非常强大的服务器平台,可以直接处理接收到的静态文件处理而不需要经过应用程序池处理,所以很多情况下对于静态文件的处理程序本身是无感知的.ASP.NET Core则不同,作为Server的Kestrel服务是宿主到程序上的,由宿主运行程序启动Server然后可以监听请求,所以通过程序我们直接可以处理静态文件相关.静态文件默认存储到项目的ww

  • MyBatis框架底层的执行原理源码解析

    目录 1.前言 2.案例项目源码 3.MyBatis源码解析底层执行原理 3.1 读取mybatis配置文件创建出SqlSeesionFactory对象 3.2 通过SqlSeesionFactory对象进而创建出SqlSession对象 3.3 通过SqlSession的getMapper获取到接口代理对象 3.4 通过mapper接口的代理对象执行CRUD 1.前言 MyBatis框架大家肯定都用过的,废话我就不再多说了,这篇文章就给大家分享一下有关MyBatis框架底层的执行原理吧(Deb

  • go slice 扩容实现原理源码解析

    目录 正文 扩容的示例 实际扩容倍数 growslice 实现 growslice 实现步骤 growslice 源码剖析 总结 正文 基于 Go 1.19. go 的切片我们都知道可以自动地进行扩容,具体来说就是在切片的容量容纳不下新的元素的时候, 底层会帮我们为切片的底层数组分配更大的内存空间,然后把旧的切片的底层数组指针指向新的内存中: 目前网上一些关于扩容倍数的文章都是基于相对旧版本的 Go 的,新版本中,现在切片扩容的时候并不是那种准确的小于多少容量的时候就 2 倍扩容, 大于多少容量

  • 关于feign接口动态代理源码解析

    目录 feign接口动态代理源码解析 @FeignClinet代理类注册 feign源码解析 Feign的作用 源码及流程介绍 feign接口动态代理源码解析 @FeignClinet 代理类注册 @FeignClinet 通过动态代理实现的底层http调用,既然是动态代理,必然存在创建代理类的过程.如Proxy.newProxyInstance或者 CGlib org.springframework.cloud.openfeign 的代理类注册实现如下. 首先,org.springframew

  • Spring的Model 和 Map的原理源码解析

    Model 和 Map 为什么在Model和Map中放值传入后会出现在request的上面. 9.1.源码解析 准备测试代码 @GetMapping("/goto") public String go(HttpServletRequest request, Map<String,Object> map, Model model){ request.setAttribute("msg","传过来...."); map.put("

  • React Hydrate原理源码解析

    目录 引言 Demo ReactDOM.render ReactDOM.hydrate hydrate 过程 事件绑定 hydrate 源码剖析 beginWork HostRoot Fiber HostComponent HostText Fiber tryToClaimNextHydratableInstance completeUnitOfWork popHydrationState prepareToHydrateHostInstance prepareToHydrateHostText

  • axios拦截器工作方式及原理源码解析

    目录 axios 拦截器的配置方式 use() 方法的定义 拦截器如何执行 拦截器回调方法的添加顺序 同步执行请求拦截器(顺序执行) 异步执行请求拦截器(同时执行) Q&A 拦截器是如何工作的 拦截器的执行顺序 同步&异步 axios 拦截器的配置方式 本文所用 axios 版本号为:1.3.2. axios 中有两种拦截器: axios.interceptors.request.use(onFulfilled, onRejected, options):配置请求拦截器. onFulfil

  • vue parseHTML 函数拿到返回值后的处理源码解析

    目录 引言 parseStartTag函数返回值 handleStartTag源码 tagName 及unarySlash 调用parser钩子函数 引言 继上篇文章: parseHTML 函数源码解析 var startTagMatch = parseStartTag(); if (startTagMatch) { handleStartTag(startTagMatch); if (shouldIgnoreFirstNewline(startTagMatch.tagName, html))

  • async-validator实现原理源码解析

    目录 async-validator 介绍 async-validator 基本使用 async-validator 源码分析 async-validator 源码-构造函数 async-validator 源码-validate方法 async-validator 源码-register方法 总结 最后 async-validator 介绍 async-validator是异步的验证数据是否合法有效的工具, 内置了不同数据类型的常见验证规则. 在需要对数据进行验证的场景中,都可以考虑使用asy

  • .properties文件读取及占位符${...}替换源码解析

    前言 我们在开发中常遇到一种场景,Bean里面有一些参数是比较固定的,这种时候通常会采用配置的方式,将这些参数配置在.properties文件中,然后在Bean实例化的时候通过Spring将这些.properties文件中配置的参数使用占位符"${}"替换的方式读入并设置到Bean的相应参数中. 这种做法最典型的就是JDBC的配置,本文就来研究一下.properties文件读取及占位符"${}"替换的源码,首先从代码入手,定义一个DataSource,模拟一下JDB

随机推荐