Java AQS中闭锁CountDownLatch的使用

目录
  • 一. 简介
  • 二. 使用
  • 三. 应用场景
  • 四. 底层原理
  • 五. CountDownLatch与Thread.join的区别

一. 简介

CountDownLatch(闭锁)是一个同步协助类,允许一个或多个线程等待,直到其他线程完成操作集。

CountDownLatch使用给定的计数值(count)初始化。await方法会阻塞直到当前的计数值(count)由于countDown方法的调用达到0,count为0之后所有等待的线程都会被释放,并且随后对await方法的调用都会立即返回。这是一个一次性现象 —— count不会被重置。如果你需要一个重置count的版本,那么请考虑使用CyclicBarrier。

二. 使用

构造器

 public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }

常用方法

 // 调用 await() 方法的线程会被挂起,它会等待直到 count 值为 0 才继续执行
public void await() throws InterruptedException { };
// 和 await() 类似,若等待 timeout 时长后,count 值还是没有变为 0,不再等待,继续执行
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { };
// 会将 count 减 1,直至为 0
public void countDown() {
        sync.releaseShared(1);
    }

三. 应用场景

CountDownLatch一般用作多线程倒计时计数器,强制它们等待其他一组(CountDownLatch的初始化决定)任务执行完成。

CountDownLatch的两种使用场景:

  • 让多个线程等待
  • 让单个线程等待

场景1 让多个线程等待:模拟并发,让并发线程一起执行

 public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                try {
                   //等待
                    countDownLatch.await();
                    String parter = "【" + Thread.currentThread().getName() + "】";
                    System.out.println(parter + "开始执行……");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }
        Thread.sleep(2000);
        countDownLatch.countDown();
    }

for循环中等待阻塞,直到执行countdown方法。

场景2 让单个线程等待:多个线程(任务)完成后,进行汇总合并。

很多时候,我们的并发任务,存在前后依赖关系;比如数据详情页需要同时调用多个接口获取数据,并发请求获取到数据后、需要进行结果合并;或者多个数据操作完成后,需要数据check;这其实都是:在多个线程(任务)完成后,进行汇总合并的场景。

   public static void main(String[] args) throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(5);
        for (int i = 0; i < 5; i++) {
            final int index = i;
            new Thread(() -> {
                try {
                    Thread.sleep(1000 +
                            ThreadLocalRandom.current().nextInt(1000));
                    System.out.println(Thread.currentThread().getName()
                            + " finish task" + index);

                    countDownLatch.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }
        // 主线程在阻塞,当计数器==0,就唤醒主线程往下执行。
        countDownLatch.await();
        System.out.println("主线程:在所有任务运行完成后,进行结果汇总");
    }

四. 底层原理

底层基于 AbstractQueuedSynchronizer 实现,CountDownLatch 构造函数中指定的count直接赋给AQS的state;每次countDown()则都是release(1)减1,最后减到0时unpark线程;这一步是由最后一个执行countdown方法的线程执行的。

而调用await()方法时,当前线程就会判断state属性是否为0,如果为0,则继续往下执行,如果不为0,则使当前线程进入等待状态,直到某个线程将state属性置为0,其就会唤醒在await()方法中等待的线程。

构造方法

 public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
	    Sync(int count) {
            setState(count);
        }
 protected final void setState(int newState) {
        state = newState;
    }

阻塞

public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
 public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
            //arg为1,不为0 ,返回-1,这里小于0
        if (tryAcquireShared(arg) < 0)
        	//入队阻塞
            doAcquireSharedInterruptibly(arg);
    }
 protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

入队阻塞

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        //入队,创建节点 使用共享模式
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
            //获取当前节点的前躯节点
                final Node p = node.predecessor();
                //如果节点为head节点
                if (p == head) {
                	//阻塞动作比较重,通常会再尝试获取资源,没有获取到返回负数
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                //判断是否可以阻塞
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

countDown方法减一

  public void countDown() {
        sync.releaseShared(1);
    }
  public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
//尝试释放共享锁
protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                	//减到0的时候返回true,进行唤醒
                    return nextc == 0;
            }
        }

唤醒逻辑

 private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                	//wa为-1时,将其状态设置为0,并且唤醒
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }
private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
        //ws状态小于0就将其设置为0
            compareAndSetWaitStatus(node, ws, 0);
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
        //s不为空就调用unpark
            LockSupport.unpark(s.thread);
    }

五. CountDownLatch与Thread.join的区别

  • CountDownLatch的作用就是允许一个或多个线程等待其他线程完成操作,看起来有点类似join() 方法,但其提供了比 join() 更加灵活的API。
  • CountDownLatch可以手动控制在n个线程里调用n次countDown()方法使计数器进行减一操作,也可以在一个线程里调用n次执行减一操作。
  • 而 join() 的实现原理是不停检查join线程是否存活,如果 join 线程存活则让当前线程永远等待。所以两者之间相对来说还是CountDownLatch使用起来较为灵活。

到此这篇关于Java AQS中闭锁CountDownLatch的使用的文章就介绍到这了,更多相关Java CountDownLatch内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • AQS加锁机制Synchronized相似点详解

    目录 正文 1. Synchronized加锁流程 2. AQS加锁原理 3. 总结 正文 在并发多线程的情况下,为了保证数据安全性,一般我们会对数据进行加锁,通常使用Synchronized或者ReentrantLock同步锁.Synchronized是基于JVM实现,而ReentrantLock是基于Java代码层面实现的,底层是继承的AQS. AQS全称 AbstractQueuedSynchronizer ,即抽象队列同步器,是一种用来构建锁和同步器的框架. 我们常见的并发锁Reentr

  • Java AQS中ReentrantLock条件锁的使用

    目录 一.什么是AQS 1.定义 2.特性 3.属性 4.资源共享方式 5.两种队列 6.队列节点状态 7.实现方法 二.等待队列 1.同步等待队列 2.条件等待队列 三.condition接口 四.ReentrantLock 五.源码解析 一.什么是AQS 1.定义 java.util.concurrent包中的大多数同步器实现都是围绕着共同的基础行为,比如等待队列.条件队列.独占获取.共享获取等,而这些行为的抽象就是基于AbstractQueuedSynchronizer(简称AQS)实现的

  • Java AQS中ReentrantReadWriteLock读写锁的使用

    目录 一. 简介 二. 接口及实现类 三.使用 四. 应用场景 五. 锁降级 六.源码解析 七.总结 一. 简介 为什么会使用读写锁? 日常大多数见到的对共享资源有读和写的操作,写操作并没有读操作那么频繁(读多写少),在没有写操作的时候,多个线程同时读一个资源没有任何问题,所以应该允许多个线程同时读取共享资源(读读可以并发):但是如果一个线程想去写这些共享资源,就不应该允许其他线程对该资源进行读和写操作了(读写,写读,写写互斥).在读多于写的情况下,读写锁能够提供比排它锁更好的并发性和吞吐量.

  • AQS核心流程解析cancelAcquire方法

    目录 引出问题 更新正常节点的链表 当前取消节点是tail节点的情况 当前取消节点是非tail节点的情况 引出问题 首先,先考虑一个问题,什么条件会触发cancelAcquire()方法? cancelAcquire()方法的反向查找 可以清楚的看到在互斥锁和共享锁的拿锁过程中都是有调用此方法的,而cancelAcquire()方法是写在finally代码块中,并且使用failed标志位来控制cancelAcquire()方法的执行.可以得出,在触发异常的情况下会执行cancelAcquire(

  • Java AQS信号量Semaphore的使用

    目录 一.什么是Semaphore 二.Semaphore的使用 三.Semaphore源码分析 一.什么是Semaphore Semaphore,俗称信号量,它是操作系统中PV操作的原语在java的实现,它也是基于AbstractQueuedSynchronizer实现的. Semaphore的功能非常强大,大小为1的信号量就类似于互斥锁,通过同时只能有一个线程获取信号量实现.大小为n(n>0)的信号量可以实现限流的功能,它可以实现只能有n个线程同时获取信号量. PV操作是操作系统一种实现进程

  • Java并发编程之CountDownLatch源码解析

    一.前言 CountDownLatch维护了一个计数器(还是是state字段),调用countDown方法会将计数器减1,调用await方法会阻塞线程直到计数器变为0.可以用于实现一个线程等待所有子线程任务完成之后再继续执行的逻辑,也可以实现类似简易CyclicBarrier的功能,达到让多个线程等待同时开始执行某一段逻辑目的. 二.使用 一个线程等待其它线程执行完再继续执行 ...... CountDownLatch cdl = new CountDownLatch(10); Executor

  • 分析java并发中的wait notify notifyAll

    一.前言 java 面试是否有被问到过,sleep 和 wait 方法的区别,关于这个问题其实不用多说,大多数人都能回答出最主要的两点区别: sleep 是线程的方法, wait / notify / notifyAll 是 Object 类的方法: sleep 不会释放当前线程持有的锁,到时间后程序会继续执行,wait 会释放线程持有的锁并挂起,直到通过 notify 或者 notifyAll 重新获得锁. 另外还有一些参数.异常等区别,不细说了.本文重点记录一下 wait / notify

  • java并发包工具CountDownLatch源码分析

    目录 一:简述 二:什么是CountDownLatch 三:CountDownLatch的使用 四:CountDownLatch原理分析 构造函数 await()方法: doAcquireSharedInterruptibly() 1. 当前节点的前置节点是head节点 2. 当前节点的前置节点不是head节点 addWaiter() setHeadAndPropagate() shouldParkAfterFailedAcquire() parkAndCheckInterrupt() coun

  • Java并发编程:CountDownLatch与CyclicBarrier和Semaphore的实例详解

    Java并发编程:CountDownLatch与CyclicBarrier和Semaphore的实例详解 在java 1.5中,提供了一些非常有用的辅助类来帮助我们进行并发编程,比如CountDownLatch,CyclicBarrier和Semaphore,今天我们就来学习一下这三个辅助类的用法. 以下是本文目录大纲: 一.CountDownLatch用法 二.CyclicBarrier用法 三.Semaphore用法 若有不正之处请多多谅解,并欢迎批评指正. 一.CountDownLatch

  • Java多线程中线程间的通信实例详解

    Java多线程中线程间的通信 一.使用while方式来实现线程之间的通信 package com.ietree.multithread.sync; import java.util.ArrayList; import java.util.List; public class MyList { private volatile static List list = new ArrayList(); public void add() { list.add("apple"); } publ

  • Java多线程中不同条件下编写生产消费者模型方法介绍

    简介: 生产者.消费者模型是多线程编程的常见问题,最简单的一个生产者.一个消费者线程模型大多数人都能够写出来,但是一旦条件发生变化,我们就很容易掉进多线程的bug中.这篇文章主要讲解了生产者和消费者的数量,商品缓存位置数量,商品数量等多个条件的不同组合下,写出正确的生产者消费者模型的方法. 欢迎探讨,如有错误敬请指正 生产消费者模型 生产者消费者模型具体来讲,就是在一个系统中,存在生产者和消费者两种角色,他们通过内存缓冲区进行通信,生产者生产消费者需要的资料,消费者把资料做成产品.生产消费者模式

  • Java多线程中ReentrantLock与Condition详解

    一.ReentrantLock类 1.1什么是reentrantlock java.util.concurrent.lock中的Lock框架是锁定的一个抽象,它允许把锁定的实现作为Java类,而不是作为语言的特性来实现.这就为Lock的多种实现留下了空间,各种实现可能有不同的调度算法.性能特性或者锁定语义.ReentrantLock类实现了Lock,它拥有与synchronized相同的并发性和内存语义,但是添加了类似锁投票.定时锁等候和可中断锁等候的一些特性.此外,它还提供了在激烈争用情况下更

  • JAVA开发中的一些规范讲解(阿里巴巴Java开发规范手册)

    一.编程规约 (一) 命名规约 1.   [强制]所有编程相关命名均不能以下划线或美元符号开始,也不能以下划线或美元符号结束.反例: _name / __name / $Object / name_ / name$ / Object$ 2.   [强制]所有编程相关的命名严禁使用拼音与英文混合的方式,更不允许直接使用中文的方式.说明:正确的英文拼写和语法可以让阅读者易于理解,避免歧义.注意,即使纯拼音命名方式也要避免采用. 反例: DaZhePromotion [打折] / getPingfen

  • Java多线程中的wait/notify通信模式实例详解

    前言 最近在看一些JUC下的源码,更加意识到想要学好Java多线程,基础是关键,比如想要学好ReentranLock源码,就得掌握好AQS源码,而AQS源码中又有很多Java多线程经典的一些应用:再比如看了线程池的核心源码实现,又学到了很多核心实现,其实这些都可以提出来慢慢消化并变成自己的知识点,今天这个Java等待/通知模式其实是Thread.join()实现的关键,还有线程池工作线程中线程跟线程之间的通信的核心所在,故在此为了加深理解,做此记录! 参考资料<Java并发编程艺术>(电子PD

随机推荐