浅谈Java源码ConcurrentHashMap

一、记录形式

打算直接把过程写在源码中,会按序进行注释,查阅的时候可以按序号只看注释部分

二、ConcurrentHashMap

直接模拟该类的使用过程,从而一步步看其怎么运作的吧,当然最好还是带着问题一遍思考一遍总结会比较好,我阅读源码的时候带着以下几个问题

  • 并发体现在哪里?怎么保证线程安全的
  • 怎么扩容的?扩容是怎么保证线程安全的?
  • 怎么put的?put是怎么保证线程安全的?
  • 用了哪些锁?这些锁的作用是什么?
  • 需要留意哪些关键点?

我们最简单地使用方法是怎么样的?

  • new一个ConcurrentHashMap对象
  • 调用put方法放入k-v对
  • 调用get方法获取k-v对

那么很显然,考虑只有在进行修改与更新时需要考虑并发,所以我关注的重点在put方法与扩容上

首先new一个对象时,我们传参入一个size,调用其只有一个参数的构造器

public ConcurrentHashMap(int initialCapacity) {
        if (initialCapacity < 0)
            throw new IllegalArgumentException();
        // 1、判断传入的大小是否超过最大值的一半,若超过则取最大值
        // 2、若没超过,则调用tableSizeFor
        // 3、tableSizeFor可以让size为2的倍数,为什么要是2的倍数呢?因为对hash取模的时候
        // 用的是位运算,只有size为2的倍数才能这么做
        int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
                   MAXIMUM_CAPACITY :
                   tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
        this.sizeCtl = cap;
    }
// 1、判断传入的大小是否超过最大值的一半,若超过则取最大值
// 2、若没超过,则调用tableSizeFor
// 3、tableSizeFor让size为2的倍数,为什么要是2的倍数呢?因为对hash取模的时候
// 用的是位运算,只有size为2的倍数才能这么做

注意,此时并没有创建map数据结构,所以ConcurrentHashMap是懒惰创建的

接着我们调用put方法放入数据,put方法调的putVal

final V putVal(K key, V value, boolean onlyIfAbsent) {
   		// 1、k-v都不能为空,不然抛异常
        if (key == null || value == null) throw new NullPointerException();
        // 2、获取key的hashcode的hash值
        int hash = spread(key.hashCode());
        // 3、使用binCount来统计链表有多少个元素的,便于后面判断是否需要变成红黑树
        int binCount = 0;
        // 4、创建tab临时变量,并赋值为table,由于还没初始化,值为null
        // 这里注意了,table的类型是Node数组,这个Node其实就是Map.Entry<K,V>
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            // 5、判断tab为空后才进行初始化,初始化完成后重新进入循环
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            // 6、此时已经初始化好了,可以开始插入数据。
            // 这里用tabAt(tab,i)获取tab的第i个下标上的链表指针
            // 注意了,(n-1)& hash其实就是hash%n,只有n为2的次方才能这么做
            // 位运算可以提升效率
            // 7、f就是获取到的第i个位置的链表头指针
            // 如果为null说明什么都没有,可以尝试插入元素
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            	// 8、考虑插入那就要考虑并发了,casTab表示用cas方法进行插入
            	// 插入一个新的Node结点,这个是能够保证线程安全的
            	// 我们知道保证线程安全除了用cas之外还不够,还需要保证操作对象的可见性
            	// 在这里是对tab进行操作,tab在前面用table赋值,而table是加了volatile的
            	// 所以没毛病哈
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            // 9、如果f不为空,并且f.hash==MOVED(-1),说明当前这个位置正在被移动
            // 说明有线程在扩容,那么当然不能这时候还去插入了,这里调用helpTransfer去帮助扩容
            // 注意了,这意味着扩容时是一个一个位置来移动的,每移动一个就将f.hash改成MOVED
            // 也就意味着如果当前线程想要操作的位置还没有被移动时是可以操作的,这使得并发度更高了
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            // 10、如果f.hash表示没有被移动,且f不为null说明可以这个位置已经有东西了
            // 需要对其遍历,并找到合适的位置插入
            else {
                V oldVal = null;
                // 11、由于要进行插入了,所以得加锁,注意了哈,这里用的synchronized
                // 并且锁住的是当前位置对象(不一定是链表也可能是树)
                // 意味着除了当前线程,其他线程都不准操作了哈
                // 如果这时候正在扩容的线程扩到这里了,会被阻塞的哈
                synchronized (f) {
                	// 确定f为起点
                    if (tabAt(tab, i) == f) {
                    	// 12、判断f.hash是大于0,大于0表示当前这个东西是链表
                    	// 下面执行链表的更新操作
                        if (fh >= 0) {
                            binCount = 1;
                            // 13、接着就是具体的遍历链表,查找是否对应值是否存在
                            // 如果遍历完都找不到,那么就在尾部插入新的结点
                            // 注意了哈这就是尾插
                            // 14、同时,每遍历一个结点还要累加binCount
                            // 即统计一下当前链表个数,便于后面转红黑树判断
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        // 13、如果f对应的是树结构,那就执行树的更新方法
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                // 14、前面说了哈,binCount就是链表元素个数,接着就判断是否大于阈值
                // 大于则转树,可以看这个阈值TREEIFY_THRESHOLD=8
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        // 15、addCount是让size+1的
        // 这里要注意,加法也是分多步的,需要先get才能+,因此为了保证线程安全也是需要用cas的
        // 这里加size的方式并不是直接往size上加,因为size是经常修改的,如果经常访问的话效率很低
        // 那么做法和LongAdder这一原子累加器类似,用一个CountCell数组,每个线程只操作数组中的
        // 某一个Cell,最后汇总即可
        addCount(1L, binCount);
        return null;
    }

总结一下put的过程

1.先判断map是否创建,没创建则先创建,结构为 Node<K,V>[ ] extend Map.Entry

2.接着找key应该放在哪个位置 i

3.判断该位置是否为空,为空则使用CAS插入一个新的Node

4.不为空则判断当前位置状态是否为MOVED,是则说明当前位置正在被其他线程改动,当前线程需要帮助完成移动

5.不为空且不为MOVED,则判断是链表还是树,分别执行对应的更新方法

6.如果为链表

  • 先对链表上锁,用syn
  • 则遍历并查找是否已存在
  • 找到最后都不存在则直接尾插
  • 同时统计链表上的元素

7.判断链表元素是否超过变成树的阈值 8 ,超过则直接变成树,变成树也是加syn

8.使用addCount更新size,更新方式类似LongAdder

三、关键点

  • 懒惰加载map
  • 对当前位置操作之前需要判断当前位置的存储的内容是否被其他线程移动了,如果被移动则先去帮助完成移动

执行扩容移动的线程是挨个移动每个位置的链表,移动前会先将当前位置的状态用CAS改成MOVED

注意了这个是提升并发度的关键所在

因为插入和移动(扩容)的粒度是细化到每个位置的链表上

意味着扩容和插入可以同时进行

意味着插入时上锁后,扩容线程执行到该位置需要阻塞

而不是直接整个map都锁住

  • 当前位置没内容时,通过CAS插入新Node,而操作链表时用的是syn

如果面试问到ConcurrentHashMap中用了什么锁就心中有数了

  • 更新size的时候用的是LongAdder类似的方法

有一个CountCell数组,每个线程更新后,对数组中的某个Cell+1

如果没有竞争则只有一个baseCell,对其+1

统计size时汇总即可

再细化一下前面的流程
思考初始化map的时候怎么保证线程安全?防止多个线程同时初始化?
答案在initTable方法中

  • 可以看到,sizeCtl如果是负数说明正在扩容或者初始化
  • 因此当需要初始化时,就去CAS地改变sizeCtl,将其变为负数
  • 哪个线程CAS成功,则可以执行初始化,这就保证了线程安全
  • 可以再去看看,sizeCtl是volatile修饰的哈
  • 并且SIZECTL是sizeCtl的offset,这些都是原子类类似的东西了
private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
        while ((tab = table) == null || tab.length == 0) {
            if ((sc = sizeCtl) < 0)
                Thread.yield(); // lost initialization race; just spin
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if ((tab = table) == null || tab.length == 0) {
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }

get方法就不说了,因为不涉及并发,就是查找而已
感觉差不多了,把put方法搞清楚了,ConcurrentHashMap怎么解决线程安全的也清楚了,并且并发关键点在哪也清楚了

到此这篇关于浅谈Java源码ConcurrentHashMap的文章就介绍到这了,更多相关Java ConcurrentHashMap内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Java源码解析ConcurrentHashMap的初始化

    首先看一下代码 private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { // 第一次检查 if ((sc = sizeCtl) < 0) Thread.yield(); // lost initialization race; just spin else if (U.compareAndSwapInt

  • Java中遍历ConcurrentHashMap的四种方式详解

    这篇文章主要介绍了Java中遍历ConcurrentHashMap的四种方式详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 方式一:在for-each循环中使用entries来遍历 System.out.println("方式一:在for-each循环中使用entries来遍历");for (Map.Entry<String, String> entry: map.entrySet()) { System.out.pr

  • Java并发系列之ConcurrentHashMap源码分析

    我们知道哈希表是一种非常高效的数据结构,设计优良的哈希函数可以使其上的增删改查操作达到O(1)级别.Java为我们提供了一个现成的哈希结构,那就是HashMap类,在前面的文章中我曾经介绍过HashMap类,知道它的所有方法都未进行同步,因此在多线程环境中是不安全的.为此,Java为我们提供了另外一个HashTable类,它对于多线程同步的处理非常简单粗暴,那就是在HashMap的基础上对其所有方法都使用synchronized关键字进行加锁.这种方法虽然简单,但导致了一个问题,那就是在同一时间

  • 深入学习java并发包ConcurrentHashMap源码

    正文 以前写过介绍HashMap的文章,文中提到过HashMap在put的时候,插入的元素超过了容量(由负载因子决定)的范围就会触发扩容操作,就是rehash,这个会重新将原数组的内容重新hash到新的扩容数组中,在多线程的环境下,存在同时其他的元素也在进行put操作,如果hash值相同,可能出现同时在同一数组下用链表表示,造成闭环,导致在get时会出现死循环,所以HashMap是线程不安全的. JDK1.7的实现 整个 ConcurrentHashMap 由一个个 Segment 组成,Seg

  • java基于ConcurrentHashMap设计细粒度实现代码

    细粒度锁: java中的几种锁:synchronized,ReentrantLock,ReentrantReadWriteLock已基本可以满足编程需求,但其粒度都太大,同一时刻只有一个线程能进入同步块,这对于某些高并发的场景并不适用.比如银行客户a向b转账,c向d转账,假如这两个线程并发,代码其实不需要同步.但是同时有线程3,e向b转账,那么对b而言必须加入同步.这时需要考虑锁的粒度问题,即细粒度锁. 网上搜寻了一些关于java细粒度锁的介绍文章,大部分是提供思路,比如乐观锁,String.i

  • java ConcurrentHashMap锁分段技术及原理详解

    一.背景: 线程不安全的HashMap 因为多线程环境下,使用Hashmap进行put操作会引起死循环,导致CPU利用率接近100%,所以在并发情况下不能使用HashMap. 效率低下的HashTable容器 HashTable容器使用synchronized来保证线程安全,但在线程竞争激烈的情况下HashTable的效率非常低下.因为当一个线程访问HashTable的同步方法时,其他线程访问HashTable的同步方法时,可能会进入阻塞或轮询状态.如线程1使用put进行添加元素,线程2不但不能

  • 基于Java并发容器ConcurrentHashMap#put方法解析

    jdk1.7.0_79 HashMap可以说是每个Java程序员用的最多的数据结构之一了,无处不见它的身影.关于HashMap,通常也能说出它不是线程安全的.这篇文章要提到的是在多线程并发环境下的HashMap--ConcurrentHashMap,显然它必然是线程安全的,同样我们不可避免的要讨论散列表,以及它是如何实现线程安全的,它的效率又是怎样的,因为对于映射容器还有一个Hashtable也是线程安全的但它似乎只出现在笔试.面试题里,在现实编码中它已经基本被遗弃. 关于HashMap的线程不

  • Java ConcurrentHashMap的使用示例

    构造方法 // 1.无参数构造方法 new ConcurrentHashMap(); // 2.指定初始容量 new ConcurrentHashMap(initialCapacity) // 3.指定初始容量和加载因子 new ConcurrentHashMap(initialCapacity,loadFactor) // 4.指定初始容量和加载因子与并发级别(并发更新线程数) new ConcurrentHashMap(initialCapacity, loadFactor, concurr

  • java中ConcurrentHashMap的读操作为什么不需要加锁

    前言 ConcurrentHashMap是Java 5中支持高并发.高吞吐量的线程安全HashMap实现. 我们知道,ConcurrentHashmap(1.8)这个并发集合框架是线程安全的,当你看到源码的get操作时,会发现get操作全程是没有加任何锁的,这也是这篇博文讨论的问题--为什么它不需要加锁呢? 下面话不多说了,来一起看看详细的介绍吧 ConcurrentHashMap的简介 我想有基础的同学知道在jdk1.7中是采用Segment + HashEntry + ReentrantLo

  • JAVA核心知识之ConcurrentHashMap源码分析

    1 前言 ConcurrentHashMap是基于Hash表的Map接口实现,键与值均不允许为NULL,他是一个线程安全的Map.同时他也是一个无序的Map,不同时间进行遍历可能会得到不同的顺序.在JDK1.8之前,ConcurrentHashMap使用分段锁以在保证线程安全的同时获得更大的效率.JDK1.8开始舍弃了分段锁,使用自旋+CAS+sync关键字来实现同步.本文所述便是基于JDK1.8. ConcurrentHashMap与HashMap有共同之处,一些HashMap的基本概念与实现

  • Java concurrency集合之ConcurrentHashMap_动力节点Java学院整理

    ConcurrentHashMap介绍 ConcurrentHashMap是线程安全的哈希表.HashMap, Hashtable, ConcurrentHashMap之间的关联如下: HashMap是非线程安全的哈希表,常用于单线程程序中. Hashtable是线程安全的哈希表,它是通过synchronized来保证线程安全的:即,多线程通过同一个"对象的同步锁"来实现并发控制.Hashtable在线程竞争激烈时,效率比较低(此时建议使用ConcurrentHashMap)!因为当一

  • Java 中ConcurrentHashMap的实现

    ConcurrentHashMap(简称CHM)是在Java 1.5作为Hashtable的替代选择新引入的,是concurrent包的重要成员.在Java 1.5之前,如果想要实现一个可以在多线程和并发的程序中安全使用的Map,只能在HashTable和synchronized Map中选择,因为HashMap并不是线程安全的.但再引入了CHM之后,我们有了更好的选择.CHM不但是线程安全的,而且比HashTable和synchronizedMap的性能要好.相对于HashTable和sync

  • java 使用ConcurrentHashMap和计数器实现锁

    java 使用ConcurrentHashMap和计数器实现锁 在某些场景下,我们想让线程根据某些业务数据进行排队,简单代码如下: import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.Ato

  • Java7和Java8中的ConcurrentHashMap原理解析

    Java7 中 ConcurrentHashMap ConcurrentHashMap 和 HashMap 思路是差不多的,但是因为它支持并发操作,所以要复杂一些. 整个 ConcurrentHashMap 由一个个 Segment 组成,Segment 代表"部分"或"一段"的意思,所以很多地方都会将其描述为分段锁.注意,行文中,我很多地方用了"槽"来代表一个 segment. 简单理解就是,ConcurrentHashMap 是一个 Segm

随机推荐