Java并发编程之CountDownLatch源码解析

一、前言

CountDownLatch维护了一个计数器(还是是state字段),调用countDown方法会将计数器减1,调用await方法会阻塞线程直到计数器变为0。可以用于实现一个线程等待所有子线程任务完成之后再继续执行的逻辑,也可以实现类似简易CyclicBarrier的功能,达到让多个线程等待同时开始执行某一段逻辑目的。

二、使用

  • 一个线程等待其它线程执行完再继续执行
	......
	CountDownLatch cdl = new CountDownLatch(10);
	ExecutorService es = Executors.newFixedThreadPool(10);
	for (int i = 0; i < 10; i++) {
		es.execute(() -> {
			//do something
			cdl.countDown();
		});
	}
	cdl.await();
	......
  • 实现类似CyclicBarrier的功能,先await,再countDown
	......
        CountDownLatch cdl = new CountDownLatch(1);
        ExecutorService es = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 10; i++) {
            es.execute(() -> {
                cdl.await();
                //do something
            });
        }
        Thread.sleep(10000L);
        cdl.countDown();
        ......

三、源码分析

CountDownLatch的结构和ReentrantLock、Semaphore的结构类似,也是使用的内部类Sync继承AQS的方式,并且重写了tryAcquireShared和tryReleaseShared方法。

还是首先来看构造函数:

public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

需要传入一个大于0的count,代表CountDownLatch计数器的初始值,通过Sync的构造函数最终赋值给父类AQS的state字段。可一个看到这个state字段用法多多,在ReentrantLock中使用0和1来标识锁的状态,Semaphore中用来标识信号量,此处又用来表示计数器。

CountDownLatch要通过await方法完成阻塞,先来看看这个方法是如何实现的:

public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

调用的是sync的acquireSharedInterruptibly方法,该方法定义在AQS中,Semaphore也调用的这个方法:

public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

这个方法的逻辑前面在解析SemaPhore的时候细说过了,这里不再赘述,主要就是两个方法的调用,先通过tryAcquireShared方法尝试获取"许可",返回值代表此次获取后的剩余量,如果大于等于0表示获取成功,否则表示失败。如果失败,那么就会进入doAcquireSharedInterruptibly方法执行入队阻塞的逻辑。这里我们主要到CountDownLatch中看看tryAcquireShared方法的实现:

protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

和Semaphore的实现中每次将state减去requires不同,这里直接判断state是否为0,如果为0那么返回1,表示获取"许可"成功;如果不为0,表示失败,则需要入队阻塞。从这个tryAcquireShared方法就能看出CountDownLatch的逻辑了:等到state变为了0,那么所有线程都能获取运行许可。

那么我们接下来来到countDown方法:

public void countDown() {
        sync.releaseShared(1);
    }

调用的是sync的releaseShared方法,该方法定义在父类AQS中,Semaphore使用的也是这个方法:

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
        	//当state从非
            doReleaseShared();
            return true;
        }
        return false;
    }

前面提到了CountDownLatch也重写了tryReleaseShared方法:

protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                	//如果state等于0了直接返回false
                	//保证在并发情况下,最多只会有一个线程返回true
                	//也包括调用countDown的次数超过state的初始值
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                	//如果返回true,表示state从非0变为了0
                	//那么后续需要唤醒阻塞线程
                    return nextc == 0;
            }
        }

Semaphore在释放信号量的时候,是将获取的许可归还到state中,但是CountDownLatch没有获取许可的逻辑(获取许可的时候是判断state是否等于0),所以在countDown的时候没有释放的逻辑,就是将state减1,然后根据state减1之后的值是否为0判断release是否成功,如果state本来大于0,经过减1之后变为了0,那么返回true。tryReleaseShared方法的返回值决定了后续需不需要调用doReleaseShared方法唤醒阻塞线程。

这里有个逻辑:如果state已经为0,那么返回false。这个主要应对两种情况:

  • 调用countDown的次数超过了state的初始值多
  • 线程并发调用的时候保证只有一个线程去完成阻塞线程的唤醒操作

可以看到CountDownLatch没有锁的概念,countDown方法可以被一个线程重复调用,只需要对state做reduce操作,而不用关心是谁做的reduce。如果tryReleaseShared返回true,那么表示需要在后面进入doReleaseShared方法,该方法和Semaphore中调用的方法是同一个,主要是唤醒阻塞线程或者设置PROPAGAGE状态,这里也不再赘述~

阻塞线程被唤醒之后,会在doAcquireSharedInterruptibly方法中继续循环,虽然和Semaphore调用的是同样的方法,但是这里有不一样的地方,所以还是提一句。我们首先回到doAcquireSharedInterruptibly方法:

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                	//如果head.next被unpark唤醒,说明此时state==0
                	//那么tryAcquireShared会返回1
                    int r = tryAcquireShared(arg);
                    //r==1
                    if (r >= 0) {
                    	//node节点被唤醒后,还会继续唤醒node.next
                    	//这样依次传递,因为在这里的r一定为1
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

当head.next线程被unpark唤醒后,会进入tryAcquireShared方法判断,由于此时state已经为0(只有当state变为0时,才会unpark唤醒线程),而前面提到了在CountDownLatch重写的tryAcquireShared中,如果state==0,那么会返回1,所以会进入setHeadAndPropagate方法:

private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

该方法在Semaphore中详细介绍过,这里我们就站在CountDownLatch的角度来看看。其实很简单了,注意此时该方法的propagate参数值是1,那么就会进入到下面的if逻辑里,继续唤醒下一个node。当下一个node对应的线程被唤醒后,同样会进入setHeadAndPropagate方法,propagage同样为1,那么继续唤醒下一个node,就这样依次将整个CLH队列的节点都唤醒。

四、总结

如果单独把CountDownLatch拿出来看其实是很复杂的,只是CountDownLatch(包括Semaphore和ReentrantLock)都高度共用了AQS提供的一些方法,而这些方法在前面介绍Semaphore和ReentrantLock的时候已经详细分析过,所以到本文分析CountDownLatch的时候,只需要关注它内部类Sync重写的两个方法:tryAcquireShared和tryReleaseShared,也就是"获取许可"和"释放许可"的逻辑。

CountDownLatch在await的逻辑里,如果当前state的值大于0,那么会进入CLH队列进行阻塞等待unpark唤醒(或者中断唤醒);在countDown的逻辑里,就是简单的将state-1,如果一个线程把state从1减为0,那么该线程就会负责唤醒head.next节点,head.next节点被唤醒后,又会在setHeadAndPropagate方法中唤醒next.next节点,这样依次唤醒所有CLH队列中的阻塞节点。当然,如果线程被中断唤醒,那么也会进入cancelAcquire中进行无效节点的移除逻辑。

到此这篇关于Java并发编程之CountDownLatch源码解析的文章就介绍到这了,更多相关Java中CountDownLatch源码解析内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • java并发编程专题(八)----(JUC)实例讲解CountDownLatch

    CountDownLatch 是一个非常实用的多线程控制工具类." Count Down " 在英文中意为倒计数, Latch 为门问的意思.如果翻译成为倒计数门阀, 我想大家都会觉得不知所云吧! 因此,这里简单地称之为倒计数器.在这里, 门问的含义是:把门锁起来,不让里面的线程跑出来.因此,这个工具通常用来控制线程等待,它可以让某一个线程等待直到倒计时结束, 再开始执行. CountDown Latch 的构造函数接收一个整数作为参数,即当前这个计数器的计数个数. public Co

  • JAVA多线程CountDownLatch使用详解

    前序: 上周测试给开发的同事所开发的模块提出了一个bug,并且还是偶现. 经过仔细查看代码,发现是在业务中启用了多线程,2个线程同时跑,但是新启动的2个线程必须保证一个完成之后另一个再继续运行,才能消除bug. 什么时候用? 多线程是在很多地方都会用到的,但是我们如果想要实现在某个特定的线程运行完之后,再启动另外一个线程呢,这个时候CountDownLatch就可以派上用场了 怎么用? 先看看普通的多线程代码: package code; public class MyThread extend

  • JAVA CountDownLatch(倒计时计数器)用法实例

    这篇文章主要介绍了JAVA CountDownLatch(倒计时计数器)用法实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 方法说明: public void countDown() 递减锁存器的计数,如果计数到达零,则释放所有等待的线程.如果当前计数大于零,则将计数减少.如果新的计数为零,出于线程调度目的,将重新启用所有的等待线程. 如果当前计数等于零,则不发生任何操作. public boolean await(long timeout

  • Java骚操作之CountDownLatch代码详解

    简述 用来干嘛的?当你在方法中调用了多个线程,对数据库进行了一些不为人知的操作后,还有一个操作需要留到前者都执行完的重头戏,就需要用到 CountDownLatch 了 实践代码 package com.github.gleans; import java.util.concurrent.CountDownLatch; public class TestCountDownLatch { public static void main(String[] args) throws Interrupt

  • JAVA CountDownLatch与thread-join()的区别解析

    今天学习CountDownLatch这个类,作用感觉和join很像,然后就百度了一下,看了他们之间的区别.所以在此记录一下. 首先来看一下join,在当前线程中,如果调用某个thread的join方法,那么当前线程就会被阻塞,直到thread线程执行完毕,当前线程才能继续执行.join的原理是,不断的检查thread是否存活,如果存活,那么让当前线程一直wait,直到thread线程终止,线程的this.notifyAll 就会被调用. 我们来看一下这个应用场景:假设现在公司有三个员工A,B,C

  • Java线程并发工具类CountDownLatch原理及用法

    一.CountDownLatch [1]CountDownLatch是什么? CountDownLatch,英文翻译为倒计时锁存器,是一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或 多个线程一直等待. 闭锁可以延迟线程的进度直到其到达终止状态,闭锁可以用来确保某些活动直到其他活动都完成才继续执行: 确保某个计算在其需要的所有资源都被初始化之后才继续执行; 确保某个服务在其依赖的所有其他服务都已经启动之后才启动; 等待直到某个操作所有参与者都准备就绪再继续执行: CountD

  • java多线程CountDownLatch与线程池ThreadPoolExecutor/ExecutorService案例

    1.CountDownLatch: 一个同步工具类,它允许一个或多个线程一直等待,直到其他线程的操作执行完后再执行. 2.ThreadPoolExecutor/ExecutorService: 线程池,使用线程池可以复用线程,降低频繁创建线程造成的性能消耗,同时对线程的创建.启动.停止.销毁等操作更简便. 3.使用场景举例: 年末公司组织团建,要求每一位员工周六上午8点到公司门口集合,统一乘坐公司所租大巴前往目的地. 在这个案例中,公司作为主线程,员工作为子线程. 4.代码示例: package

  • 详解java CountDownLatch和CyclicBarrier在内部实现和场景上的区别

    前言 CountDownLatch和CyclicBarrier两个同为java并发编程的重要工具类,它们在诸多多线程并发或并行场景中得到了广泛的应用.但两者就其内部实现和使用场景而言是各有所侧重的. 内部实现差异 前者更多依赖经典的AQS机制和CAS机制来控制器内部状态的更迭和计数器本身的变化,而后者更多依靠可重入Lock等机制来控制其内部并发安全性和一致性. public class { //Synchronization control For CountDownLatch. //Uses

  • java利用CountDownLatch实现并行计算

    本文实例为大家分享了利用CountDownLatch实现并行计算的具体代码,供大家参考,具体内容如下 import java.util.concurrent.CountDownLatch; /** * @Author pipi * @Date 2018/10/15 13:56 **/ public class ParallelComputing { private int[] nums; private String[] info; private CountDownLatch countDow

  • 浅谈java并发之计数器CountDownLatch

    CountDownLatch简介 CountDownLatch顾名思义,count + down + latch = 计数 + 减 + 门闩(这么拆分也是便于记忆=_=) 可以理解这个东西就是个计数器,只能减不能加,同时它还有个门闩的作用,当计数器不为0时,门闩是锁着的:当计数器减到0时,门闩就打开了. 如果你感到懵比的话,可以类比考生考试交卷,考生交一份试卷,计数器就减一.直到考生都交了试卷(计数器为0),监考老师(一个或多个)才能离开考场.至于考生是否做完试卷,监考老师并不关注.只要都交了试

随机推荐