RocketMQ Broker如何保存消息源码解析

目录
  • 前言
  • 消息存储格式总览
    • CommitLog介绍
    • MappedFile详解
  • 消息存储格式介绍
    • DefaultMessageStore介绍
  • 消息存储源码分析
    • 发送消息存储流程
    • 消息预处理阶段
    • 消息保存阶段
    • 消息保存结果处理阶段
  • 总结

前言

前面我们介绍了RocketMQ是如何接收消息的,下面我们来介绍Broker是如何保存消息的。

消息存储格式总览

Broker消息存储主要包括CommitLog,ConsumerQueue和Index三个部分。

  • CommitLog

CommitLog主要用于消息存储,所有topic的消息按顺序都存储在CommitLog中。

  • ConsumerQueue

ConsumerQueue对应消费队列,消息存储到CommitLog后,会异步转发到ConsumerQueue文件中

  • Index

消息索引,只要存储消息key与offset的关系

CommitLog介绍

CommitLog是消息和消息数据存储的主体,CommitLog存储的文件目录在${user.home}/store/commitlog中,它其实是一个目录,消息并不是直接存储在CommitLog中,而是存储在由20位数字构成的文件中。

MappedFile详解

commitlog文件夹中文件单元是MappedFile,我们可以把MappedFile理解成一个文件管理的工具,如果需要将数据存储到磁盘,或者快速查找数据,都可以通过MappedFile。

每个MappedFile文件大小默认是1GB,文件名是由20位数字构成,文件名其实是MappedFile的起始偏移量,如果偏移量不足20位,则将偏移量的左边补0。上图中MappedFile的文件名是00000000000000000000,它代表的是CommitLog中的第一个文件,由于每个MappedFile文件大小是1GB,因此第二个文件的偏移量为1024*1024*1024(1GB),计算后的结果为1073741824,因此第二个文件的文件名为00000000001073741824,可依此类推其他文件的文件名。

消息存储格式介绍

消息在commitLog中存储的格式如下所示

  • totalSize

消息总长度,4字节

  • magicCode

魔数,4字节,固定值十六进制是0xdaa320a7,10进制是-875286124

  • bodyCRC

消息体crc校验码,4字节

  • queueId

消息队列id,4字节

  • flag

消息标记,RocketMQ不做处理,默认4字节

  • queueOffset

消息在ConsumeQueue文件中的物理偏移量,默认8字节

  • physicalOffset

消息在CommitLog文件中的物理偏移量,默认8字节

  • sysFlag

消息系统标记,例如是否压缩、是否是事务消息等,4字节

  • bornTimestamp

消息生产者调用消息API的时间戳,8字节

  • bornHost

BORNHOST 消息生产者IP和端口号,8字节

  • storeTimestamp

消息存储时间戳,8字节

  • storeHostAddress

STOREHOSTADDRESS 消息存储Broker的IP和端口号,8字节

  • reconsumeTimes

消息重试次数 4字节

  • Prepared Transaction Offset

事务消息偏移量,8字节

  • bodyLength

消息体长度,4字节

  • body

消息体内容,它是变长的,长度为bodyLength中存储的值

  • TopicLength

topicLength表示topic占用的长度,topicLength占用1字节,也就是255,也就是说topic长度最长不能超过255字节

  • Topic

topic是消息主题名称,topic是变长的,实际占用topicLength字节

  • PropertiesLength

propertiesLength表示properties占用的长度,propertiesLength占用2字节,也就是说properties长度最长不超过65536字节

  • Properties

properties是消息属性,properties是变长的,实际占用propertiesLength字节

DefaultMessageStore介绍

Broker保存消息是通过消息存储默认实现类org.apache.rocketmq.store.DefaultMessageStore执行的,它是Broker存储模块中最最最重要的一个类,提供了很多存储文件的API。DefaultMessageStore中和消息存储相关的属性如下所示,

// 消息存储配置
private final MessageStoreConfig messageStoreConfig;
// CommitLog文件的存储实现类
private final CommitLog commitLog;
// 消息队列存储缓存表,key是topic
private final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable;
// MappedFile分配服务
private final AllocateMappedFileService allocateMappedFileService;
// 直接内存暂存池
private final TransientStorePool transientStorePool;
// broker状态管理器
private final BrokerStatsManager brokerStatsManager;
// 锁文件
// 目录: ${user.home}/store/lock
private RandomAccessFile lockFile;

消息存储源码分析

发送消息存储流程

发送消息存储的入口函数是DefaultMessageStore#asyncPutMessage,它主要分为下面三步

  • 存储状态校验
  • 校验消息存储服务是否关闭,当前Broker是否是从节点,queue是否可写
  • 消息校验
  • 校验topic名称长度是否超过了127字节和property长度是否超过了32767
  • 将消息保存到commitLog
// org.apache.rocketmq.store.DefaultMessageStore#asyncPutMessage
public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {
    // 1. 存储状态校验
    PutMessageStatus checkStoreStatus = this.checkStoreStatus();
    if (checkStoreStatus != PutMessageStatus.PUT_OK) {
        return CompletableFuture.completedFuture(new PutMessageResult(checkStoreStatus, null));
    }
    // 2. 校验topic名称和property长度
    PutMessageStatus msgCheckStatus = this.checkMessage(msg);
    if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
        return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null));
    }
    // ...
    long beginTime = this.getSystemClock().now();
    // 3. 保存到commitLog
    CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);
    //...
    return putResultFuture;
}

CommitLog#asyncPutMessage保存消息

CommitLog#asyncPutMessage保存消息可以分为三个阶段

  • 消息预处理阶段
  • 消息保存阶段
  • 消息保存结果处理阶段

消息预处理阶段

消息预处理阶段可以分为下面三个步骤

  • 设置消息存储时间戳和消息体CSC32信息
  • 如果是延迟消息,则设置延迟信息

如果是非事务消息或者是提交的事务消息,并且设置了消息的延迟级别,说明当前消息是延迟消息,Broker在处理延迟消息时会将消息投递到名为SCHEDULE_TOPIC_XXXX的Topic。在消息预处理的阶段,会先将当前消息的topic设置为SCHEDULE_TOPIC_XXXX,queueId设置为延迟级别-1,并且将原来的Topic和queueId设置到消息的REAL_TOPICREAL_QID属性中。

  • 设置ip及构建存储消息上下文
// org.apache.rocketmq.store.CommitLog#asyncPutMessage
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
    // 1. 设置消息存储时间戳和消息体CSC32信息
    msg.setStoreTimestamp(System.currentTimeMillis());     // 设置消息存储时间
    msg.setBodyCRC(UtilAll.crc32(msg.getBody()));					 // 设置消息体CRC32校验值
    // 2. 如果是非事务消息,或者是事务提交消息,判断是否是是否是延迟消息,如果是延迟消息则设置延迟相关信息
    if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
            || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
        // Delay Delivery
        // 如果延迟级别>0,说明是延迟消息
        if (msg.getDelayTimeLevel() > 0) {
            // 如果大于最大的延迟级别,则取最大的延迟级别
            if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
            }
            // 消息topic改成延迟消息topic(SCHEDULE_TOPIC_XXXX)
            topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
            // 延迟topic的queueId:延迟级别-1
            int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
            // 消息属性中设置真实的QueueId
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
            msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
            // 把SCHEDULE_TOPIC_XXXX设置为当前消息的topic,消息先投递到这个队列中
            msg.setTopic(topic);
            msg.setQueueId(queueId);
        }
    }
  	// 3. 设置ip并构建存储消息上下文信息
    msg.setBornHostV6Flag(); // 如果producer的ip是IpV6,则设置生产者IpV6 flag
    msg.setStoreHostAddressV6Flag(); // 如果如果broker的ip是IpV6,则设置BrokerIpV6 flag
    PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get();
    // 构建存消息上下文
    PutMessageContext putMessageContext = new PutMessageContext(/*key值:topic-queueId*/generateKey(putMessageThreadLocal.getKeyBuilder()/*StringBuilder*/, msg));
  	// ... 省略部分代码
}

消息保存阶段

消息保存阶段可以分为如下步骤

  • 获取消息保存锁
  • 获取最新的mappedFile

获取MappedFile调用的是MappedFileQueue中的方法,获取最新的MappedFile

  • 如果最新的mappedFile为空或者已经满了,则创建新的MappedFile
  • 将消息保存的mappedFile中
  • 处理消息保存结果
  • 释放消息保存锁
// org.apache.rocketmq.store.CommitLog#asyncPutMessage
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
    // ... 省略部分代码
  	// 1. 消息保存锁,默认是ReentrantLock互斥锁
    putMessageLock.lock();
    try {
        // 2. 获取最新的mappedFile
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
        // 3. 如果获取到的mappedFile是null说明之前没有存储消息
        // 如果mappedFile满了,说明需要创建一个新的MappedFile
        if (null == mappedFile || mappedFile.isFull()) {
            mappedFile = this.mappedFileQueue.getLastMappedFile(0);
        }
				// 如果创建mappedFile失败,则返回异常信息
        if (null == mappedFile) {
            // 创建mappedFile失败
            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
        }
        // 4. 将消息保存的mappedFile中
        result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
        // 5. 处理消息保存结果
      	switch (result.getStatus()) {
            case PUT_OK:
                break;
            // mappedFile满了,重新创建mappedFile后再写入消息
            case END_OF_FILE:
                unlockMappedFile = mappedFile;
                // 创建一个新的文件,然后重新写入
                mappedFile = this.mappedFileQueue.getLastMappedFile(0);
								//...
     						// 写消息
                result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
                break;
            // ...
        }
    } finally {
      	// 6. 释放锁
        putMessageLock.unlock();
    }
		// ... 省略部分代码
}

上面第4步MappedFile#appendMessage逻辑主要有三步

  • 获取当前写文件位置

如果写指针小于文件大小,则对消息进行追加处理

  • 获取写缓冲
  • 调用AppendMessageCallback的doAppend将消息写到内存缓冲中

回调函数doAppend方法分为单条处理逻辑和批量消息处理逻辑,下面仅展示了单条消息处理逻辑

  • 消息保存完成后会更新当前写文件的位置和消息保存时间戳
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb,
        PutMessageContext putMessageContext) {
    // 获取当前写文件位置
    int currentPos = this.wrotePosition.get();
    // 如果写文件位置小于文件size
    if (currentPos < this.fileSize) {
        ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
        byteBuffer.position(currentPos);
        AppendMessageResult result;
        // 如果是单条消息
        if (messageExt instanceof MessageExtBrokerInner) {
            result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos/*文件长度-当前写位置,可以写的长度*/,(MessageExtBrokerInner) messageExt, putMessageContext);
        }
        //...
        // 更新当前写文件位置和消息保存时间戳
        this.wrotePosition.addAndGet(result.getWroteBytes());
        this.storeTimestamp = result.getStoreTimestamp();
        return result;
    }
}

上面保存消息回调函数中的doAppend实际调用的是CommitLog中内部类DefaultAppendMessageCallback的doAppend方法,这里大致可以分为下面几个步骤

  • 获取消息物理偏移量,并且创建消息id生成器,从topicQueueTable中获取Queue的最大相对便宜量。

消息id的格式如下所示,它由ip,端口和消息偏移量公共构成,长度是16字节,为了保证消息的可读性,返回给应用程序的Id转成了字符串。

消息id这么设计的原因是可以根据消息id快速找到broker的IP,端口,以及消息在的物理偏移量,通过它可以快速找到消息

  • 如果消息长度加上消息结束符(8字节)大于maxBlank,则表示该mappedFile已经没有足够的空间保存该消息了,那么就会将消息结束符写入缓冲中,并返回END_OF_FILE,mappedFile消息结束符如下所示

  • 如果空间足够,将queue的相对偏移量,物理偏移量,sysflag,消息创建时间,消息创建ip,消息保存时间及消息体等按照上面消息格式保存到缓冲中。
  • 创建AppendMessageResult对象并返回,它包括消息追加状态、消息写入物理偏移量、消息写入长度、消息ID生成器、消息开始追加的时间戳、消息队列偏移量、消息开始写入的时间戳等属性。
// org.apache.rocketmq.store.CommitLog.DefaultAppendMessageCallback#doAppend
public AppendMessageResult doAppend(final long fileFromOffset/*消息文件起始偏移量*/, final ByteBuffer byteBuffer, final int maxBlank/*文件可写长度*/,final MessageExtBrokerInner msgInner, PutMessageContext putMessageContext) {
    // 1. 物理offset,文件起始offset+写offset
    long wroteOffset = fileFromOffset + byteBuffer.position();
    // 创建消息id supplier
    Supplier<String> msgIdSupplier = () -> {
        int sysflag = msgInner.getSysFlag();
        int msgIdLen = (sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
        ByteBuffer msgIdBuffer = ByteBuffer.allocate(msgIdLen);
        MessageExt.socketAddress2ByteBuffer(msgInner.getStoreHost(), msgIdBuffer);
        msgIdBuffer.clear();//because socketAddress2ByteBuffer flip the buffer
        msgIdBuffer.putLong(msgIdLen - 8, wroteOffset);
        return UtilAll.bytes2string(msgIdBuffer.array());
    };
    // topic-ququeId
    String key = putMessageContext.getTopicQueueTableKey();
    // 获取消息queue offset
    Long queueOffset = CommitLog.this.topicQueueTable.get(key);
    // 如果queueOffset是null,则将其置0
    if (null == queueOffset) {
        queueOffset = 0L;
        CommitLog.this.topicQueueTable.put(key, queueOffset);
    }
    // 获取写缓冲
    ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff();
    final int msgLen = preEncodeBuffer.getInt(0);
    // 2. 判断空间是否足够,如果剩余空间不足,则保存TOTAL+MAGICCODE之后,返回BLANK_MAGIC_CODE
    if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
        this.msgStoreItemMemory.clear();
        // 1 TOTALSIZE 写消息总长度
        this.msgStoreItemMemory.putInt(maxBlank);
        // 2 MAGICCODE 写魔数
        this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
        byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8);
        return new AppendMessageResult(/*...*/);
    }
    int pos = 4/*totalSize*/ + 4/*magicCode*/ + 4/*bodyCRC*/ + 4/*queueId*/ + 4/*flag*/;
    // set队列的offset,
    preEncodeBuffer.putLong(pos, queueOffset);
    pos += 8;
    // 设置物理offset: 文件起始offset+当前文件写消息的offset
    preEncodeBuffer.putLong(pos, fileFromOffset + byteBuffer.position());
    int ipLen = (msgInner.getSysFlag() & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
    // set 8 SYSFLAG, 9 BORNTIMESTAMP, 10 BORNHOST, 11 STORETIMESTAMP
    pos += 8 + 4 + 8 + ipLen;
    // 设置存储消息ip地址
    preEncodeBuffer.putLong(pos, msgInner.getStoreTimestamp());
    // 写消息到队列缓冲
    byteBuffer.put(preEncodeBuffer);
    msgInner.setEncodedBuff(null);
  	// 4. 返回消息保存结果
    AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgIdSupplier,
        msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
    return result;
}

消息保存结果处理阶段

消息保存结果处理阶段主要包括下面三个

  • 提交刷盘请求

如果是同步刷盘,则会创建刷盘请求并返回CompleteFuture,如果是异步刷盘,则会唤醒刷盘服务,然后返回消息保存成功的CompleteFuture

  • 提交消息复制请求

如果是同步复制,则创建消息同步请求然后返回CompleteFuture,如果是异步复制则直接放回消息保存成功的CompleteFuture

  • 合并提交刷盘请求和提交消息复制请求

CompleteFuture#thenCombine是将两个CompleteFuture(提交刷盘请求,提交消息复制请求)组合起来,等提交刷盘请求和提交消息复制请求都执行完了之后再执行后续任务

public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
		// ... 省略部分代码
    PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
    // 1. 提交刷盘请求
    CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
    // 2. 提交复制请求
    CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);
    // 3. 合并提交刷盘请求和提交复制请求结果
    return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
        if (flushStatus != PutMessageStatus.PUT_OK) {
            putMessageResult.setPutMessageStatus(flushStatus);
        }
        if (replicaStatus != PutMessageStatus.PUT_OK) {
            putMessageResult.setPutMessageStatus(replicaStatus);
        }
        return putMessageResult;
    });
}

总结

消息保存到commitLog实际上是保存到byteBuffer中,消息是在回调结果时根据配置决定同步/异步刷盘以及同步/异步同步到从节点。消息在这个阶段也并不会将消息分发到comsumeQueue以及Index中。

以上就是RocketMQ | 源码分析】Broker是如何保存消息的?的详细内容,更多关于RocketMQ Broker保存消息的资料请关注我们其它相关文章!

(0)

相关推荐

  • RocketMQ ConsumeQueue与IndexFile实时更新机制源码解析

    目录 前言 ConsumeQueue详解 IndexFile详解 IndexHeader slots槽位 indexes索引数据 实时更新ConsumeQueue与IndexFile源码分析 CommitLogDispatcherBuildConsumeQueue源码分析 CommitLogDispatcherBuildIndex源码分析 IndexFile如何解决Hash冲突 总结 前言 前面我们介绍了消息是如何存储的,消息是如何刷盘的,讲的都是CommitLog是如何存储和刷盘的.虽然Com

  • RocketMQ 源码分析Broker消息刷盘服务

    目录 前言 刷盘服务源码分析 CommitRealTimeService刷盘源码分析 FlushRealTimeService刷盘源码分析 GroupCommitService刷盘源码分析 总结 前言 上篇文章我们介绍了消息刷盘的四种方式,本篇文章我们来介绍Broker是如何实现这四种刷盘方式. 刷盘服务源码分析 Broker中的四种刷盘分别是由CommitRealTimeService,FlushRealTimeService,GroupCommitService将消息从内存中刷到磁盘上的.在

  • RocketMQ源码解析topic创建机制详解

    目录 1. RocketMQ Topic创建机制 2. 自动Topic 3. 手动创建--预先创建 通过界面控制台创建 1. RocketMQ Topic创建机制 以下源码基于Rocket MQ 4.7.0 RocketMQ Topic创建机制分为两种:一种自动创建,一种手动创建.可以通过设置broker的配置文件来禁用或者允许自动创建.默认是开启的允许自动创建 autoCreateTopicEnable=true/false 下面会结合源码来深度分析一下自动创建和手动创建的过程. 2. 自动T

  • RocketMQ broker 消息投递流程处理PULL_MESSAGE请求解析

    目录 RocketMq消息处理 1. 处理PULL_MESSAGE请求 2. 获取消息 3. 挂起请求:PullRequestHoldService#suspendPullRequest 3.1 处理挂起请求的线程:PullRequestHoldService 3.2 唤醒请求:PullMessageProcessor#executeRequestWhenWakeup 3.3 消息分发中唤醒consumer请求 总结 RocketMq消息处理 RocketMq消息处理整个流程如下: 本系列Roc

  • JAVA Vector源码解析和示例代码

    第1部分 Vector介绍Vector 是矢量队列,它是JDK1.0版本添加的类.继承于AbstractList,实现了List, RandomAccess, Cloneable这些接口.Vector 继承了AbstractList,实现了List:所以,它是一个队列,支持相关的添加.删除.修改.遍历等功能.Vector 实现了RandmoAccess接口,即提供了随机访问功能.RandmoAccess是java中用来被List实现,为List提供快速访问功能的.在Vector中,我们即可以通过

  • Android图片加载利器之Picasso源码解析

    看到了这里,相信大家对Picasso的使用已经比较熟悉了,本篇博客中将从基本的用法着手,逐步的深入了解其设计原理. Picasso的代码量在众多的开源框架中算得上非常少的一个了,一共只有35个class文件,但是麻雀虽小,五脏俱全.好了下面跟随我的脚步,出发了. 基本用法 Picasso.with(this).load(imageUrl).into(imageView); with(this)方法 public static Picasso with(Context context) { if

  • Android源码解析之截屏事件流程

    今天这篇文章我们主要讲一下Android系统中的截屏事件处理流程.用过android系统手机的同学应该都知道,一般的android手机按下音量减少键和电源按键就会触发截屏事件(国内定制机做个修改的这里就不做考虑了).那么这里的截屏事件是如何触发的呢?触发之后android系统是如何实现截屏操作的呢?带着这两个问题,开始我们的源码阅读流程. 我们知道这里的截屏事件是通过我们的按键操作触发的,所以这里就需要我们从android系统的按键触发模块开始看起,由于我们在不同的App页面,操作音量减少键和电

  • .Net Core中ObjectPool的使用与源码解析

    一.对象池 运用对象池化技术可以显著地提升性能,尤其是当对象的初始化过程代价较大或者频率较高.下面是ObjectPool源码中涉及的几个类.当你看过.Net Core源码很多时,你会发现,微软的开发很多都是这种模式,通过Policy构建Provider,通过Provider创建最终的类. 二.使用 这个组件的目的主要是将对象保存到对象池,用的时候直接去取,不需要重新创建,实现对象的重复利用.但是有个问题,假如对象池中开始没有对象或者取得数量大于对象池中的数量怎么办?在对象池中对象的数量不足时,此

  • 详解ArrayBlockQueue源码解析

    今天要讲的是ArrayBlockQueue,ArrayBlockQueue是JUC提供的线程安全的有界的阻塞队列,一看到Array,第一反应:这货肯定和数组有关,既然是数组,那自然是有界的了,我们先来看看ArrayBlockQueue的基本使用方法,然后再看看ArrayBlockQueue的源码. ArrayBlockQueue基本使用 public static void main(String[] args) throws InterruptedException { ArrayBlocki

  • Android okhttp的启动流程及源码解析

    前言 这篇文章主要讲解了okhttp的主要工作流程以及源码的解析. 什么是OKhttp 简单来说 OkHttp 就是一个客户端用来发送 HTTP 消息并对服务器的响应做出处理的应用层框架. 那么它有什么优点呢? 易使用.易扩展. 支持 HTTP/2 协议,允许对同一主机的所有请求共用同一个 socket 连接. 如果 HTTP/2 不可用, 使用连接池复用减少请求延迟. 支持 GZIP,减小了下载大小. 支持缓存处理,可以避免重复请求. 如果你的服务有多个 IP 地址,当第一次连接失败,OkHt

  • Java源码解析之HashMap的put、resize方法详解

    一.HashMap 简介 HashMap 底层采用哈希表结构 数组加链表加红黑树实现,允许储存null键和null值 数组优点:通过数组下标可以快速实现对数组元素的访问,效率高 链表优点:插入或删除数据不需要移动元素,只需要修改节点引用效率高 二.源码分析 2.1 继承和实现 public class HashMap<K,V> extends AbstractMap<K,V> implements Map<K,V>, Cloneable, Serializable {

  • Java源码解析之超级接口Map

    前言 我们在前面说到的无论是链表还是数组,都有自己的优缺点,数组查询速度很快而插入很慢,链表在插入时表现优秀但查询无力.哈希表则整合了数组与链表的优点,能在插入和查找等方面都有不错的速度.我们之后要分析的HashMap就是基于哈希表实现的,不过在JDK1.8中还引入了红黑树,其性能进一步提升了. 今天我们来说一说超级接口Map. 一.接口Map Map是基于Key-Value的数据格式,并且key值不能重复,每个key对应的value值唯一.Map的key也可以为null,但不可重复. 在看Ma

随机推荐