Java多线程编程中的两种常用并发容器讲解

ConcurrentHashMap并发容器
 ConcurrentHashMap可以做到读取数据不加锁,并且其内部的结构可以让其在进行写操作的时候能够将锁的粒度保持地尽量地小,不用对整个ConcurrentHashMap加锁。

ConcurrentHashMap的内部结构

  ConcurrentHashMap为了提高本身的并发能力,在内部采用了一个叫做Segment的结构,一个Segment其实就是一个类Hash Table的结构,Segment内部维护了一个链表数组,我们用下面这一幅图来看下ConcurrentHashMap的内部结构:

从上面的结构我们可以了解到,ConcurrentHashMap定位一个元素的过程需要进行两次Hash操作,第一次Hash定位到Segment,第二次Hash定位到元素所在的链表的头部,因此,这一种结构的带来的副作用是Hash的过程要比普通的HashMap要长,但是带来的好处是写操作的时候可以只对元素所在的Segment进行加锁即可,不会影响到其他的Segment,这样,在最理想的情况下,ConcurrentHashMap可以最高同时支持Segment数量大小的写操作(刚好这些写操作都非常平均地分布在所有的Segment上),所以,通过这一种结构,ConcurrentHashMap的并发能力可以大大的提高。

Segment

  我们再来具体了解一下Segment的数据结构:

static final class Segment<K,V> extends ReentrantLock implements Serializable {
 transient volatile int count;
 transient int modCount;
 transient int threshold;
 transient volatile HashEntry<K,V>[] table;
 final float loadFactor;
}

  详细解释一下Segment里面的成员变量的意义:

  • count:Segment中元素的数量
  • modCount:对table的大小造成影响的操作的数量(比如put或者remove操作)
  • threshold:阈值,Segment里面元素的数量超过这个值依旧就会对Segment进行扩容
  • table:链表数组,数组中的每一个元素代表了一个链表的头部
  • loadFactor:负载因子,用于确定threshold

HashEntry

  Segment中的元素是以HashEntry的形式存放在链表数组中的,看一下HashEntry的结构:

static final class HashEntry<K,V> {
 final K key;
 final int hash;
 volatile V value;
 final HashEntry<K,V> next;
}

  可以看到HashEntry的一个特点,除了value以外,其他的几个变量都是final的,这样做是为了防止链表结构被破坏,出现ConcurrentModification的情况。

ConcurrentHashMap的初始化

  下面我们来结合源代码来具体分析一下ConcurrentHashMap的实现,先看下初始化方法:

public ConcurrentHashMap(int initialCapacity,
       float loadFactor, int concurrencyLevel) {
 if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
  throw new IllegalArgumentException();

 if (concurrencyLevel > MAX_SEGMENTS)
  concurrencyLevel = MAX_SEGMENTS;

 // Find power-of-two sizes best matching arguments
 int sshift = 0;
 int ssize = 1;
 while (ssize < concurrencyLevel) {
  ++sshift;
  ssize <<= 1;
 }
 segmentShift = 32 - sshift;
 segmentMask = ssize - 1;
 this.segments = Segment.newArray(ssize);

 if (initialCapacity > MAXIMUM_CAPACITY)
  initialCapacity = MAXIMUM_CAPACITY;
 int c = initialCapacity / ssize;
 if (c * ssize < initialCapacity)
  ++c;
 int cap = 1;
 while (cap < c)
  cap <<= 1;

 for (int i = 0; i < this.segments.length; ++i)
  this.segments[i] = new Segment<K,V>(cap, loadFactor);
}

  CurrentHashMap的初始化一共有三个参数,一个initialCapacity,表示初始的容量,一个loadFactor,表示负载参数,最后一个是concurrentLevel,代表ConcurrentHashMap内部的Segment的数量,ConcurrentLevel一经指定,不可改变,后续如果ConcurrentHashMap的元素数量增加导致ConrruentHashMap需要扩容,ConcurrentHashMap不会增加Segment的数量,而只会增加Segment中链表数组的容量大小,这样的好处是扩容过程不需要对整个ConcurrentHashMap做rehash,而只需要对Segment里面的元素做一次rehash就可以了。

  整个ConcurrentHashMap的初始化方法还是非常简单的,先是根据concurrentLevel来new出Segment,这里Segment的数量是不大于concurrentLevel的最大的2的指数,就是说Segment的数量永远是2的指数个,这样的好处是方便采用移位操作来进行hash,加快hash的过程。接下来就是根据intialCapacity确定Segment的容量的大小,每一个Segment的容量大小也是2的指数,同样使为了加快hash的过程。

  这边需要特别注意一下两个变量,分别是segmentShift和segmentMask,这两个变量在后面将会起到很大的作用,假设构造函数确定了Segment的数量是2的n次方,那么segmentShift就等于32减去n,而segmentMask就等于2的n次方减一。

ConcurrentHashMap的get操作

  前面提到过ConcurrentHashMap的get操作是不用加锁的,我们这里看一下其实现:

public V get(Object key) {
 int hash = hash(key.hashCode());
 return segmentFor(hash).get(key, hash);
}

  看第三行,segmentFor这个函数用于确定操作应该在哪一个segment中进行,几乎对ConcurrentHashMap的所有操作都需要用到这个函数,我们看下这个函数的实现:

final Segment<K,V> segmentFor(int hash) {
 return segments[(hash >>> segmentShift) & segmentMask];
}

  这个函数用了位操作来确定Segment,根据传入的hash值向右无符号右移segmentShift位,然后和segmentMask进行与操作,结合我们之前说的segmentShift和segmentMask的值,就可以得出以下结论:假设Segment的数量是2的n次方,根据元素的hash值的高n位就可以确定元素到底在哪一个Segment中。

  在确定了需要在哪一个segment中进行操作以后,接下来的事情就是调用对应的Segment的get方法:

V get(Object key, int hash) {
 if (count != 0) { // read-volatile
  HashEntry<K,V> e = getFirst(hash);
  while (e != null) {
   if (e.hash == hash && key.equals(e.key)) {
    V v = e.value;
    if (v != null)
     return v;
    return readValueUnderLock(e); // recheck
   }
   e = e.next;
  }
 }
 return null;
}

  先看第二行代码,这里对count进行了一次判断,其中count表示Segment中元素的数量,我们可以来看一下count的定义:

transient volatile int count;

   可以看到count是volatile的,实际上这里里面利用了volatile的语义:

  对volatile字段的写入操作happens-before于每一个后续的同一个字段的读操作。
  因为实际上put、remove等操作也会更新count的值,所以当竞争发生的时候,volatile的语义可以保证写操作在读操作之前,也就保证了写操作对后续的读操作都是可见的,这样后面get的后续操作就可以拿到完整的元素内容。

  然后,在第三行,调用了getFirst()来取得链表的头部:

HashEntry<K,V> getFirst(int hash) {
 HashEntry<K,V>[] tab = table;
 return tab[hash & (tab.length - 1)];
}

  同样,这里也是用位操作来确定链表的头部,hash值和HashTable的长度减一做与操作,最后的结果就是hash值的低n位,其中n是HashTable的长度以2为底的结果。

  在确定了链表的头部以后,就可以对整个链表进行遍历,看第4行,取出key对应的value的值,如果拿出的value的值是null,则可能这个key,value对正在put的过程中,如果出现这种情况,那么就加锁来保证取出的value是完整的,如果不是null,则直接返回value。

ConcurrentHashMap的put操作

  看完了get操作,再看下put操作,put操作的前面也是确定Segment的过程,这里不再赘述,直接看关键的segment的put方法:

V put(K key, int hash, V value, boolean onlyIfAbsent) {
 lock();
 try {
  int c = count;
  if (c++ > threshold) // ensure capacity
   rehash();
  HashEntry<K,V>[] tab = table;
  int index = hash & (tab.length - 1);
  HashEntry<K,V> first = tab[index];
  HashEntry<K,V> e = first;
  while (e != null && (e.hash != hash || !key.equals(e.key)))
   e = e.next;

  V oldValue;
  if (e != null) {
   oldValue = e.value;
   if (!onlyIfAbsent)
    e.value = value;
  }
  else {
   oldValue = null;
   ++modCount;
   tab[index] = new HashEntry<K,V>(key, hash, first, value);
   count = c; // write-volatile
  }
  return oldValue;
 } finally {
  unlock();
 }
}

  首先对Segment的put操作是加锁完成的,然后在第五行,如果Segment中元素的数量超过了阈值(由构造函数中的loadFactor算出)这需要进行对Segment扩容,并且要进行rehash,关于rehash的过程大家可以自己去了解,这里不详细讲了。

  第8和第9行的操作就是getFirst的过程,确定链表头部的位置。

  第11行这里的这个while循环是在链表中寻找和要put的元素相同key的元素,如果找到,就直接更新更新key的value,如果没有找到,则进入21行这里,生成一个新的HashEntry并且把它加到整个Segment的头部,然后再更新count的值。

ConcurrentHashMap的remove操作

  Remove操作的前面一部分和前面的get和put操作一样,都是定位Segment的过程,然后再调用Segment的remove方法:

V remove(Object key, int hash, Object value) {
 lock();
 try {
  int c = count - 1;
  HashEntry<K,V>[] tab = table;
  int index = hash & (tab.length - 1);
  HashEntry<K,V> first = tab[index];
  HashEntry<K,V> e = first;
  while (e != null && (e.hash != hash || !key.equals(e.key)))
   e = e.next;

  V oldValue = null;
  if (e != null) {
   V v = e.value;
   if (value == null || value.equals(v)) {
    oldValue = v;
    // All entries following removed node can stay
    // in list, but all preceding ones need to be
    // cloned.
    ++modCount;
    HashEntry<K,V> newFirst = e.next;
    for (HashEntry<K,V> p = first; p != e; p = p.next)
     newFirst = new HashEntry<K,V>(p.key, p.hash,
             newFirst, p.value);
    tab[index] = newFirst;
    count = c; // write-volatile
   }
  }
  return oldValue;
 } finally {
  unlock();
 }
}

  首先remove操作也是确定需要删除的元素的位置,不过这里删除元素的方法不是简单地把待删除元素的前面的一个元素的next指向后面一个就完事了,我们之前已经说过HashEntry中的next是final的,一经赋值以后就不可修改,在定位到待删除元素的位置以后,程序就将待删除元素前面的那一些元素全部复制一遍,然后再一个一个重新接到链表上去,看一下下面这一幅图来了解这个过程:

 假设链表中原来的元素如上图所示,现在要删除元素3,那么删除元素3以后的链表就如下图所示:

CopyOnWriteArrayList并发容器
Copy-On-Write简称COW,是一种用于程序设计中的优化策略。其基本思路是,从一开始大家都在共享同一个内容,当某个人想要修改这个内容的时候,才会真正把内容Copy出去形成一个新的内容然后再改,这是一种延时懒惰策略。从JDK1.5开始Java并发包里提供了两个使用CopyOnWrite机制实现的并发容器,它们是CopyOnWriteArrayList和CopyOnWriteArraySet。CopyOnWrite容器非常有用,可以在非常多的并发场景中使用到。

什么是CopyOnWrite容器

  CopyOnWrite容器即写时复制的容器。通俗的理解是当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行Copy,复制出一个新的容器,然后新的容器里添加元素,添加完元素之后,再将原容器的引用指向新的容器。这样做的好处是我们可以对CopyOnWrite容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。所以CopyOnWrite容器也是一种读写分离的思想,读和写不同的容器。

CopyOnWriteArrayList的实现原理

  在使用CopyOnWriteArrayList之前,我们先阅读其源码了解下它是如何实现的。以下代码是向CopyOnWriteArrayList中add方法的实现(向CopyOnWriteArrayList里添加元素),可以发现在添加的时候是需要加锁的,否则多线程写的时候会Copy出N个副本出来。

/**
  * Appends the specified element to the end of this list.
  *
  * @param e element to be appended to this list
  * @return <tt>true</tt> (as specified by {@link Collection#add})
  */
 public boolean add(E e) {
 final ReentrantLock lock = this.lock;
 lock.lock();
 try {
  Object[] elements = getArray();
  int len = elements.length;
  Object[] newElements = Arrays.copyOf(elements, len + 1);
  newElements[len] = e;
  setArray(newElements);
  return true;
 } finally {
  lock.unlock();
 }
 }

  读的时候不需要加锁,如果读的时候有多个线程正在向CopyOnWriteArrayList添加数据,读还是会读到旧的数据,因为写的时候不会锁住旧的CopyOnWriteArrayList。

public E get(int index) {
 return get(getArray(), index);
}

  JDK中并没有提供CopyOnWriteMap,我们可以参考CopyOnWriteArrayList来实现一个,基本代码如下:

import java.util.Collection;
import java.util.Map;
import java.util.Set;

public class CopyOnWriteMap<K, V> implements Map<K, V>, Cloneable {
 private volatile Map<K, V> internalMap;

 public CopyOnWriteMap() {
  internalMap = new HashMap<K, V>();
 }

 public V put(K key, V value) {

  synchronized (this) {
   Map<K, V> newMap = new HashMap<K, V>(internalMap);
   V val = newMap.put(key, value);
   internalMap = newMap;
   return val;
  }
 }

 public V get(Object key) {
  return internalMap.get(key);
 }

 public void putAll(Map<? extends K, ? extends V> newData) {
  synchronized (this) {
   Map<K, V> newMap = new HashMap<K, V>(internalMap);
   newMap.putAll(newData);
   internalMap = newMap;
  }
 }
}

  实现很简单,只要了解了CopyOnWrite机制,我们可以实现各种CopyOnWrite容器,并且在不同的应用场景中使用。

CopyOnWrite的应用场景

  CopyOnWrite并发容器用于读多写少的并发场景。比如白名单,黑名单,商品类目的访问和更新场景,假如我们有一个搜索网站,用户在这个网站的搜索框中,输入关键字搜索内容,但是某些关键字不允许被搜索。这些不能被搜索的关键字会被放在一个黑名单当中,黑名单每天晚上更新一次。当用户搜索时,会检查当前关键字在不在黑名单当中,如果在,则提示不能搜索。实现代码如下:

package com.ifeve.book;

import java.util.Map;

import com.ifeve.book.forkjoin.CopyOnWriteMap;

/**
 * 黑名单服务
 *
 * @author fangtengfei
 *
 */
public class BlackListServiceImpl {

 private static CopyOnWriteMap<String, Boolean> blackListMap = new CopyOnWriteMap<String, Boolean>(
   1000);

 public static boolean isBlackList(String id) {
  return blackListMap.get(id) == null ? false : true;
 }

 public static void addBlackList(String id) {
  blackListMap.put(id, Boolean.TRUE);
 }

 /**
  * 批量添加黑名单
  *
  * @param ids
  */
 public static void addBlackList(Map<String,Boolean> ids) {
  blackListMap.putAll(ids);
 }

}

  代码很简单,但是使用CopyOnWriteMap需要注意两件事情:

  1. 减少扩容开销。根据实际需要,初始化CopyOnWriteMap的大小,避免写时CopyOnWriteMap扩容的开销。

  2. 使用批量添加。因为每次添加,容器每次都会进行复制,所以减少添加次数,可以减少容器的复制次数。如使用上面代码里的addBlackList方法。

CopyOnWrite的缺点

  CopyOnWrite容器有很多优点,但是同时也存在两个问题,即内存占用问题和数据一致性问题。所以在开发的时候需要注意一下。

  内存占用问题。因为CopyOnWrite的写时复制机制,所以在进行写操作的时候,内存里会同时驻扎两个对象的内存,旧的对象和新写入的对象(注意:在复制的时候只是复制容器里的引用,只是在写的时候会创建新对象添加到新容器里,而旧容器的对象还在使用,所以有两份对象内存)。如果这些对象占用的内存比较大,比如说200M左右,那么再写入100M数据进去,内存就会占用300M,那么这个时候很有可能造成频繁的Yong GC和Full GC。之前我们系统中使用了一个服务由于每晚使用CopyOnWrite机制更新大对象,造成了每晚15秒的Full GC,应用响应时间也随之变长。

  针对内存占用问题,可以通过压缩容器中的元素的方法来减少大对象的内存消耗,比如,如果元素全是10进制的数字,可以考虑把它压缩成36进制或64进制。或者不使用CopyOnWrite容器,而使用其他的并发容器,如ConcurrentHashMap。

  数据一致性问题。CopyOnWrite容器只能保证数据的最终一致性,不能保证数据的实时一致性。所以如果你希望写入的的数据,马上能读到,请不要使用CopyOnWrite容器。

(0)

相关推荐

  • 基于Java并发容器ConcurrentHashMap#put方法解析

    jdk1.7.0_79 HashMap可以说是每个Java程序员用的最多的数据结构之一了,无处不见它的身影.关于HashMap,通常也能说出它不是线程安全的.这篇文章要提到的是在多线程并发环境下的HashMap--ConcurrentHashMap,显然它必然是线程安全的,同样我们不可避免的要讨论散列表,以及它是如何实现线程安全的,它的效率又是怎样的,因为对于映射容器还有一个Hashtable也是线程安全的但它似乎只出现在笔试.面试题里,在现实编码中它已经基本被遗弃. 关于HashMap的线程不

  • 深入理解Java的Spring框架中的IOC容器

    Spring IOC的原型 spring框架的基础核心和起点毫无疑问就是IOC,IOC作为spring容器提供的核心技术,成功完成了依赖的反转:从主类的对依赖的主动管理反转为了spring容器对依赖的全局控制. 这样做的好处是什么呢? 当然就是所谓的"解耦"了,可以使得程序的各模块之间的关系更为独立,只需要spring控制这些模块之间的依赖关系并在容器启动和初始化的过程中将依据这些依赖关系创建.管理和维护这些模块就好,如果需要改变模块间的依赖关系的话,甚至都不需要改变程序代码,只需要将

  • 深入理解Java线程编程中的阻塞队列容器

    1. 什么是阻塞队列? 阻塞队列(BlockingQueue)是一个支持两个附加操作的队列.这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空.当队列满时,存储元素的线程会等待队列可用.阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程.阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素. 阻塞队列提供了四种处理方法: 抛出异常:是指当阻塞队列满时候,再往队列里插入元素,会抛出IllegalStateException("Q

  • 用java的spring实现一个简单的IOC容器示例代码

    要想深入的理解IOC的技术原理,没有什么能比的上我们自己实现它.这次我们一起实现一个简单IOC容器.让大家更容易理解Spring IOC的基本原理. 这里会涉及到一些java反射的知识,如果有不了解的,可以自己去找些资料看看. 注意 在上一篇文章,我说,启动IOC容器时,Spring会将xml文件里面配置的bean扫描并实例化,其实这种说法不太准确,所以我在这里更正一下,xml文件里面配置的非单利模式的bean,会在第一次调用的时候被初始化,而不是启动容器的时候初始化.但是我们这次要做的例子是容

  • 浅析Java的Spring框架中IOC容器容器的应用

    Spring容器是Spring框架的核心.容器将创建对象,它们连接在一起,配置它们,并从创建到销毁管理他们的整个生命周期.在Spring容器使用依赖注入(DI)来管理组成应用程序的组件.这些对象被称为Spring Beans. 容器获得其上的哪些对象进行实例化,配置和组装通过阅读提供的配置元数据的说明.配置元数据可以通过XML,Java注释或Java代码来表示.下面的图是Spring如何工作的高层次图. Spring IoC容器是利用Java的POJO类和配置元数据的产生完全配置和可执行的系统或

  • Java容器HashMap与HashTable详解

    1.HashMap HashMap继承抽象类AbstractMap,实现接口Map.Cloneable, Serializable接口.HashMap是一种以键值对存储数据的容器, 由数组+链表组成,其中key和value都可以为空,key的值唯一.HashMap是非线程安全的, 对于键值对<Key,Value>, HashMap内部会将其封装成一个对应的Entry<Key,Value>对象.HashMap的存储空间大小是可以动态改变的: 存储过程 每个对象都有一个对应的HashC

  • java容器详细解析

    前言:在java开发中我们肯定会大量的使用集合,在这里我将总结常见的集合类,每个集合类的优点和缺点,以便我们能更好的使用集合.下面我用一幅图来表示 其中淡绿色的表示接口,红色的表示我们经常使用的类. 1:基本概念 Java容器类类库的用途是保存对象,可以将其分为2个概念. 1.1:Collection 一个独立元素的序列,这些元素都服从一条或多条规则.其中List必须按照插入的顺序保存元素.Set不能有重复的元素.Queue按照排队规则来确定对象的产生顺序(通常也是和插入顺序相同) 1.2:Ma

  • Java的Swing编程中使用SwingWorker线程模式及顶层容器

    使用SwingWorker线程模式 谨慎地使用并发机制对Swing开发人员来说非常重要.一个好的Swing程序使用并发机制来创建不会失去响应的用户接口-不管是什么样的用户交互,程序总能够对其给出响应.创建一个有响应的程序,开发人员必须学会如何在Swing框架中使用多线程. 一个Swing开发人员将会与下面几类线程打交道: (1)Initial threads(初始线程),此类线程将执行初始化应用代码. (2)The event dispatch thread(事件派发线程),所有的事件处理代码在

  • java并发容器CopyOnWriteArrayList实现原理及源码分析

    CopyOnWriteArrayList是Java并发包中提供的一个并发容器,它是个线程安全且读操作无锁的ArrayList,写操作则通过创建底层数组的新副本来实现,是一种读写分离的并发策略,我们也可以称这种容器为"写时复制器",Java并发包中类似的容器还有CopyOnWriteSet.本文会对CopyOnWriteArrayList的实现原理及源码进行分析. 实现原理 我们都知道,集合框架中的ArrayList是非线程安全的,Vector虽是线程安全的,但由于简单粗暴的锁同步机制,

  • 迅速掌握Java容器中常用的ArrayList类与Vector类用法

    ArrayList类 List集合的实例化: List<String> l = new ArrayList<String>(); //使用ArrayList类实例化List集合 List<String> l2 = new LinkedList<String>(); //使用LinkedList类实例化List集合 ArrayList常用方法: add(int index, Object obj); addAll(int, Collection coll);

  • Java开发中的容器概念、分类与用法深入详解

    本文实例讲述了Java开发中的容器概念.分类与用法.分享给大家供大家参考,具体如下: 1.容器的概念 在Java当中,如果有一个类专门用来存放其它类的对象,这个类就叫做容器,或者就叫做集合,集合就是将若干性质相同或相近的类对象组合在一起而形成的一个整体 2.容器与数组的关系 之所以需要容器: ① 数组的长度难以扩充 ② 数组中数据的类型必须相同 容器与数组的区别与联系: ① 容器不是数组,不能通过下标的方式访问容器中的元素 ② 数组的所有功能通过Arraylist容器都可以实现,只是实现的方式不

  • Java容器类的深入理解

    Java容器类包含List.ArrayList.Vector及map.HashTable.HashMap ArrayList和HashMap是异步的,Vector和HashTable是同步的,所以Vector和HashTable是线程安全的,而ArrayList和HashMap并不是线程安全的.因为同步需要花费机器时间,所以Vector和HashTable的执行效率要低于ArrayList和HashMap.Collection├List       接口│├LinkedList       链表

随机推荐