AQS同步组件CyclicBarrier循环屏障用例剖析

目录
  • CyclicBarrier原理
  • 源码分析
  • 使用案例
    • await()
    • await(long timeout, TimeUnit unit)
    • CyclicBarrier(int parties, Runnable barrierAction)
    • CyclicBarrier和CountDownLatch的区别

CyclicBarrier原理

CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。

当某个线程调用了await方法之后,就会进入等待状态,并将计数器+1,直到所有线程调用await方法使计数器为CyclicBarrier设置的值,才可以继续执行,由于计数器可以重复使用,所以我们又叫它循环屏障。

CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。

CyclicBarrier可以用于多线程计算数据,最后合并计算结果的应用场景。

源码分析

    /**
     * 创建一个新的CyclicBarrier当给定数量的参与方(线程)等待它时,它将触发,
     * 并且在障碍触发时不执行预定义的操作。
     *
     * @param  在barrier被触发之前必须调用await()的线程数
     * @throws IllegalArgumentException 如果parties小于1抛出异常
     */
    public CyclicBarrier(int parties) {
        this(parties, null);
    }

/**
     *
     * 当前线程调用await方法的线程告知CyclicBarrier已经到达屏障,然后当前线程被阻塞
     *
     * @return 当前线程的到达索引,其中索引为- 1表示第一个到达的,0表示最后一个到达的
     * @throws InterruptedException 如果当前线程在等待时被中断
     * @throws BrokenBarrierException 如果另一个线程在当前线程等待时被中断或超时,
     * 或者屏障被重置,或者在调用await方法时屏障被破坏,或者屏障操作(如果存在)由于异常而失败
     */
    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

使用案例

await()

/**
     * 线程数量
     */
    private final static int threadCount = 15;
    /**
     * 屏障拦截的线程数量为5,表示每次屏障会拦截5个线程
     */
    private static CyclicBarrier barrier = new CyclicBarrier(5);

    public static void main(String[] args) throws Exception {

        ExecutorService executor = Executors.newCachedThreadPool();

        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            Thread.sleep(1000);
            executor.execute(() -> {
                try {
                    race(threadNum);
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        executor.shutdown();
    }

    private static void race(int threadNum) throws Exception {
        Thread.sleep(1000);
        log.info("{} is ready {}", threadNum,barrier.getNumberWaiting());
        //每次调用await方法后计数器+1,当前线程被阻塞
        barrier.await();
        log.info("{} continue", threadNum);
    }

输出结果:

16:16:40.245 [pool-1-thread-1] INFO com.zjq.aqs.CyclicBarrier - 0 is ready 0
16:16:41.244 [pool-1-thread-2] INFO com.zjq.aqs.CyclicBarrier - 1 is ready 1
16:16:42.244 [pool-1-thread-3] INFO com.zjq.aqs.CyclicBarrier - 2 is ready 2
16:16:43.244 [pool-1-thread-4] INFO com.zjq.aqs.CyclicBarrier - 3 is ready 3
16:16:44.245 [pool-1-thread-5] INFO com.zjq.aqs.CyclicBarrier - 4 is ready 4
16:16:44.245 [pool-1-thread-5] INFO com.zjq.aqs.CyclicBarrier - 4 continue
16:16:44.245 [pool-1-thread-1] INFO com.zjq.aqs.CyclicBarrier - 0 continue
16:16:44.245 [pool-1-thread-2] INFO com.zjq.aqs.CyclicBarrier - 1 continue
16:16:44.245 [pool-1-thread-3] INFO com.zjq.aqs.CyclicBarrier - 2 continue
16:16:44.245 [pool-1-thread-4] INFO com.zjq.aqs.CyclicBarrier - 3 continue
16:16:45.245 [pool-1-thread-6] INFO com.zjq.aqs.CyclicBarrier - 5 is ready 0
16:16:46.245 [pool-1-thread-1] INFO com.zjq.aqs.CyclicBarrier - 6 is ready 1
16:16:47.246 [pool-1-thread-2] INFO com.zjq.aqs.CyclicBarrier - 7 is ready 2
16:16:48.246 [pool-1-thread-3] INFO com.zjq.aqs.CyclicBarrier - 8 is ready 3
16:16:49.246 [pool-1-thread-4] INFO com.zjq.aqs.CyclicBarrier - 9 is ready 4
16:16:49.246 [pool-1-thread-6] INFO com.zjq.aqs.CyclicBarrier - 5 continue
16:16:49.246 [pool-1-thread-4] INFO com.zjq.aqs.CyclicBarrier - 9 continue
16:16:49.246 [pool-1-thread-1] INFO com.zjq.aqs.CyclicBarrier - 6 continue
16:16:49.246 [pool-1-thread-3] INFO com.zjq.aqs.CyclicBarrier - 8 continue
16:16:49.246 [pool-1-thread-2] INFO com.zjq.aqs.CyclicBarrier - 7 continue
16:16:50.247 [pool-1-thread-5] INFO com.zjq.aqs.CyclicBarrier - 10 is ready 0
16:16:51.247 [pool-1-thread-2] INFO com.zjq.aqs.CyclicBarrier - 11 is ready 1
16:16:52.247 [pool-1-thread-3] INFO com.zjq.aqs.CyclicBarrier - 12 is ready 2
16:16:53.248 [pool-1-thread-1] INFO com.zjq.aqs.CyclicBarrier - 13 is ready 3
16:16:54.248 [pool-1-thread-4] INFO com.zjq.aqs.CyclicBarrier - 14 is ready 4
16:16:54.248 [pool-1-thread-4] INFO com.zjq.aqs.CyclicBarrier - 14 continue
16:16:54.248 [pool-1-thread-5] INFO com.zjq.aqs.CyclicBarrier - 10 continue
16:16:54.248 [pool-1-thread-3] INFO com.zjq.aqs.CyclicBarrier - 12 continue
16:16:54.248 [pool-1-thread-2] INFO com.zjq.aqs.CyclicBarrier - 11 continue
16:16:54.248 [pool-1-thread-1] INFO com.zjq.aqs.CyclicBarrier - 13 continue

通过输出结果可以知道,每次屏障会阻塞5个线程,5个线程执行后计数器达到预设值,继续执行后续操作。

await(long timeout, TimeUnit unit)

/**
     * 线程数量
     */
    private final static int threadCount = 15;
    /**
     * 屏障拦截的线程数量为5,表示每次屏障会拦截5个线程
     */
    private static CyclicBarrier barrier = new CyclicBarrier(5);

    public static void main(String[] args) throws Exception {

        ExecutorService executor = Executors.newCachedThreadPool();

        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            Thread.sleep(1000);
            executor.execute(() -> {
                try {
                    race(threadNum);
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        executor.shutdown();
    }

    private static void race(int threadNum) throws Exception {
        Thread.sleep(1000);
        log.info("{} is ready{}", threadNum,barrier.getNumberWaiting());
        //每次调用await方法后计数器+1,当前线程被阻塞
        //等待2s.为了使在发生异常的时候,不影响其他线程,一定要catch
        //由于设置了超时时间后阻塞的线程可能会被中断,抛出BarrierException异常,如果想继续往下执行,需要加上try-catch
        try {
            barrier.await(2, TimeUnit.SECONDS);
        }catch (Exception e){
            //查看执行异常的线程
            log.info("线程{} 执行异常,阻塞被中断?{}",threadNum,barrier.isBroken());
        }
        log.info("{} continue", threadNum);
    }

输出结果:

17:06:24.440 [pool-1-thread-1] INFO com.zjq.CyclicBarrier - 0 is ready0
17:06:25.435 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 1 is ready1
17:06:26.435 [pool-1-thread-3] INFO com.zjq.CyclicBarrier - 2 is ready2
17:06:26.455 [pool-1-thread-1] INFO com.zjq.CyclicBarrier - 线程0 执行异常,阻塞被中断?true
17:06:26.456 [pool-1-thread-3] INFO com.zjq.CyclicBarrier - 线程2 执行异常,阻塞被中断?true
17:06:26.456 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 线程1 执行异常,阻塞被中断?true
17:06:26.456 [pool-1-thread-1] INFO com.zjq.CyclicBarrier - 0 continue
17:06:26.456 [pool-1-thread-3] INFO com.zjq.CyclicBarrier - 2 continue
17:06:26.456 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 1 continue
17:06:27.434 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 3 is ready0
17:06:27.434 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 线程3 执行异常,阻塞被中断?true
17:06:27.434 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 3 continue
17:06:28.435 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 4 is ready0
17:06:28.435 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 线程4 执行异常,阻塞被中断?true
17:06:28.435 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 4 continue
17:06:29.435 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 5 is ready0
17:06:29.435 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 线程5 执行异常,阻塞被中断?true
17:06:29.435 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 5 continue
17:06:30.436 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 6 is ready0
17:06:30.436 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 线程6 执行异常,阻塞被中断?true
17:06:30.436 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 6 continue
17:06:31.436 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 7 is ready0
17:06:31.436 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 线程7 执行异常,阻塞被中断?true
17:06:31.436 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 7 continue
17:06:32.436 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 8 is ready0
17:06:32.436 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 线程8 执行异常,阻塞被中断?true
17:06:32.436 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 8 continue
17:06:33.436 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 9 is ready0
17:06:33.436 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 线程9 执行异常,阻塞被中断?true
17:06:33.436 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 9 continue

CyclicBarrier(int parties, Runnable barrierAction)

 /**
     * 线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景
     */
    private static CyclicBarrier barrier = new CyclicBarrier(5, () -> {
        log.info("callback is running");
    });

输出结果:

17:11:38.867 [pool-1-thread-1] INFO com.zjq.CyclicBarrier3 - 0 is ready
17:11:38.966 [pool-1-thread-2] INFO com.zjq.CyclicBarrier3 - 1 is ready
17:11:39.067 [pool-1-thread-3] INFO com.zjq.CyclicBarrier3 - 2 is ready
17:11:39.167 [pool-1-thread-4] INFO com.zjq.CyclicBarrier3 - 3 is ready
17:11:39.268 [pool-1-thread-5] INFO com.zjq.CyclicBarrier3 - 4 is ready
17:11:39.268 [pool-1-thread-5] INFO com.zjq.CyclicBarrier3 - callback is running
17:11:39.268 [pool-1-thread-5] INFO com.zjq.CyclicBarrier3 - 4 continue
17:11:39.268 [pool-1-thread-1] INFO com.zjq.CyclicBarrier3 - 0 continue
17:11:39.268 [pool-1-thread-2] INFO com.zjq.CyclicBarrier3 - 1 continue
17:11:39.268 [pool-1-thread-3] INFO com.zjq.CyclicBarrier3 - 2 continue
17:11:39.268 [pool-1-thread-4] INFO com.zjq.CyclicBarrier3 - 3 continue
17:11:39.369 [pool-1-thread-6] INFO com.zjq.CyclicBarrier3 - 5 is ready
17:11:39.470 [pool-1-thread-7] INFO com.zjq.CyclicBarrier3 - 6 is ready
17:11:39.570 [pool-1-thread-8] INFO com.zjq.CyclicBarrier3 - 7 is ready
17:11:39.671 [pool-1-thread-9] INFO com.zjq.CyclicBarrier3 - 8 is ready
17:11:39.772 [pool-1-thread-10] INFO com.zjq.CyclicBarrier3 - 9 is ready
17:11:39.772 [pool-1-thread-10] INFO com.zjq.CyclicBarrier3 - callback is running
17:11:39.772 [pool-1-thread-10] INFO com.zjq.CyclicBarrier3 - 9 continue
17:11:39.772 [pool-1-thread-6] INFO com.zjq.CyclicBarrier3 - 5 continue
17:11:39.772 [pool-1-thread-9] INFO com.zjq.CyclicBarrier3 - 8 continue
17:11:39.772 [pool-1-thread-7] INFO com.zjq.CyclicBarrier3 - 6 continue
17:11:39.772 [pool-1-thread-8] INFO com.zjq.CyclicBarrier3 - 7 continue

CyclicBarrier和CountDownLatch的区别

  • CountDownLatch的计数器只能使用一次。而CyclicBarrier的计数器可以使用reset()方法重置。所以CyclicBarrier能处理更为复杂的业务场景,比如如果计算发生错误,可以重置计数器,并让线程们重新执行一次。
  • CountDownLatch主要用于实现一个或n个线程需要等待其他线程完成某项操作之后,才能继续往下执行,描述的是一个或n个线程等待其他线程的关系,而CyclicBarrier是多个线程相互等待,知道满足条件以后再一起往下执行。描述的是多个线程相互等待的场景
  • CyclicBarrier还提供其他有用的方法,比如getNumberWaiting方法可以获得CyclicBarrier阻塞的线程数量。isBroken方法用来知道阻塞的线程是否被中断。

以上就是AQS同步组件CyclicBarrier循环屏障用例剖析的详细内容,更多关于AQS同步组件CyclicBarrier的资料请关注我们其它相关文章!

(0)

相关推荐

  • 一文读懂go中semaphore(信号量)源码

    运行时信号量机制 semaphore 前言 最近在看源码,发现好多地方用到了这个semaphore. 本文是在go version go1.13.15 darwin/amd64上进行的 作用是什么 下面是官方的描述 // Semaphore implementation exposed to Go. // Intended use is provide a sleep and wakeup // primitive that can be used in the contended case /

  • 详解Java信号量Semaphore的原理及使用

    目录 1.Semaphore的概述 2.Semaphore的原理 2.1 基本结构 2.2 可中断获取信号量 2.3 不可中断获取信号量 2.4 超时可中断获取信号量 2.5 尝试获取信号量 2.6 释放信号量 3.Semaphore的使用 4.Semaphore的总结 1.Semaphore的概述 public class Semaphore extends Object implements Serializable Semaphore来自于JDK1.5的JUC包,直译过来就是信号量,被作为

  • 详解Java 信号量Semaphore

    Semaphore也是一个同步器,和前面两篇说的CountDownLatch和CyclicBarrier不同,这是递增的,初始化的时候可以指定一个值,但是不需要知道需要同步的线程个数,只需要在同步的地方调用acquire方法时指定需要同步的线程个数: 一.简单使用 同步两个子线程,只有其中两个子线程执行完毕,主线程才会执行: package com.example.demo.study; import java.util.concurrent.ExecutorService; import ja

  • AQS(AbstractQueuedSynchronizer)抽象队列同步器及工作原理解析

    目录 前言 AQS是什么? 用银行办理业务的案例模拟AQS如何进行线程管理和通知机制 结语 前言 AQS 绝对是JUC的重要基石,也是面试中经常被问到的,所以我们要搞清楚这个AQS到底是什么?骑工作原理是什么? AQS是什么? AQS,AbstractQueuedSynchronizer,即队列同步器.它是构建锁或者其他同步组件的基础框架(如ReentrantLock.ReentrantReadWriteLock.Semaphore等),JUC并发包的作者(Doug Lea)期望它能够成为实现大

  • Java中的AQS同步队列问题详解

    目录 AQS 同步队列 1.AQS 介绍 1.1.类图关系 1.2.节点剖析 2.AQS 实现原理 2.1.队列初始化 2.2.追加节点 3.AQS 唤醒动作 AQS 同步队列 1.AQS 介绍 AQS 是 AbstractQueuedSynchronizer 的缩写,他是一个抽象同步类,为 JUC 包下的大多数同步工具提供了核心实现,例如 ReentrantLock 的底层就是使用同步队列.AQS 提供一套基础的机制来实现线程的同步.阻塞与唤醒.等待队列等功能,也就是想要深入学习线程工具类,这

  • Java信号量Semaphore原理及代码实例

    Semaphore 通常用于限制可以访问某些资源(物理或逻辑的)的线程数目.自从5.0开始,jdk在java.util.concurrent包里提供了Semaphore 的官方实现,因此大家不需要自己去实现Semaphore. 下面的类使用信号量控制对内容池的访问: import java.util.concurrent.Semaphore; class Pool { private static final int MAX_AVAILABLE = 100; private final Sema

  • AQS同步组件Semaphore信号量案例剖析

    目录 基本概念 作用和使用场景 源码分析 构造函数 常用方法 使用案例 acquire()获取单个许可 acquire(int permits)获取多个许可 tryAcquire()获取许可 tryAcquire(long timeout, TimeUnit unit) 基本概念 Semaphore也是一个线程同步的辅助类,可以维护当前访问自身的线程个数,并提供了同步机制.使用Semaphore可以控制并发访问资源的线程个数. 例如排队买票的情况,如果只有三个窗口,那么同一时间最多也只能有三个人

  • AQS同步组件CyclicBarrier循环屏障用例剖析

    目录 CyclicBarrier原理 源码分析 使用案例 await() await(long timeout, TimeUnit unit) CyclicBarrier(int parties, Runnable barrierAction) CyclicBarrier和CountDownLatch的区别 CyclicBarrier原理 CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier).它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,

  • Java中CyclicBarrier 循环屏障

    目录 一.简介 二.CyclicBarrier的使用 CyclicBarrier 应用场景 模拟合并计算场景 模拟“人满发车”的场景 三.CyclicBarrier 源码分析 CyclicBarrier 流程 几个常见的问题? CyclicBarrier 与 CountDownLatch的区别 一.简介 CyclicBarrier 字面意思回环栅栏(循环屏障),它可以实现让一组线程等待至某个状态(屏障点)之后再全部同时执行.叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以

  • Java 详解循环屏障CyclicBarrier如何实现多线程分段等待执行完成

    前言 工作中是否有这样的场景,多个线程任务,如果所有线程完成到某个阶段,你希望知道所有线程均完成该阶段.当然你使用线程计数可以实现,只是不够优雅. 所以我即:Java 多线程等待优雅的实现方式之Phaser同步屏障 之后再提供一个循环屏障,CyclicBarrier,更优雅的实现工具. Maven依赖 可以依赖,也可以不依赖,只是代码要稍微多一些,最好添加. <dependency> <groupId>org.projectlombok</groupId> <ar

  • Java并发编程之JUC并发核心AQS同步队列原理剖析

    目录 一.AQS介绍 二.AQS中的队列 1.同步等待队列 2.条件等待队列 3.AQS队列节点Node 三.同步队列源码分析 1.同步队列分析 2.同步队列--独占模式源码分析 3.同步队列--共享模式源码分析 一.AQS介绍 队列同步器AbstractQueuedSynchronizer(简称AQS),AQS定义了一套多线程访问共享资源的同步器框架,是用来构建锁或者其他同步组件的基础框架,是一个依赖状态(state)的同步器.Java并发编程的核心在java.util.concurrent(

  • EasyUI Tree树组件无限循环的解决方法

    在学习jquery easyui的tree组件的时候,在url为链接地址的时,发现如果最后一个节点的state为closed时,未节点显示为文件夹,单击会重新加载动态(Url:链接地址)形成无限循环.如: tree.json [{ "id":1, "text":"Folder1", "iconCls":"icon-save", "children":[{ "text"

  • 线上dubbo线程池耗尽CyclicBarrier线程屏障异常解决记录

    目录 事件背景 问题定位 解决问题 文末结语 事件背景 系统相关使用人员反馈系统故障,日志显示从ams系统服务提示dubbo处理线程不足,具体异常信息如下: 问题定位 从上图可知,dubbo的处理线程池满了,默认200个线程,活动线程也是200个.这个现象非常不正常,我们的应用并发还没有到这个程度能同时占用200个线程处理请求.然后去读了下dubbo源码,发现dubbo也认为这种情况不正常,然后帮我们记录了应用的线程堆栈信息,这个非常赞.代码如下: 上面这段代码,在线程池不够用时,会每隔十分钟输

  • Tree组件实现支持50W数据方法剖析

    目录 出师未捷身先死 Tree 自身的复杂性 虚拟滚动带来的复杂性 查问题 怎么做 缓存数据 减少响应式数据 用更快的 JS 语法 扣细节 数据结构一致性的魅力 出师未捷身先死 有用户在 fes-design VIP群 吐槽 Tree 组件在处理一万条左右数据时很卡.但是 fes-design 已重视大数据场景,提供基础的虚拟列表组件,以及选择器.表格.树形.级联等组件基于虚拟列表处理了大数据场景,为啥 Tree 组件还卡呢? Tree 自身的复杂性 Tree 数据结构特性决定 Tree 组件中

  • Web componentd组件内部事件回调及痛点剖析

    目录 写在前面 WC 到底是什么? 目前存在的缺陷 1.组件内部事件的回调 2.组件样式覆盖 3.组件内部资源相对路径问题 4.form表单类组件 value 获取问题 5.其它 写在后面 写在前面 最近致力于研究 Web components(以下简称WC),并且也初有成效的拿到了一定的结果,但今天想回过头来重新审视一下 WC. WC 到底是什么? 简单的讲,Web Component 就是把组件封装成 html 标签的形式,并且在使用时不需要写额外的 js 代码. 组件是前端的发展方向,抛开

随机推荐