浅谈Java源码ConcurrentHashMap
一、记录形式
打算直接把过程写在源码中,会按序进行注释,查阅的时候可以按序号只看注释部分
二、ConcurrentHashMap
直接模拟该类的使用过程,从而一步步看其怎么运作的吧,当然最好还是带着问题一遍思考一遍总结会比较好,我阅读源码的时候带着以下几个问题
- 并发体现在哪里?怎么保证线程安全的
- 怎么扩容的?扩容是怎么保证线程安全的?
- 怎么put的?put是怎么保证线程安全的?
- 用了哪些锁?这些锁的作用是什么?
- 需要留意哪些关键点?
我们最简单地使用方法是怎么样的?
- new一个ConcurrentHashMap对象
- 调用put方法放入k-v对
- 调用get方法获取k-v对
那么很显然,考虑只有在进行修改与更新时需要考虑并发,所以我关注的重点在put方法与扩容上
首先new一个对象时,我们传参入一个size,调用其只有一个参数的构造器
public ConcurrentHashMap(int initialCapacity) { if (initialCapacity < 0) throw new IllegalArgumentException(); // 1、判断传入的大小是否超过最大值的一半,若超过则取最大值 // 2、若没超过,则调用tableSizeFor // 3、tableSizeFor可以让size为2的倍数,为什么要是2的倍数呢?因为对hash取模的时候 // 用的是位运算,只有size为2的倍数才能这么做 int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1)); this.sizeCtl = cap; }
// 1、判断传入的大小是否超过最大值的一半,若超过则取最大值 // 2、若没超过,则调用tableSizeFor // 3、tableSizeFor让size为2的倍数,为什么要是2的倍数呢?因为对hash取模的时候 // 用的是位运算,只有size为2的倍数才能这么做
注意,此时并没有创建map数据结构,所以ConcurrentHashMap是懒惰创建的
接着我们调用put方法放入数据,put方法调的putVal
final V putVal(K key, V value, boolean onlyIfAbsent) { // 1、k-v都不能为空,不然抛异常 if (key == null || value == null) throw new NullPointerException(); // 2、获取key的hashcode的hash值 int hash = spread(key.hashCode()); // 3、使用binCount来统计链表有多少个元素的,便于后面判断是否需要变成红黑树 int binCount = 0; // 4、创建tab临时变量,并赋值为table,由于还没初始化,值为null // 这里注意了,table的类型是Node数组,这个Node其实就是Map.Entry<K,V> for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; // 5、判断tab为空后才进行初始化,初始化完成后重新进入循环 if (tab == null || (n = tab.length) == 0) tab = initTable(); // 6、此时已经初始化好了,可以开始插入数据。 // 这里用tabAt(tab,i)获取tab的第i个下标上的链表指针 // 注意了,(n-1)& hash其实就是hash%n,只有n为2的次方才能这么做 // 位运算可以提升效率 // 7、f就是获取到的第i个位置的链表头指针 // 如果为null说明什么都没有,可以尝试插入元素 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 8、考虑插入那就要考虑并发了,casTab表示用cas方法进行插入 // 插入一个新的Node结点,这个是能够保证线程安全的 // 我们知道保证线程安全除了用cas之外还不够,还需要保证操作对象的可见性 // 在这里是对tab进行操作,tab在前面用table赋值,而table是加了volatile的 // 所以没毛病哈 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } // 9、如果f不为空,并且f.hash==MOVED(-1),说明当前这个位置正在被移动 // 说明有线程在扩容,那么当然不能这时候还去插入了,这里调用helpTransfer去帮助扩容 // 注意了,这意味着扩容时是一个一个位置来移动的,每移动一个就将f.hash改成MOVED // 也就意味着如果当前线程想要操作的位置还没有被移动时是可以操作的,这使得并发度更高了 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); // 10、如果f.hash表示没有被移动,且f不为null说明可以这个位置已经有东西了 // 需要对其遍历,并找到合适的位置插入 else { V oldVal = null; // 11、由于要进行插入了,所以得加锁,注意了哈,这里用的synchronized // 并且锁住的是当前位置对象(不一定是链表也可能是树) // 意味着除了当前线程,其他线程都不准操作了哈 // 如果这时候正在扩容的线程扩到这里了,会被阻塞的哈 synchronized (f) { // 确定f为起点 if (tabAt(tab, i) == f) { // 12、判断f.hash是大于0,大于0表示当前这个东西是链表 // 下面执行链表的更新操作 if (fh >= 0) { binCount = 1; // 13、接着就是具体的遍历链表,查找是否对应值是否存在 // 如果遍历完都找不到,那么就在尾部插入新的结点 // 注意了哈这就是尾插 // 14、同时,每遍历一个结点还要累加binCount // 即统计一下当前链表个数,便于后面转红黑树判断 for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } // 13、如果f对应的是树结构,那就执行树的更新方法 else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } // 14、前面说了哈,binCount就是链表元素个数,接着就判断是否大于阈值 // 大于则转树,可以看这个阈值TREEIFY_THRESHOLD=8 if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } // 15、addCount是让size+1的 // 这里要注意,加法也是分多步的,需要先get才能+,因此为了保证线程安全也是需要用cas的 // 这里加size的方式并不是直接往size上加,因为size是经常修改的,如果经常访问的话效率很低 // 那么做法和LongAdder这一原子累加器类似,用一个CountCell数组,每个线程只操作数组中的 // 某一个Cell,最后汇总即可 addCount(1L, binCount); return null; }
总结一下put的过程
1.先判断map是否创建,没创建则先创建,结构为 Node<K,V>[ ] extend Map.Entry
2.接着找key应该放在哪个位置 i
3.判断该位置是否为空,为空则使用CAS插入一个新的Node
4.不为空则判断当前位置状态是否为MOVED,是则说明当前位置正在被其他线程改动,当前线程需要帮助完成移动
5.不为空且不为MOVED,则判断是链表还是树,分别执行对应的更新方法
6.如果为链表
- 先对链表上锁,用syn
- 则遍历并查找是否已存在
- 找到最后都不存在则直接尾插
- 同时统计链表上的元素
7.判断链表元素是否超过变成树的阈值 8 ,超过则直接变成树,变成树也是加syn
8.使用addCount更新size,更新方式类似LongAdder
三、关键点
- 懒惰加载map
- 对当前位置操作之前需要判断当前位置的存储的内容是否被其他线程移动了,如果被移动则先去帮助完成移动
执行扩容移动的线程是挨个移动每个位置的链表,移动前会先将当前位置的状态用CAS改成MOVED
注意了这个是提升并发度的关键所在
因为插入和移动(扩容)的粒度是细化到每个位置的链表上
意味着扩容和插入可以同时进行
意味着插入时上锁后,扩容线程执行到该位置需要阻塞
而不是直接整个map都锁住
- 当前位置没内容时,通过CAS插入新Node,而操作链表时用的是syn
如果面试问到ConcurrentHashMap中用了什么锁就心中有数了
- 更新size的时候用的是LongAdder类似的方法
有一个CountCell数组,每个线程更新后,对数组中的某个Cell+1
如果没有竞争则只有一个baseCell,对其+1
统计size时汇总即可
再细化一下前面的流程
思考初始化map的时候怎么保证线程安全?防止多个线程同时初始化?
答案在initTable方法中
- 可以看到,sizeCtl如果是负数说明正在扩容或者初始化
- 因此当需要初始化时,就去CAS地改变sizeCtl,将其变为负数
- 哪个线程CAS成功,则可以执行初始化,这就保证了线程安全
- 可以再去看看,sizeCtl是volatile修饰的哈
- 并且SIZECTL是sizeCtl的offset,这些都是原子类类似的东西了
private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { if ((sc = sizeCtl) < 0) Thread.yield(); // lost initialization race; just spin else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; sc = n - (n >>> 2); } } finally { sizeCtl = sc; } break; } } return tab; }
get方法就不说了,因为不涉及并发,就是查找而已
感觉差不多了,把put方法搞清楚了,ConcurrentHashMap怎么解决线程安全的也清楚了,并且并发关键点在哪也清楚了
到此这篇关于浅谈Java源码ConcurrentHashMap的文章就介绍到这了,更多相关Java ConcurrentHashMap内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!