java高并发下解决AtomicLong性能瓶颈方案LongAdder
目录
- 一、 LongAdder简介
- 二、LongAdder代码分析
- (1)LongAdder的结构
- (2)add方法实现
- (3)add方法中longAccumulate方法的实现
- 三、总结
一、 LongAdder简介
LongAdder类是JDK1.8新增的一个原子性操作类。上一节说到,AtomicLong通过CAS提供了非阻塞的原子性操作,相比用阻塞算法的synchronized来说性能已经得到了很大提升。在高并发下大量线程会同时竞争更新同一个原子变量,但由于只有一个线程的CAS操作会成功,这就造成了大量线程竞争失败后,会通过无限循环不断进行自旋尝试CAS操作,这会白白浪费CPU资源。
为了解决AtomicLong在高并发下的缺点,LongAdder应运而生。LongAdder采用的思路是:既然AtomicLong由于过多线程同时去竞争同一个变量的更新而产生性能瓶颈,那么把一个变量分解为多个变量,让同样多的线程去竞争多个资源,就解决了性能问题。
如图4-1:
使用AtomicLong时,是多个线程同时竞争同一个原子变量。
如图4-2:
使用LongAdder时,则是内部维护多个Cell变量,每个Cell里面有一个初始值为0的long型变量,在同等并发量的情况下,争夺单个变量的线程会减少,这是变相地减少了争夺共享资源的并发量。另外,多个线程在争夺同一个Cell原子变量时候,如果失败并不是自旋CAS重试,而是尝试获取在其他Cell原子变量上进行CAS尝试,这增加了当前线程重试CAS成功的可能性。最后,在获取LongAdder当前值时,是把所有Cell变量的value值累加后再加上base值返回的。
LongAdder维护了一个Cells数组和一个基值变量base,Cells数组的特点如下:
- 延迟初始化(默认情况下Cells为null)。因为Cells占用内存相对较大,故惰性加载。Cells为null时且并发线程较少时,所有的累加操作都是对base变量进行的。
- 初始化时,Cells数组中Cell元素个数为2;同时,Cell数组中元素个数保存为2的N次方。
Cell 类是AtomicLong的一个改进,用来减少缓存的争用,即解决伪共享问题。对于大多数孤立的多个原子操作进行字节填充是浪费的,因为原子操作都是无规律地分散在内存中进行的,多个原子变量被放入同一个缓存行的可能性很小。但是原子性数组元素的内存地址是连续的,故数组内的多个元素能经常共享缓存行(伪共享),因此Cell类使用了@sun.misc.Contended注解进行字节填充,这是为了防止数组中多个元素共享一个缓存行,从而提升性能。
二、LongAdder代码分析
为了解决高并发下多线程对一个变量 CAS 争夺失败后进行无限自旋而造成的降低并发性能的问题,LongAdder在内部维护了一个动态的Cell数组来分担对单个变量进行争夺的开销。
这一节我们来围绕以下话题对LongAdder的实现进行分析:
- (1)LongAdder的结构
- (2)当前线程应该访问Cells数组里的哪一个Cell元素
- (3)如何初始化Cells数组
- (4)Cells数组的扩容机制
- (5)线程访问所分配的Cell元素有冲突后如何处理
- (6)如何保证线程操作被分配的Cell元素的原子性
(1)LongAdder的结构
如图,LongAdder类继承自Striped64类:
先混个眼熟,Striped64类是一个高并发累加的工具类。其特点为:
- 设计核心思路就是通过内部的分散计算来避免竞争。
- 内部包含一个base和一个Cells数组
- 没有竞争的情况下,要累加的数通过CAS累加到base上;如果有竞争的话,会将Cells数组中的每个Cell的value值累加,再加上base值返回。
Striped64类内部维护三个重要的变量:
- transient volatile Cell[] cells; // 存放Cell的数组,大小为2的N次方
- transient volatile long base; // 基础值,默认为0
- transient volatile int cellsBusy; // 用来实现自旋锁,状态值只有0和1,通过CAS操作该变量来保证只有一个线程可以创建Cell元素、初始化Cells数组、对Cells数组扩容
上文中出现频率很高的Cell长啥样?下面我们来揭秘一下。
Cell结构如下:
@sun.misc.Contended static final class Cell { volatile long value; Cell(long x) { value = x; } final boolean cas(long cmp, long val) { return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val); } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long valueOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> ak = Cell.class; valueOffset = UNSAFE.objectFieldOffset (ak.getDeclaredField("value")); } catch (Exception e) { throw new Error(e); } } }
简单分析一下:
- 为提高性能,使用注解@sun.misc.Contended修饰,用来避免伪共享。
- value变量被声明为volatile,为了保证变量的内存可见性。
- cas方法通过CAS操作,保证了当前线程更新时被分配的Cell元素中value值的原子性。
(2)add方法实现
从add方法的代码中,我们可以找到开头里很多问题的答案。
public void add(long x) { Cell[] as; long b, v; int m; Cell a; if ((as = cells) != null || !casBase(b = base, b + x)) {//(1) boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || //(2) (a = as[getProbe() & m]) == null ||//(3) !(uncontended = a.cas(v = a.value, v + x)))//(4) longAccumulate(x, null, uncontended);//(5) } } final boolean casBase(long cmp, long val) { return UNSAFE.compareAndSwapLong(this, BASE, cmp, val); }
- (1)处代码表明:如果cells为null,则add方法实现的效果是在base变量上累加x,这是与AtomicLong的操作类似。
- 当cells不为null或者线程执行代码(1)处的CAS操作失败了,就会执行代码(2);代码(2)(3)决定了当前线程应该访问Cells数组里哪一个Cell元素,如果当前线程映射的Cell元素存在则执行代码(4),使用CAS操作去更新Cell元素的value值
- 如果当前线程映射的Cell元素不存在或者存在,但是CAS操作失败则执行代码(5)。
总结来看就是,(2)(3)(4)处的代码就是获取当前线程应该访问的Cells数组中的Cell元素,然后进行CAS更新操作,在获取期间如果有条件不满足就会跳到代码(5)执行。
另外,当前线程应该访问Cells数组的哪一个Cell元素是通过getProbe() & m
进行计算的,其中m是当前Cells数组元素个数-1,getProbe()
则用于获取当前线程中变量threadLocalRandomProbe
的值(默认为0,代码(5)将其初始化),并且当前线程通过分配的Cell元素的cas函数来保证对Cell元素更新的原子性。到此,我们解决了问题当前线程应该访问Cells数组里的哪一个Cell元素和如何保证线程操作被分配的Cell元素的原子性。
(3)add方法中longAccumulate方法的实现
longAccumulate
方法是cells数组被初始化和扩容的地方。
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { int h;//(6) if ((h = getProbe()) == 0) { ThreadLocalRandom.current(); // force initialization h = getProbe(); wasUncontended = true; } boolean collide = false; // True if last slot nonempty for (;;) { Cell[] as; Cell a; int n; long v; if ((as = cells) != null && (n = as.length) > 0) {//(7) if ((a = as[(n - 1) & h]) == null) {//(8) if (cellsBusy == 0) { // Try to attach new Cell Cell r = new Cell(x); // Optimistically create if (cellsBusy == 0 && casCellsBusy()) { boolean created = false; try { // Recheck under lock Cell[] rs; int m, j; if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r; created = true; } } finally { cellsBusy = 0; } if (created) break; continue; // Slot is now non-empty } } collide = false; } else if (!wasUncontended) // CAS already known to fail wasUncontended = true; // Continue after rehash //(9) else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; //(10) else if (n >= NCPU || cells != as) collide = false; // At max size or stale //(11) else if (!collide) collide = true; //(12) else if (cellsBusy == 0 && casCellsBusy()) { try { if (cells == as) { // Expand table unless stale //(12.1) Cell[] rs = new Cell[n << 1]; for (int i = 0; i < n; ++i) rs[i] = as[i]; cells = rs; } } finally { //(12.2) cellsBusy = 0; } //(12.3) collide = false; continue; // Retry with expanded table } //(13)为了能找到一个空闲的Cell,重新计算hash值,xorshift算法生成随机数。 h = advanceProbe(h); } //(14) else if (cellsBusy == 0 && cells == as && casCellsBusy()) { boolean init = false; try { // Initialize table if (cells == as) { //(14.1) Cell[] rs = new Cell[2]; //(14.2) rs[h & 1] = new Cell(x); cells = rs; init = true; } } finally { //(14.3) cellsBusy = 0; } if (init) break; } else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; // Fall back on using base } }
- 当线程第一次执行代码(6)时,会初始化当前线程变量threadLocalRandomProbe的值,此变量在计算当前线程被分配到Cells数组的哪一个Cell元素时会使用。
- 当前线程执行到代码(7)(8)时,会根据当前线程的threadLocalRandomProbe和cells元素个数计算访问的Cell元素下标,如果发现对应下标元素的值为null,则新增一个Cell元素添加到Cells数组中。但在添加的时候,需要竞争并设置cellsBusy为1。
- 当前线程对应的Cell元素存在,执行代码(9),进行累加操作。
如果Cells数组不存在时,会发生什么?
- cells数组初始化是在代码(14)中进行的,cellsBusy是一个标识(0:当前数组没有在被初始化或者扩容,也未新建Cell元素。1:cells数组在被初始化或扩容、正在创建新的cell元素),通过CAS操作来进行0或1状态的切换,使用
casCellsBusy
方法。 - (14.1)初始化cells数组,长度为2,然后(14.2)使用当前线程的threadLocalRandomProbe值&(cells元素个数-1)来计算当前线程应该访问cells数组的哪个位置。最后(14.3)重置cellsBusy为0,表示初始化完成。这里虽未使用CAS,但却线程安全,因为cellsBusy是volatile的,保证了内存可见性。
- 初始化完的cells数组,里面的元素还是为null的。创建Cell元素是在(7)(8)中。
Cells数组扩容是怎么实现的?
- 代码(12)中,扩容之前需要进行(10)和(11)的判断,当cells元素个数小于当前机器CPU个数且当前多个线程访问了cells中同一个元素时,从而导致冲突使其中一个线程CAS失败时才会进行扩容操作。
- 涉及CPU个数的原因是,只有每个CPU都运行一个线程时,多线程效果最佳。即当Cells元素个数等于CPU个数时,每个Cell都使用一个CPU进行处理,效果最佳。
- 代码(12)中,扩容操作是先通过CAS设置cellsBusy为1,然后才扩容。假设CAS成功则执行代码(12.1)扩容,容量为之前的2倍,并复制Cell元素到扩容后数组,并复制Cell元素到扩容后的数组。
线程访问所分配的Cell元素有冲突后如何处理?
- 代码(13)已经给了我们答案,对CAS失败的线程重新计算当前线程的threadLocalRandomProbe值,以减少下次访问cells元素时的冲突机会。
三、总结
最后再问一下自己,看完了源码分析后,能回答出这六个问题吗?如果能,那么恭喜你,LongAdder的原理已经掌握得差不多了。
- (1)LongAdder的结构
- (2)当前线程应该访问Cells数组里的哪一个Cell元素
- (3)如何初始化Cells数组
- (4)Cells数组的扩容机制
- (5)线程访问所分配的Cell元素有冲突后如何处理
- (6)如何保证线程操作被分配的Cell元素的原子性
由于篇幅有限,下一期我们再来看看LongAccumulator类的原理,毕竟这期的内容有点多了,讲太多也不好消化。这期就到这吧~更多关于java高并发AtomicLong LongAdder的资料请关注我们其它相关文章!