JDK源码之线程并发协调神器CountDownLatch和CyclicBarrier详解

目录
  • 引言
  • CountDownLatch
  • 使用场景
  • 底层实现原理
    • 初始化
    • 计数器递减
    • 阻塞线程
  • CyclicBarrier
  • 使用场景
  • 底层实现原理
    • 初始化
    • 阻塞等待
  • 总结

引言

那么在程序的世界中是如何对这种协调关系进行描述的呢?今天就和大家聊聊Java大神Doug Lea在并发包中如何通过CountDownLatch和CyclicBarrier实现任务协调的代码描述。

CountDownLatch

我相信大家都知道好代码的一个重要特性就是代码中类、变量等的命名可以做到顾名思义,也就是说看到命名就可以大概知道这个类或者变量表达了怎样的业务语义。就拿 CountDownLatch 来说,它的命名形象的表示了其能力属性,Count代表着计数,Down代表着计数器的递减操作,而Latch表示计数器递减后的结果动作。CountDownLatch结合起来的字面意思就是计数器递减后打开门栓,通过后面内容的描述,回过头来看大家肯定会觉得这个命名十分之形象。

好了通过它的类的名称,我们猜测了它的功能是通过计数器的递减操作来控制线程,那我们再看看官方描述是不是这个意思。

/**
* A synchronization aid that allows one or more threads to wait until
* a set of operations being performed in other threads completes.
*
* <p>A {@code CountDownLatch} is initialized with a given <em>count</em>.
* The {@link #await await} methods block until the current count reaches
* zero due to invocations of the {@link #countDown} method, after which
* all waiting threads are released and any subsequent invocations of
* {@link #await await} return immediately. This is a one-shot phenomenon
* -- the count cannot be reset. If you need a version that resets the
* count, consider using a {@link CyclicBarrier}.
*...
*/

上面注释的大致意思就是CountDownLatch是一个线程同步器,它允许一个或者多个线程阻塞等待直到其他线程中业务执行完成。CountDownLatch可以通过一个计数器进行初始化,他可以让那个等待的线程被阻塞,直到对应的计数器被置为0。当计数器置为0后,阻塞的线程被释放。另外它是一个一次性使用的同步器,计数器无法被重置。

通过JDK的官方描述我们可以明确CountDownLatch三个核心特征:

1、它是一种线程同步器,用以协调线程的执行触发时机;

2、它本质是一个计数器,是控制线程的号令枪;

3、它是一次性使用的,用完即失效。

知道了CountDownLatch是一个什么东东之后,我们再一起来看下它的使用场景是什么,我们在什么样的情况下可以使用它帮我们解决一些代码中的问题。

使用场景

就像上文描述的,CountDownLatch就像是田径赛场上裁判员发射的发令枪,所有参赛的选手准备就绪后,发令枪一响,所有运动员闻声而动。那么在Java多线程场景中,CountDownLatch就是线程协调者,它的计数器在没有减为0之前。假设有这样一个业务场景,在一个监控告警平台中,需要从告警服务中查询告警信息以及从工单服务中查询工单信息,然后再分析哪些告警没有转工单。按照老系统的做法,参见如下简化后的伪代码:

List<Alarm> alarmList = alarmService.getAlarm();
List<WorkOrder> workOrderList = workOrderService.getWorkOrder();
List<Alarm> notTransferToWorkOrder = analysis(alarmList, workOrderList);

大家能看出来这段伪代码有什么需要进行优化的地方吗?我们来一起分析一下。这段代码在数据量不大的时候可能没什么影响,但是一旦告警以及工单的数据量大的时候,获取告警信息或者获取工单信息都可能出现数据查询慢的问题,那就会导致这个分析任务就会出现性能瓶颈的问题。那么我们应该怎么进行优化呢?从业务以及代码我们可以看的出来,获取告警信息以及获取工单信息,实际上并没有业务上面的耦合性,在上述代码中他们是顺序执行的,因此要进行性能优化,可以考虑将它们进行并行执行。

那么修改优化后的伪代码如下所示:

Executor executor = Executors.newFixedThreadPool(2);
executor.execute(()-> { alarmList = alarmService.getAlarm(); });
executor.execute(()-> { workOrderList = workOrderService.getWorkOrder(); });

List<Alarm> notTransferToWorkOrder = analysis(alarmList, workOrderList);

我们通过使用线程池的方式,在获取告警信息以及工单信息的时候并发执行,不再像之前的执行完获取告警信息再执行获取工单信息,这样效率更高。但是这样的实现方式还是存在问题,由于在线的线程中执行操作,并不知道其实际的执行结果,这就不好判断执行数据分析的具体时机。这个时候CountDownLatch就派上用场了,利用它可以实现线程拣的等待,条件满足后再放开执行后续的逻辑。这就好比公司组织团建,约定好了早上8点半在公司大门集合,那么司机师傅肯定要等到所有参加团建的同时都到齐后才会发车。

使用CountDownLatch之后的伪代码如下所示:

Executor executor = Executors.newFixedThreadPool(2);
CountDownLatch latch = new CountDownLatch(2);
executor.execute(()-> { alarmList = alarmService.getAlarm();
                      latch.countDown();
                      });
executor.execute(()-> { workOrderList = workOrderService.getWorkOrder();
                      latch.countDown();
                      });
latch.await();
List<Alarm> notTransferToWorkOrder = analysis(alarmList, workOrderList);

底层实现原理

初始化

在使用CountDownLatch之前我们得先进行初始化,在初始化的过程中实际做了两件事情,一个是创建了一个AQS的同步队列,另外一个是将AQS中的state设置成了count,这个state是AQS的核心变量(AQS是并发包的底层实现基础,关于它的分析我们放到下一篇文章中进行)。

从代码中我们可以看的出来实际创建了Sync内部类实例,而Sync继承了AQS,同时重写了AQS加锁解锁的方法,并通过Sync的对象,调用AQS的方法,阻塞线程的运行。Sync内部类的代码如下所示,其中tryAcquireShared方法重写了AQS的模板方法,主要用来获取共享锁,在CountDownLatch内部主要通过判断获取到的state的值是否为0来决定到底有没有获取到锁。如果获取到的state为0,则表示获取锁成功,此时线程不会阻塞,反之则获取锁失败,线程会阻塞。

private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }
		//尝试加共享锁(通过state判断)
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }
		//尝试释放共享锁(通过state判断)
        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

计数器递减

如上文场景中介绍的代码,每个线程在执行完成自身业务后执行countDown操作,表示该线程已经准备完成。同时检查count值是否为0。如果为0则需要唤醒所有等待的线程。如下代码所示,实际上它调用的是父类AQS的releaseShared方法。

public void countDown() {
        sync.releaseShared(1);
    }

tryReleaseShared这个方法实际是进行尝试释放锁的操作,如果此次count递减为0,然后释放所有的线程。

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

大致的代码执行逻辑可参见下图:

阻塞线程

await的作用就是将当前线程阻塞住,直到count值减为0才会放开执行。它实际调用了内部类的tryAcquireSharedNanos方法,这个方法实际是Sync类的父类AQS中的方法。

public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

AQS提供了可以响应中断的获取公平锁的实现的方式。tryAcquireShared在上文已经进行了介绍,该方法的作用是尝试获取共享锁,如果获取失败,则线程将会被加入到AQS的同步队列中进行等待,也就是所谓的线程阻塞。

public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

CyclicBarrier

我们还是从CyclicBarrier的字面意思来先进行理解,Cyclic是循环的意思而Barrier则表示栅栏、障碍的意思,字面的意思就是可循环的栅栏。还是老套路,在进行CyclicBarrier之前,我们先来看下JDK是怎么描述的。

/**
* A synchronization aid that allows a set of threads to all wait for
* each other to reach a common barrier point. CyclicBarriers are
* useful in programs involving a fixed sized party of threads that
* must occasionally wait for each other. The barrier is called
* <em>cyclic</em> because it can be re-used after the waiting threads
* are released.
*
* <p>A {@code CyclicBarrier} supports an optional {@link Runnable} command
* that is run once per barrier point, after the last thread in the party
* arrives, but before any threads are released.
* This <em>barrier action</em> is useful
* for updating shared-state before any of the parties continue.
*...
**/

通过JDK的描述,我们可以看得出来,CyclicBarrier也是一个线程同步协调器,用以协调一组进程的执行。当指定个数的线程到达栅栏后,可以放开栅栏,结束线程阻塞状态。这么看上去它和CountDownLatch作用差不多了,实际上还是有区别的,CyclicBarrier是可循环使用的,而CountDownLatch却是一次性的。我们来看下CyclicBarrier的核心属性。

//栅栏入口的锁
private final ReentrantLock lock = new ReentrantLock();
//线程等待条件
private final Condition trip = lock.newCondition();
//拦截的线程数量
private final int parties;
//在下一个栅栏代数到来前执行的任务
private final Runnable barrierCommand;
//当前的栅栏代数
private Generation generation = new Generation();

CyclicBarrier 的源码实现和 CountDownLatch 大同小异,CountDownLatch 基于 AQS 的共享模式的使用,而 CyclicBarrier 基于 Condition 来实现的。

CyclicBarrier内部维护了parties和count变量,parties表示每次参与到一个Generation中需要被拦截的线程数量,而count是内部计数器,在初始化的时候count与parties相等,当每次调用await方法的时候计数器count就会减1,这和上文中的countDown类似。

使用场景

还是以上文中的业务场景为例我们再分析一下,上文中我们通过CountDownLatch实现了查询告警信息与查询工单信息的线程协调问题,但是新的问题又出现了。因为告警信息和工单信息都是实时在产生的,而使用CountDownLatch的实现方式只能完成一次的线程协调,后续产生的告警信息以及工单信息如果还有需要查询到之后再进行数据分析的话,它就爱莫能助了。也就是说,如果需要进行持续的线程之间的互相等待完成之后再执行后续的业务操作的话,这个时候就需要使用CyclicBarrier 来实现我们的需求了。

底层实现原理

初始化

CyclicBarrier 存在两种的构造函数,一种是构建CyclicBarrier 的时候指定每次需要进行协调的线程个数以及解除阻塞之后需要进行后续任务的执行,另一种只是设置需要协调的线程个数不设置后续执行的任务。

public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

 public CyclicBarrier(int parties) {
        this(parties, null);
    }

阻塞等待

对于CyclicBarrier 来说,其最核心的等待方法实现就是dowait方法,具体代码如下所示:

private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Generation g = generation;

            if (g.broken)
                throw new BrokenBarrierException();

            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }

            int index = --count;
            //如果count计算为0,则需要唤醒所有线程并进入到下一阶段的线程协调期
            if (index == 0) {  // tripped
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    nextGeneration();
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }

            //计数器不为0,继续进行循环
            for (;;) {
                try {
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)
                    throw new BrokenBarrierException();

                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

我们可以看到在dowait方法中进行了count的递减操作,检查count的值是否为0,如果在初始化的时候定义好了要执行的任务,那么在count为0的时候就进行任务执行,任务执行完成之后调用nextGeneration进行下一次的线程协调周期,同时唤醒所有线程并重置计数器。

总结

本文分别从使用场景以及底层实现的角度分别介绍了线程同步协调神器CountDownLatch和CyclicBarrier,虽然它们都可以起到协调线程的作用但是实际上它们还是有区别的。CountDownLatch比较适合一个线程与其他多个线程之间的同步协调场景,而CyclicBarrier则适合一组线程之间的互相等待。另外CountDownLatch是一次性产品,而CyclicBarrier的计数器是可以重复使用的,可以进行自动重置计数器。

到此这篇关于JDK源码之线程并发协调神器CountDownLatch和CyclicBarrier详解的文章就介绍到这了,更多相关Java 线程并发协调内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • java多线程之并发工具类CountDownLatch,CyclicBarrier和Semaphore

    目录 CountDownLatch Semaphore CyclicBarrier 总结 CountDownLatch CountDownLatch允许一个或多个线程等待其他线程完成操作. 假设一个Excel文件有多个sheet,我们需要去记录每个sheet有多少行数据, 这时我们就可以使用CountDownLatch实现主线程等待所有sheet线程完成sheet的解析操作后,再继续执行自己的任务. public class CountDownLatchTest { private static

  • Java中CyclicBarrier和CountDownLatch的用法与区别

    目录 前言 CountDownLatch 例子 CyclicBarrier 构造函数 例子 两者区别 前言 CyclicBarrier和CountDownLatch这两个工具都是在java.util.concurrent包下,并且平时很多场景都会使用到. 本文将会对两者进行分析,记录他们的用法和区别. CountDownLatch CountDownLatch是一个非常实用的多线程控制工具类,称之为"倒计时器",它允许一个或多个线程一直等待,直到其他线程的操作执行完后再执行. Coun

  • Java并发编程:CountDownLatch与CyclicBarrier和Semaphore的实例详解

    Java并发编程:CountDownLatch与CyclicBarrier和Semaphore的实例详解 在java 1.5中,提供了一些非常有用的辅助类来帮助我们进行并发编程,比如CountDownLatch,CyclicBarrier和Semaphore,今天我们就来学习一下这三个辅助类的用法. 以下是本文目录大纲: 一.CountDownLatch用法 二.CyclicBarrier用法 三.Semaphore用法 若有不正之处请多多谅解,并欢迎批评指正. 一.CountDownLatch

  • 详解java CountDownLatch和CyclicBarrier在内部实现和场景上的区别

    前言 CountDownLatch和CyclicBarrier两个同为java并发编程的重要工具类,它们在诸多多线程并发或并行场景中得到了广泛的应用.但两者就其内部实现和使用场景而言是各有所侧重的. 内部实现差异 前者更多依赖经典的AQS机制和CAS机制来控制器内部状态的更迭和计数器本身的变化,而后者更多依靠可重入Lock等机制来控制其内部并发安全性和一致性. public class { //Synchronization control For CountDownLatch. //Uses

  • JDK源码之线程并发协调神器CountDownLatch和CyclicBarrier详解

    目录 引言 CountDownLatch 使用场景 底层实现原理 初始化 计数器递减 阻塞线程 CyclicBarrier 使用场景 底层实现原理 初始化 阻塞等待 总结 引言 那么在程序的世界中是如何对这种协调关系进行描述的呢?今天就和大家聊聊Java大神Doug Lea在并发包中如何通过CountDownLatch和CyclicBarrier实现任务协调的代码描述. CountDownLatch 我相信大家都知道好代码的一个重要特性就是代码中类.变量等的命名可以做到顾名思义,也就是说看到命名

  • Java源码解析之HashMap的put、resize方法详解

    一.HashMap 简介 HashMap 底层采用哈希表结构 数组加链表加红黑树实现,允许储存null键和null值 数组优点:通过数组下标可以快速实现对数组元素的访问,效率高 链表优点:插入或删除数据不需要移动元素,只需要修改节点引用效率高 二.源码分析 2.1 继承和实现 public class HashMap<K,V> extends AbstractMap<K,V> implements Map<K,V>, Cloneable, Serializable {

  • jQuery源码解读之extend()与工具方法、实例方法详解

    本文实例讲述了jQuery源码解读之extend()与工具方法.实例方法.分享给大家供大家参考,具体如下: 使用jQuery的时候会发现,jQuery中有的函数是这样使用的: $.get(); $.post(); $.getJSON(); 有些函数是这样使用的: $('div').css(); $('ul').find('li'); 有些函数是这样使用的: $('li').each(callback); $.each(lis,callback); 这里涉及到两个概念:工具方法与实例方法.通常我们

  • Vue源码之关于vm.$delete()/Vue.use()内部原理详解

    vm.$delete() vm.$delete用法见官网. 为什么需要Vue.delete()? 在ES6之前, JS没有提供方法来侦测到一个属性被删除了, 因此如果我们通过delete删除一个属性, Vue是侦测不到的, 因此不会触发数据响应式. 见下面的demo. <!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8" /> <meta na

  • Linux下源码包安装Swoole及基本使用操作图文详解

    本文实例讲述了Linux下源码包安装Swoole及基本使用操作.分享给大家供大家参考,具体如下: 下载Swoole PECL扩展源码包:http://pecl.php.net/package/swoole 关于PHP版本依赖选择: 下载好放到/usr/local/src下,解压缩: tar -zxvf swoole-2.2.0.tgz 准备扩展安装编译环境: phpize 查看php-config位置: find / -name php-config 配置:(--with-php-config=

  • 从java源码分析线程池(池化技术)的实现原理

    目录 线程池的起源 线程池的定义和使用 方案一:Executors(仅做了解,推荐使用方案二) 方案二:ThreadPoolExecutor 线程池的实现原理 前言: 线程池是一个非常重要的知识点,也是池化技术的一个典型应用,相信很多人都有使用线程池的经历,但是对于线程池的实现原理大家都了解吗?本篇文章我们将深入线程池源码来一探究竟. 线程池的起源 背景: 随着计算机硬件的升级换代,使我们的软件具备多线程执行任务的能力.当我们在进行多线程编程时,就需要创建线程,如果说程序并发很高的话,我们会创建

  • Java从JDK源码角度对Object进行实例分析

    Object是所有类的父类,也就是说java中所有的类都是直接或者间接继承自Object类.比如你随便创建一个classA,虽然没有明说,但默认是extendsObject的. 后面的三个点"..."表示可以接受若干不确定数量的参数.老的写法是Objectargs[]这样,但新版本的java中推荐使用...来表示.例如 publicvoidgetSomething(String...strings)(){} object是java中所有类的父类,也就是说所有的类,不管是自己创建的类还是

  • JDK源码分析之String、StringBuilder和StringBuffer

    前言 本文主要介绍了关于JDK源码分析之String.StringBuilder和StringBuffer的相关内容,分享出来供大家参考学习,下面话不多说了,来一起看看详细的介绍吧 String类的申明 public final class String implements java.io.Serializable, Comparable<String>, CharSequence {-} String类用了final修饰符,表示它不可以被继承,同时还实现了三个接口, 实现Serializa

  • JDK源码中一些实用的“小技巧”总结

    前言 这段时间比较闲,就看起了jdk源码.一般的一个高级开发工程师, 能阅读一些源码对自己的提升还是蛮大的.本文总结了一些JDK源码中的"小技巧",分享出来供大家参考学习,下面话不多说了,来一起看看详细的介绍吧. 1 i++ vs i-- String源码的第985行,equals方法中 while (n--!= 0) { if (v1[i] != v2[i]) return false; i++; } 这段代码是用于判断字符串是否相等,但有个奇怪地方是用了i--!=0来做判断,我们通

  • 最通俗的白话讲解JDK源码中的ThreadLocal

    目录 引言 ThreadLocal是什么?它能干什么? ThreadLocal实现线程隔离的秘密 为什么ThreadLocal会出现OOM的问题? 内存泄漏演示 内存泄漏问题分析 父子线程的参数传递 总结 引言 其实网上有很多关于ThreadLocal的文章了,有不少文章也已经写的非常好了.但是很多同学反应还有一些部分没有讲解的十分清楚,还是有一定的疑惑没有想的十分清楚.因此本文主要结合常见的一些疑问.ThreadLocal源码.应用实例以注意事项来全面而深入地再详细讲解一遍ThreadLoca

随机推荐