CountDownLatch和Atomic原子操作类源码解析

目录
  • 引导语
  • 1、CountDownLatch
    • 1.1、await
    • 1.2、countDown
  • 1.3、示例
  • 2、Atomic原子操作类
  • 3、总结

引导语

本小节和大家一起来看看 CountDownLatch 和 Atomic 打头的原子操作类,CountDownLatch 的源码非常少,看起来比较简单,但 CountDownLatch 的实际应用却不是很容易;Atomic 原子操作类就比较好理解和应用,接下来我们分别来看一下。

1、CountDownLatch

CountDownLatch 中文有的叫做计数器,也有翻译为计数锁,其最大的作用不是为了加锁,而是通过计数达到等待的功能,主要有两种形式的等待:

  • 让一组线程在全部启动完成之后,再一起执行(先启动的线程需要阻塞等待后启动的线程,直到一组线程全部都启动完成后,再一起执行);
  • 主线程等待另外一组线程都执行完成之后,再继续执行。

我们会举一个示例来演示这两种情况,但在这之前,我们先来看看 CountDownLatch 的底层源码实现,这样就会清晰一点,不然一开始就来看示例,估计很难理解。

CountDownLatch 有两个比较重要的 API,分别是 await 和 countDown,管理着线程能否获得锁和锁的释放(也可以称为对 state 的计数增加和减少)。

1.1、await

await 我们可以叫做等待,也可以叫做加锁,有两种不同入参的方法,源码如下:

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
// 带有超时时间的,最终都会转化成毫秒
public boolean await(long timeout, TimeUnit unit)
    throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

两个方法底层使用的都是 sync,sync 是一个同步器,是 CountDownLatch 的内部类实现的,如下:

private static final class Sync extends AbstractQueuedSynchronizer {}

可以看出来 Sync 继承了 AbstractQueuedSynchronizer,具备了同步器的通用功能。

无参 await 底层使用的是 acquireSharedInterruptibly 方法,有参的使用的是 tryAcquireSharedNanos 方法,这两个方法都是 AQS 的方法,底层实现很相似,主要分成两步:

1.使用子类的 tryAcquireShared 方法尝试获得锁,如果获取了锁直接返回,获取不到锁走 2;

2.获取不到锁,用 Node 封装一下当前线程,追加到同步队列的尾部,等待在合适的时机去获得锁。

第二步是 AQS 已经实现了,第一步 tryAcquireShared 方法是交给 Sync 实现的,源码如下:

// 如果当前同步器的状态是 0 的话,表示可获得锁
protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

获得锁的代码也很简单,直接根据同步器的 state 字段来进行判断,但还是有两点需要注意一下:

获得锁时,state 的值不会发生变化,像 ReentrantLock 在获得锁时,会把 state + 1,但 CountDownLatch 不会;

CountDownLatch 的 state 并不是 AQS 的默认值 0,而是可以赋值的,是在 CountDownLatch 初始化的时候赋值的,

代码如下:

// 初始化,count 代表 state 的初始化值
public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    // new Sync 底层代码是 state = count;
    this.sync = new Sync(count);
}

这里的初始化的 count 和一般的锁意义不太一样,count 表示我们希望等待的线程数,在两种不同的等待场景中,count 有不同的含义:

让一组线程在全部启动完成之后,再一起执行的等待场景下, count 代表一组线程的个数;

主线程等待另外一组线程都执行完成之后,再继续执行的等待场景下,count 代表一组线程的个数。

所以我们可以把 count 看做我们希望等待的一组线程的个数,可能我们是等待一组线程全部启动完成,可能我们是等待一组线程全部执行完成。

1.2、countDown

countDown 中文翻译为倒计时,每调用一次,都会使 state 减一,底层调用的方法如下:

public void countDown() {
    sync.releaseShared(1);
}

releaseShared 是 AQS 定义的方法,方法主要分成两步:

1.尝试释放锁(tryReleaseShared),锁释放失败直接返回,释放成功走2

2.释放当前节点的后置等待节点。

第二步 AQS 已经实现了,第一步是 Sync 实现的,我们一起来看下 tryReleaseShared 方法的实现源码:

// 对 state 进行递减,直到 state 变成 0;
// state 递减为 0 时,返回 true,其余返回 false
protected boolean tryReleaseShared(int releases) {
    // 自旋保证 CAS 一定可以成功
    for (;;) {
        int c = getState();
        // state 已经是 0 了,直接返回 false
        if (c == 0)
            return false;
        // 对 state 进行递减
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

从源码中可以看到,只有到 count 递减到 0 时,countDown 才会返回 true。

1.3、示例

看完 CountDownLatch 两个重要 API 后,我们来实现文章开头说的两个功能:

让一组线程在全部启动完成之后,再一起执行;

主线程等待另外一组线程都执行完成之后,再继续执行。

代码在 CountDownLatchDemo 类中,大家可以调试看看,源码如下:

public class CountDownLatchDemo {

  // 线程任务
  class Worker implements Runnable {
    // 定义计数锁用来实现功能 1
    private final CountDownLatch startSignal;
    // 定义计数锁用来实现功能 2
    private final CountDownLatch doneSignal;

    Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
      this.startSignal = startSignal;
      this.doneSignal = doneSignal;
    }
		// 子线程做的事情
    public void run() {
      try {
        System.out.println(Thread.currentThread().getName()+" begin");
        // await 时有两点需要注意:await 时 state 不会发生变化,2:startSignal 的state初始化是 1,所以所有子线程都是获取不到锁的,都需要到同步队列中去等待,达到先启动的子线程等待后面启动的子线程的结果
        startSignal.await();
        doWork();
        // countDown 每次会使 state 减一,doneSignal 初始化为 9,countDown 前 8 次执行都会返回 false (releaseShared 方法),执行第 9 次时,state 递减为 0,会 countDown 成功,表示所有子线程都执行完了,会释放 await 在 doneSignal 上的主线程
        doneSignal.countDown();
        System.out.println(Thread.currentThread().getName()+" end");
      } catch (InterruptedException ex) {
      } // return;
    }

    void doWork() throws InterruptedException {
      System.out.println(Thread.currentThread().getName()+"sleep 5s …………");
      Thread.sleep(5000l);
    }
  }

  @Test
  public void test() throws InterruptedException {
    // state 初始化为 1 很关键,子线程是不断的 await,await 时 state 是不会变化的,并且发现 state 都是 1,所有线程都获取不到锁
    // 造成所有线程都到同步队列中去等待,当主线程执行 countDown 时,就会一起把等待的线程给释放掉
    CountDownLatch startSignal = new CountDownLatch(1);
    // state 初始化成 9,表示有 9 个子线程执行完成之后,会唤醒主线程
    CountDownLatch doneSignal = new CountDownLatch(9);

    for (int i = 0; i < 9; ++i) // create and start threads
    {
      new Thread(new Worker(startSignal, doneSignal)).start();
    }
    System.out.println("main thread begin");
    // 这行代码唤醒 9 个子线程,开始执行(因为 startSignal 锁的状态是 1,所以调用一次 countDown 方法就可以释放9个等待的子线程)
    startSignal.countDown();
    // 这行代码使主线程陷入沉睡,等待 9 个子线程执行完成之后才会继续执行(就是等待子线程执行 doneSignal.countDown())
    doneSignal.await();
    System.out.println("main thread end");
  }
}
执行结果:
Thread-0 begin
Thread-1 begin
Thread-2 begin
Thread-3 begin
Thread-4 begin
Thread-5 begin
Thread-6 begin
Thread-7 begin
Thread-8 begin
main thread begin
Thread-0sleep 5s …………
Thread-1sleep 5s …………
Thread-4sleep 5s …………
Thread-3sleep 5s …………
Thread-2sleep 5s …………
Thread-8sleep 5s …………
Thread-7sleep 5s …………
Thread-6sleep 5s …………
Thread-5sleep 5s …………
Thread-0 end
Thread-1 end
Thread-4 end
Thread-3 end
Thread-2 end
Thread-8 end
Thread-7 end
Thread-6 end
Thread-5 end
main thread end

从执行结果中,可以看出已经实现了以上两个功能,实现比较绕,大家可以根据注释,debug 看一看。

2、Atomic 原子操作类

Atomic 打头的原子操作类有很多,涉及到 Java 常用的数字类型的,基本都有相应的 Atomic 原子操作类,如下图所示:

Atomic 打头的原子操作类,在高并发场景下,都是线程安全的,我们可以放心使用。

我们以 AtomicInteger 为例子,来看下主要的底层实现:

private volatile int value;
// 初始化
public AtomicInteger(int initialValue) {
    value = initialValue;
}
// 得到当前值
public final int get() {
    return value;
}
// 自增 1,并返回自增之前的值
public final int getAndIncrement() {
    return unsafe.getAndAddInt(this, valueOffset, 1);
}
// 自减 1,并返回自增之前的值
public final int getAndDecrement() {
    return unsafe.getAndAddInt(this, valueOffset, -1);
}

从源码中,我们可以看到,线程安全的操作方法,底层都是使用 unsafe 方法实现,以上几个 unsafe 方法不是使用 Java 实现的,都是线程安全的。

AtomicInteger 是对 int 类型的值进行自增自减,那如果 Atomic 的对象是个自定义类怎么办呢,Java 也提供了自定义对象的原子操作类,叫做 AtomicReference。AtomicReference 类可操作的对象是个泛型,所以支持自定义类,其底层是没有自增方法的,操作的方法可以作为函数入参传递,源码如下:

// 对 x 执行 accumulatorFunction 操作
// accumulatorFunction 是个函数,可以自定义想做的事情
// 返回老值
public final V getAndAccumulate(V x,
                                BinaryOperator<V> accumulatorFunction) {
    // prev 是老值,next 是新值
    V prev, next;
    // 自旋 + CAS 保证一定可以替换老值
    do {
        prev = get();
        // 执行自定义操作
        next = accumulatorFunction.apply(prev, x);
    } while (!compareAndSet(prev, next));
    return prev;
}

3、总结

CountDownLatch 的源码实现简单,但真的要用好还是不简单的,其使用场景比较复杂,建议同学们可以 debug 一下

CountDownLatchDemo,在增加实战能力基础上,增加底层的理解能力。

以上就是CountDownLatch和Atomic原子操作类源码解析的详细内容,更多关于CountDownLatch和Atomic原子操作类的资料请关注我们其它相关文章!

(0)

相关推荐

  • JAVA CountDownLatch(倒计时计数器)用法实例

    这篇文章主要介绍了JAVA CountDownLatch(倒计时计数器)用法实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 方法说明: public void countDown() 递减锁存器的计数,如果计数到达零,则释放所有等待的线程.如果当前计数大于零,则将计数减少.如果新的计数为零,出于线程调度目的,将重新启用所有的等待线程. 如果当前计数等于零,则不发生任何操作. public boolean await(long timeout

  • java并发学习-CountDownLatch实现原理全面讲解

    CountDownLatch在多线程并发编程中充当一个计时器的功能,并且维护一个count的变量,并且其操作都是原子操作. 如下图,内部有下static final的Sync类继承自AQS. 该类主要通过countDown()和await()两个方法实现功能的,首先通过建立CountDownLatch对象,并且传入参数即为count初始值. 如果一个线程调用了await()方法,那么这个线程便进入阻塞状态,并进入阻塞队列. 如果一个线程调用了countDown()方法,则会使count-1:当c

  • Java并发系列之CountDownLatch源码分析

    CountDownLatch(闭锁)是一个很有用的工具类,利用它我们可以拦截一个或多个线程使其在某个条件成熟后再执行.它的内部提供了一个计数器,在构造闭锁时必须指定计数器的初始值,且计数器的初始值必须大于0.另外它还提供了一个countDown方法来操作计数器的值,每调用一次countDown方法计数器都会减1,直到计数器的值减为0时就代表条件已成熟,所有因调用await方法而阻塞的线程都会被唤醒.这就是CountDownLatch的内部机制,看起来很简单,无非就是阻塞一部分线程让其在达到某个条

  • CountDownLatch源码解析之await()

    CountDownLatch 源码解析-- await(),具体内容如下 上一篇文章说了一下CountDownLatch的使用方法.这篇文章就从源码层面说一下await() 的原理. 我们已经知道await 能够让当前线程处于阻塞状态,直到锁存器计数为零(或者线程中断). 下面是它的源码. end.await(); ↓ public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } s

  • Java并发编程之CountDownLatch源码解析

    一.前言 CountDownLatch维护了一个计数器(还是是state字段),调用countDown方法会将计数器减1,调用await方法会阻塞线程直到计数器变为0.可以用于实现一个线程等待所有子线程任务完成之后再继续执行的逻辑,也可以实现类似简易CyclicBarrier的功能,达到让多个线程等待同时开始执行某一段逻辑目的. 二.使用 一个线程等待其它线程执行完再继续执行 ...... CountDownLatch cdl = new CountDownLatch(10); Executor

  • CountDownLatch和Atomic原子操作类源码解析

    目录 引导语 1.CountDownLatch 1.1.await 1.2.countDown 1.3.示例 2.Atomic原子操作类 3.总结 引导语 本小节和大家一起来看看 CountDownLatch 和 Atomic 打头的原子操作类,CountDownLatch 的源码非常少,看起来比较简单,但 CountDownLatch 的实际应用却不是很容易:Atomic 原子操作类就比较好理解和应用,接下来我们分别来看一下. 1.CountDownLatch CountDownLatch 中

  • java.lang.Void类源码解析

    在一次源码查看ThreadGroup的时候,看到一段代码,为以下: /* * @throws NullPointerException if the parent argument is {@code null} * @throws SecurityException if the current thread cannot create a * thread in the specified thread group. */ private static Void checkParentAcc

  • flutter图片组件核心类源码解析

    目录 导语 问题 Image的核心类图及其关系 网络图片的加载过程 网络图片数据的回调和展示过程 补上图片内存缓存的源码分析 如何支持图片的磁盘缓存 总结 导语 在使用flutter 自带图片组件的过程中,大家有没有考虑过flutter是如何加载一张网络图片的? 以及对自带的图片组件我们可以做些什么优化? 问题 flutter 网络图片是怎么请求的? 图片请求成功后是这么展示的? gif的每一帧是怎么支持展示的? 如何支持图片的磁盘缓存? 接下来,让我们带着问题一起探究flutter 图片组件的

  • Java并发之ReentrantLock类源码解析

    ReentrantLock内部由Sync类实例实现. Sync类定义于ReentrantLock内部. Sync继承于AbstractQueuedSynchronizer. AbstractQueuedSynchronizer继承于AbstractOwnableSynchronizer. AbstractOwnableSynchronizer类中只定义了一个exclusiveOwnerThread变量,表示当前拥有的线程. 除了Sync类,ReentrantLock内部还定义了两个实现类. No

  • Java源码解析之object类

    在源码的阅读过程中,可以了解别人实现某个功能的涉及思路,看看他们是怎么想,怎么做的.接下来,我们看看这篇Java源码解析之object的详细内容. Java基类Object java.lang.Object,Java所有类的父类,在你编写一个类的时候,若无指定父类(没有显式extends一个父类)编译器(一般编译器完成该步骤)会默认的添加Object为该类的父类(可以将该类反编译看其字节码,不过貌似Java7自带的反编译javap现在看不到了). 再说的详细点:假如类A,没有显式继承其他类,编译

  • Java源码解析之TypeVariable详解

    TypeVariable,类型变量,描述类型,表示泛指任意或相关一类类型,也可以说狭义上的泛型(泛指某一类类型),一般用大写字母作为变量,比如K.V.E等. 源码 public interface TypeVariable<D extends GenericDeclaration> extends Type { //获得泛型的上限,若未明确声明上边界则默认为Object Type[] getBounds(); //获取声明该类型变量实体(即获得类.方法或构造器名) D getGenericDe

  • CountDownLatch源码解析之countDown()

    CountDownLatch 源码解析-- countDown() 上一篇文章从源码层面说了一下CountDownLatch 中 await() 的原理.这篇文章说一下countDown() . public void countDown() { //CountDownLatch sync.releaseShared(1); } ↓ public final boolean releaseShared(int arg) { //AQS if (tryReleaseShared(arg)) { d

  • java底层AQS实现类kReentrantLock锁的构成及源码解析

    目录 引导语 1.类注释 2.类结构 3.构造器 4.Sync同步器 4.1.nonfairTryAcquire 4.2.tryRelease 5.FairSync公平锁 6.NonfairSync非公平锁 7.如何串起来 7.1lock加锁 7.2tryLock尝试加锁 7.3unlock释放锁 7.4Condition 8.总结 引导语 本章的描述思路是先描述清楚 ReentrantLock 的构成组件,然后使用加锁和释放锁的方法把这些组件串起来. 1.类注释 ReentrantLock 中

随机推荐