分析并发编程之LongAdder原理

目录
  • 一、前言
  • 二、LongAdder类的使用
  • 三、LongAdder原理的直观理解
  • 四、源码分析
  • 五、与AtomicInteger的比较
  • 六、思想的抽象

一、前言

ConcurrentHashMap的源码采用了一种比较独特的方式对map中的元素数量进行统计,自然是要好好研究一下其原理思想,同时也能更好地理解ConcurrentHashMap本身。

本文主要思路分为以下5个部分:

1.计数的使用效果

2.原理的直观图解

3.源码的细节分析

4.与AtomicInteger的比较

5.思想的抽象

学习的入口自然是map的put方法

public V put(K key, V value) {
    return putVal(key, value, false);
}

查看putVal方法

这里并不对ConcurrentHashMap本身的原理作过多讨论,因此我们直接跳到计数部分

final V putVal(K key, V value, boolean onlyIfAbsent) {
    ...
    addCount(1L, binCount);
    return null;
}

每当成功添加一个元素之后,都会调用addCount方法进行数量的累加1的操作,这就是我们研究的目标

因为ConcurrentHashMap的设计初衷就是为了解决多线程并发场景下的map操作,因此在作数值累加的时候自然也要考虑线程安全

当然,多线程数值累加一般是学习并发编程的第一课,本身并非很复杂,可以采用AtomicInteger或者锁等等方式来解决该问题

然而如果我们查看该方法,就会发现,一个想来应该比较简单的累加方法,其逻辑看上去却相当复杂

这里我只贴出了累加算法的核心部分

private final void addCount(long x, int check) {
    CounterCell[] as; long b, s;
    if ((as = counterCells) != null ||
            !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        CounterCell a; long v; int m;
        boolean uncontended = true;
        if (as == null || (m = as.length - 1) < 0 ||
                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                !(uncontended =
                        U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
            fullAddCount(x, uncontended);
            return;
        }
        if (check <= 1)
            return;
        s = sumCount();
    }
    ...
}

我们就来研究一下该逻辑的实现思路。而这个思路其实是照搬了LongAdder类的逻辑,因此我们直接查看该算法的原始类

二、LongAdder类的使用

我们先看下LongAdder的使用效果

LongAdder adder = new LongAdder();
int num = 0;

@Test
public void test5() throws InterruptedException {
    Thread[] threads = new Thread[10];
    for (int i = 0; i < 10; i++) {
        threads[i] = new Thread(() -> {
            for (int j = 0; j < 10000; j++) {
                adder.add(1);
                num += 1;
            }
        });
        threads[i].start();
    }
    for (int i = 0; i < 10; i++) {
        threads[i].join();
    }
    System.out.println("adder:" + adder);
    System.out.println("num:" + num);
}

输出结果

adder:100000

num:40982

可以看到adder在使用效果上是可以保证累加的线程安全的

三、LongAdder原理的直观理解

为了更好地对源码进行分析,我们需要先从直觉上理解它的原理,否则直接看代码的话会一脸懵逼

LongAdder的计数主要分为2个对象

一个long类型的字段:base

一个Cell对象数组,Cell对象中就维护了一个long类型的字段value,用来计数

/**
 * Table of cells. When non-null, size is a power of 2.
 */
transient volatile Cell[] cells;

/**
 * Base value, used mainly when there is no contention, but also as
 * a fallback during table initialization races. Updated via CAS.
 */
transient volatile long base;

当没有发生线程竞争的时候,累加都会发生在base字段上,这就相当于是一个单线程累加2次,只不过base的累加是一个cas操作

当发生线程竞争的时候,必然有一个线程对base的cas累加操作失败,于是它先去判断Cell是否已经被初始化了,如果没有则初始化一个长度为2的数组,并根据线程的hash值找到对应的数组索引,并对该索引的Cell对象中的value值进行累加(这个累加也是cas的操作)

如果一共有3个线程发生了竞争,那么其中第一个线程对base的cas累加成功,剩下2个线程都需要去对Cell数组中的元素进行累加。因为对Cell中value值的累加也是一个cas操作,如果第二个线程和第三个线程的hash值对应的数组下标是同一个,那么同样会发生竞争,如果第二个线程成功了,第三个线程就会去rehash自己的hash值,如果得到的新的hash值对应的是另一个元素为null的数组下标,那么就new一个Cell对象并对value值进行累加

如果此时有线程4同时参与竞争,那么对于线程4来说,即使rehash后还是可能在和线程3的竞争过程中cas失败,此时如果当前数组的容量小于系统可用的cpu的数量,那么它就会对数组进行扩容,之后再次rehash,重复尝试对Cell数组中某个下标对象的累加

以上就是整体直觉上的理解,然而代码中还有很多细节的设计非常值得学习,所以我们就开始进入源码分析的环节

四、源码分析

入口方法是add

public void add(long x) {
    Cell[] as; long b, v; int m; Cell a;
    /**
     * 这里优先判断了cell数组是否为空,之后才判断base字段的cas累加
     * 意味着如果线程不发生竞争,cell数组一直为空,那么所有的累加操作都会累加到base上
     * 而一旦发生过一次竞争导致cell数组不为空,那么所有的累加操作都会优先作用于数组中的对象上
     */
    if ((as = cells) != null || !casBase(b = base, b + x)) {
        /**
         * 这个字段是用来标识在对cell数组中的对象进行累加操作时是否发生了竞争
         * 如果发生了竞争,那么在longAccumulate方法中会多进行一次rehash的自旋
         * 这个在后面的方法中详细说明,这里先有个印象
         * true表示未发生竞争
         */
        boolean uncontended = true;
        /**
         * 如果cell数组为空或者长度为0则直接进入主逻辑方法
         */
        if (as == null || (m = as.length - 1) < 0 ||
                /**
                 * 这里的getProbe()方法可以认为就是获取线程的hash值
                 * hash值与(数组长度-1)进行位与操作后得到对应的数组下标
                 * 判断该元素是否为空,如果不为空那么就会尝试累加
                 * 否则进入主逻辑方法
                 */
                (a = as[getProbe() & m]) == null ||
                /**
                 * 对数组下标的元素进行cas累加,如果成功了,那么就可以直接返回
                 * 否则进入主逻辑方法
                 */
                !(uncontended = a.cas(v = a.value, v + x)))
            longAccumulate(x, null, uncontended);
    }
}

当不发生线程竞争的时候,那累加操作就会由第一个if中的casBase负责,对应之前图解的情况一

当发生线程竞争之后,累加操作就会由cell数组负责,对应之前图解的情况二(数组的初始化在longAccumulate方法中)

接着我们查看主逻辑方法,因为方法比较长,所以我会一段一段拿出来解析

longAccumulate方法

签名中的参数

x表示需要累加的值

fn表示需要如何累加,一般传null就行,不重要

wasUncontended表示是否在外层方法遇到了竞争失败的情况,因为外层的判断逻辑是多个“或”(as == null || (m = as.length - 1) < 0 || (a = as[getProbe() & m]) == null),所以如果数组为空或者相应的下标元素还未初始化,这个字段就会保持false

final void longAccumulate(long x, LongBinaryOperator fn,
                          boolean wasUncontended) {
  ...
}

首先判断线程的hash值是否为0,如果为0则需要做一个初始化,即rehash

之后会将wasUncontended置为true,因为即使之前是冲突过的,经过rehash后就会先假设它能找到一个元素不冲突的数组下标

int h;//线程的hash值,在后面的逻辑中会用到
if ((h = getProbe()) == 0) {
    ThreadLocalRandom.current(); // force initialization
    h = getProbe();
    wasUncontended = true;
}

之后是一个死循环,死循环中有3个大的if分支,这3个分支的逻辑作用于数组未初始化的时候,一旦数组初始化完成,那么就都会进入主逻辑了,因此我这里把主逻辑抽取出来放到后面单独说,也可以避免外层分支对思路的影响

/**
 * 用来标记某个线程在上一次循环中找到的数组下标是否已经有Cell对象了
 * 如果为true,则表示数组下标为空
 * 在主逻辑的循环中会用到
 */
boolean collide = false;
/**
 * 死循环,提供自旋操作
 */
for (; ; ) {
    Cell[] as;
    Cell a;
    int n;//cell数组长度
    long v;//需要被累积的值
    /**
     * 如果cells数组不为空,且已经被某个线程初始化成功,那么就会进入主逻辑,这个后面详细解释
     */
    if ((as = cells) != null && (n = as.length) > 0) {
        ...
        /**
         * 如果数组为空,那么就需要初始化一个Cell数组
         * cellsBusy用来标记cells数组是否能被操作,作用相当于一个锁
         * cells == as 判断是否有其他线程在当前线程进入这个判断之前已经初始化了一个数组
         * casCellsBusy 用一个cas操作给cellsBusy字段赋值为1,如果成功可以认为拿到了操作cells数组的锁
         */
    } else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
        /**
         * 这里就是初始化一个数组,不解释了
         */
        boolean init = false;
        try {
            if (cells == as) {
                Cell[] rs = new Cell[2];
                rs[h & 1] = new Cell(x);
                cells = rs;
                init = true;
            }
        } finally {
            cellsBusy = 0;
        }
        if (init)
            break;
        /**
         * 如果当前数组是空的,又没有竞争过其他线程
         * 那么就再次尝试去给base赋值
         * 如果又没竞争过(感觉有点可怜),那么就自旋
         * 另外提一下方法签名中的LongBinaryOperator对象就是用在这里的,不影响逻辑
         */
    } else if (casBase(v = base, ((fn == null) ? v + x :
            fn.applyAsLong(v, x))))
        break;                          // Fall back on using base
}

接着就看对cell数组元素进行累加的主逻辑

/**
 * 如果cells数组不为空,且已经被某个线程初始化成功,进入主逻辑
 */
if ((as = cells) != null && (n = as.length) > 0) {
    /**
     * 如果当前线程的hash值对应的数组元素为空
     */
    if ((a = as[(n - 1) & h]) == null) {
        /**
         * Cell数组并未被其他线程操作
         */
        if (cellsBusy == 0) {
            /**
             * 这里没有理解作者为什么会在这里初始化单个Cell
             * 作者这里的注释是Optimistically create,如果有理解的同学可以说一下
             */
            Cell r = new Cell(x);
            /**
             * 在此判断cell锁的状态,并尝试加锁
             */
            if (cellsBusy == 0 && casCellsBusy()) {
                boolean created = false;
                try {
                    /**
                     * 这里对数组是否为空等状态再次进行校验
                     * 如果校验通过,那么就将之前new的Cell对象放到Cell数组的该下标处
                     */
                    Cell[] rs;
                    int m, j;
                    if ((rs = cells) != null &&
                            (m = rs.length) > 0 &&
                            rs[j = (m - 1) & h] == null) {
                        rs[j] = r;
                        created = true;
                    }
                } finally {
                    cellsBusy = 0;
                }
                /**
                 * 如果创建成功,就说明累加成功,直接退出循环
                 */
                if (created)
                    break;
                /**
                 * 走到这里说明在判空和拿到锁之间正好有其他线程在该下标处创建了一个Cell
                 * 因此直接continue,不rehash,下次就不会进入到该分支了
                 */
                continue;
            }
        }
        /**
         * 当执行到这里的时候,因为是在 if ((a = as[(n - 1) & h]) == null) 这个判断逻辑中
         * 就说明在第一个if判断的时候该下标处没有元素,所以赋值为false
         * collide的意义是:上一次循环中找到的数组下标是否已经有Cell对象了
         * True if last slot nonempty
         */
        collide = false;
    /**
     * 这个字段如果为false,说明之前已经和其他线程发过了竞争
     * 即使此时可以直接取尝试cas操作,但是在高并发场景下
     * 这2个线程之后依然可能发生竞争,而每次竞争都需要自旋的话会很浪费cpu资源
     * 因此在这里先直接增加自旋一次,在for的最后会做一次rehash
     * 使得线程尽快地找到自己独占的数组下标
     */
    } else if (!wasUncontended)
        wasUncontended = true;
    /**
     * 尝试给hash对应的Cell累加,如果这一步成功了,那么就返回
     * 如果这一步依然失败了,说明此时整体的并发竞争非常激烈
     * 那就可能需要考虑扩容数组了
     * (因为数组初始化容量为2,如果此时有10个线程在并发运行,那就很难避免竞争的发生了)
     */
    else if (a.cas(v = a.value, ((fn == null) ? v + x :
            fn.applyAsLong(v, x))))
        break;
    /**
     * 这里判断下cpu的核数,因为即使有100个线程
     * 能同时并行运行的线程数等于cpu数
     * 因此如果数组的长度已经大于cpu数目了,那就不应当再扩容了
     */
    else if (n >= NCPU || cells != as)
        collide = false;
    /**
     * 走到这里,说明当前循环中根据线程hash值找到的数组下标已经有元素了
     * 如果此时collide为false,说明上一次循环中找到的下边是没有元素的
     * 那么就自旋一次并rehash
     * 如果再次运行到这里,并且collide为true,就说明明竞争非常激烈,应当扩容了
     */
    else if (!collide)
        collide = true;
    /**
     * 能运行到这里,说明需要扩容数组了
     * 判断锁状态并尝试获取锁
     */
    else if (cellsBusy == 0 && casCellsBusy()) {
        /**
         * 扩容数组的逻辑,这个扩容比较简单,就不解释了
         * 扩容大小为2倍
         */
        try {
            if (cells == as) {
                Cell[] rs = new Cell[n << 1];
                for (int i = 0; i < n; ++i)
                    rs[i] = as[i];
                cells = rs;
            }
        } finally {
            cellsBusy = 0;
        }
        collide = false;
        /**
        * 这里直接continue,因为扩容过了,就先不rehash了
        */
        continue;
    }
    /**
     * 做一个rehash,使得线程在下一个循环中可能找到独占的数组下标
     */
    h = advanceProbe(h);
}

到这里LongAdder的源码其实就分析结束了,其实代码并不多,但是他的思想非常值得我们去学习。

五、与AtomicInteger的比较

光分析源码其实还差一些感觉,我们还没有搞懂为何作者要在已经有AtomicInteger的情况下,再设计这么一个看上去非常复杂的类。

那么首先我们先分析下AtomicInteger保证线程安全的原理

查看最基本的getAndIncrement方法

public final int getAndIncrement() {
    return unsafe.getAndAddInt(this, valueOffset, 1);
}

调用了Unsafe类的getAndAddInt方法,继续往下看

public final int getAndAddInt(Object var1, long var2, int var4) {
    int var5;
    do {
        var5 = this.getIntVolatile(var1, var2);
    } while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));

    return var5;
}

这里我们不再深究getIntVolatile和compareAndSwapInt方法具体实现,因为其已经是native的方法了

可以看到,AtomicInteger底层是使用了cas+自旋的方式解决原子性问题的,即如果一次赋值不成功,那么就自旋,直到赋值成功为止

那么由此可以推断,当出现大量线程并发,竞争非常激烈的时候,AtomicInteger就有可能导致有些线程不断地竞争失败,不断自旋从而影响任务的吞吐量

为了解决高并发下的自旋问题,LongAdder的作者在设计的时候就通过增加一个数组的方式,使得竞争的对象从一个值变成多个值,从而使得发生竞争的频率降低,从而缓解了自旋的问题,当然付出的代价就是额外的存储空间。

最后我简单做了个测试,比较2种计数方法的耗时

通过原理可知,只有当线程竞争非常激烈的时候,LongAdder的优势才会比较明显,因此这里我用了100个线程,每一个线程对同一个数累加1000000次,得到结果如下,差距非常巨大,达到15倍!

LongAdder耗时:104292242nanos

AtomicInteger耗时:1583294474nanos

当然这只是一个简单测试,包含了很多随机性,有兴趣的同学可以尝试不同的竞争程度多次测试

六、思想的抽象

最后我们需要将作者的具体代码和实现逻辑抽象一下,理清思考的过程

1)AtomicInteger遇到的问题:单个资源的竞争导致自旋的发生

2)解决的思路:将单个对象的竞争扩展为多个对象的竞争(有那么一些分治的思想)

3)扩展的可控性:多个竞争对象需要付出额外的存储空间,因此不能无脑地扩展(极端情况是一个线程一个计数的对象,这明显不合理)

4)问题的分层:因为使用类的时候的场景是不可控的,因此需要根据并发的激烈程度动态地扩展额外的存储空间(类似于synchronized的膨胀)

5)3个分层策略:当不发生竞争时,那么用一个值累加即可;当发生一定程度的竞争时,创建一个容量为2的数组,使得竞争的资源扩展为3个;当竞争更加激烈时,则继续扩展数组(对应图解中的1个线程到4个线程的过程)

6)策略细节:在自旋的时候增加rehash,此时虽然付出了一定的运算时间计算hash、比较数组对象等,但是这会使得并发的线程尽快地找到专属于自己的对象,在之后就不会再发生任何竞争(磨刀不误砍柴工,特别注意wasUncontended字段的相关注解)

以上就是分析并发编程之LongAdder原理的详细内容,更多关于并发编程 LongAdder的资料请关注我们其它相关文章!

(0)

相关推荐

  • Java并发工具类LongAdder原理实例解析

    LongAdder实现原理图 高并发下N多线程同时去操作一个变量会造成大量线程CAS失败,然后处于自旋状态,导致严重浪费CPU资源,降低了并发性.既然AtomicLong性能问题是由于过多线程同时去竞争同一个变量的更新而降低的,那么如果把一个变量分解为多个变量,让同样多的线程去竞争多个资源. LongAdder则是内部维护一个Cells数组,每个Cell里面有一个初始值为0的long型变量,在同等并发量的情况下,争夺单个变量的线程会减少,这是变相的减少了争夺共享资源的并发量,另外多个线程在争夺同

  • JDK8中新增的原子性操作类LongAdder详解

    前言 本文主要给大家介绍了关于JDK8新增的原子性操作类LongAdder的相关内容,分享出来供大家参考学习,下面话不多说了,来一起看看详细的介绍: LongAdder简单介绍 LongAdder类似于AtomicLong是原子性递增或者递减类,AtomicLong已经通过CAS提供了非阻塞的原子性操作,相比使用阻塞算法的同步器来说性能已经很好了,但是JDK开发组并不满足,因为在非常高的并发请求下AtomicLong的性能不能让他们接受,虽然AtomicLong使用CAS但是CAS失败后还是通过

  • 一篇带你解析入门LongAdder源码

    1.LongAdder由来 LongAdder类是JDK1.8新增的一个原子性操作类.AtomicLong通过CAS算法提供了非阻塞的原子性操作,相比受用阻塞算法的同步器来说性能已经很好了,但是JDK开发组并不满足于此,因为经常搞并发的请求下AtomicLong的性能是不能让人接受的. 如下AtomicLong 的incrementAndGet的代码,虽然AtomicLong使用CAS算法,但是CAS失败后还是通过无限循环的自旋锁不多的尝试,这就是高并发下CAS性能低下的原因所在.源码如下: p

  • 分析并发编程之LongAdder原理

    目录 一.前言 二.LongAdder类的使用 三.LongAdder原理的直观理解 四.源码分析 五.与AtomicInteger的比较 六.思想的抽象 一.前言 ConcurrentHashMap的源码采用了一种比较独特的方式对map中的元素数量进行统计,自然是要好好研究一下其原理思想,同时也能更好地理解ConcurrentHashMap本身. 本文主要思路分为以下5个部分: 1.计数的使用效果 2.原理的直观图解 3.源码的细节分析 4.与AtomicInteger的比较 5.思想的抽象

  • Java并发编程之ReentrantLock实现原理及源码剖析

    目录 一.ReentrantLock简介 二.ReentrantLock使用 三.ReentrantLock源码分析 1.非公平锁源码分析 2.公平锁源码分析 前面<Java并发编程之JUC并发核心AQS同步队列原理剖析>介绍了AQS的同步等待队列的实现原理及源码分析,这节我们将介绍一下基于AQS实现的ReentranLock的应用.特性.实现原理及源码分析. 一.ReentrantLock简介 ReentrantLock位于Java的juc包里面,从JDK1.5开始出现,是基于AQS同步队列

  • Java并发编程之JUC并发核心AQS同步队列原理剖析

    目录 一.AQS介绍 二.AQS中的队列 1.同步等待队列 2.条件等待队列 3.AQS队列节点Node 三.同步队列源码分析 1.同步队列分析 2.同步队列--独占模式源码分析 3.同步队列--共享模式源码分析 一.AQS介绍 队列同步器AbstractQueuedSynchronizer(简称AQS),AQS定义了一套多线程访问共享资源的同步器框架,是用来构建锁或者其他同步组件的基础框架,是一个依赖状态(state)的同步器.Java并发编程的核心在java.util.concurrent(

  • Java并发编程之Condition源码分析(推荐)

    Condition介绍 上篇文章讲了ReentrantLock的加锁和释放锁的使用,这篇文章是对ReentrantLock的补充.ReentrantLock#newCondition()可以创建Condition,在ReentrantLock加锁过程中可以利用Condition阻塞当前线程并临时释放锁,待另外线程获取到锁并在逻辑后通知阻塞线程"激活".Condition常用在基于异步通信的同步机制实现中,比如dubbo中的请求和获取应答结果的实现. 常用方法 Condition中主要的

  • Java并发编程之Volatile变量详解分析

    目录 一.volatile变量的特性 1.1.保证可见性,不保证原子性 1.2.禁止指令重排 二.内存屏障 三.happens-before Volatile关键字是Java提供的一种轻量级的同步机制.Java 语言包含两种内在的同步机制:同步块(或方法)和 volatile 变量, 相比synchronized(synchronized通常称为重量级锁),volatile更轻量级,因为它不会引起线程上下文的切换和调度. 但是volatile 变量的同步性较差(有时它更简单并且开销更低),而且其

  • 深入理解Java并发编程之ThreadLocal

    目录 ThreadLocal简介 ThreadLocal源码解析 实现原理 ThreadLocalMap源码分析 InheritableThreadLocal 参考资料 ThreadLocal简介 变量值的共享可以使用public static的形式,所有线程都使用同一个变量,如果想实现每一个线程都有自己的共享变量该如何实现呢?JDK中的ThreadLocal类正是为了解决这样的问题. ThreadLocal类并不是用来解决多线程环境下的共享变量问题,而是用来提供线程内部的共享变量,在多线程环境

  • 浅谈Java并发编程之Lock锁和条件变量

    简单使用Lock锁 Java 5中引入了新的锁机制--java.util.concurrent.locks中的显式的互斥锁:Lock接口,它提供了比synchronized更加广泛的锁定操作.Lock接口有3个实现它的类:ReentrantLock.ReetrantReadWriteLock.ReadLock和ReetrantReadWriteLock.WriteLock,即重入锁.读锁和写锁.lock必须被显式地创建.锁定和释放,为了可以使用更多的功能,一般用ReentrantLock为其实例

  • Java并发编程之ReentrantLock可重入锁的实例代码

    目录 1.ReentrantLock可重入锁概述2.可重入3.可打断4.锁超时5.公平锁6.条件变量 Condition 1.ReentrantLock可重入锁概述 相对于 synchronized 它具备如下特点 可中断 synchronized锁加上去不能中断,a线程应用锁,b线程不能取消掉它 可以设置超时时间 synchronized它去获取锁时,如果对方持有锁,那么它就会进入entryList一直等待下去.而可重入锁可以设置超时时间,规定时间内如果获取不到锁,就放弃锁 可以设置为公平锁

  • Java并发编程之CountDownLatch源码解析

    一.前言 CountDownLatch维护了一个计数器(还是是state字段),调用countDown方法会将计数器减1,调用await方法会阻塞线程直到计数器变为0.可以用于实现一个线程等待所有子线程任务完成之后再继续执行的逻辑,也可以实现类似简易CyclicBarrier的功能,达到让多个线程等待同时开始执行某一段逻辑目的. 二.使用 一个线程等待其它线程执行完再继续执行 ...... CountDownLatch cdl = new CountDownLatch(10); Executor

  • Java并发编程之LockSupport类详解

    一.LockSupport类的属性 private static final sun.misc.Unsafe UNSAFE; // 表示内存偏移地址 private static final long parkBlockerOffset; // 表示内存偏移地址 private static final long SEED; // 表示内存偏移地址 private static final long PROBE; // 表示内存偏移地址 private static final long SEC

随机推荐