详解Java回环屏障CyclicBarrier

  上一篇说的CountDownLatch是一个计数器,类似线程的join方法,但是有一个缺陷,就是当计数器的值到达0之后,再调用CountDownLatch的await和countDown方法就会立刻返回,就没有作用了,那么反正是一个计数器,为什么不能重复使用呢?于是就出现了这篇说的CyclicBarrier,它的状态可以被重用;

一.简单例子

  用法其实和CountDownLatch差不多,也就是一个计数器,当计数器的值变为0之后,就会把阻塞的线程唤醒:

package com.example.demo.study;

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Study0216 {
 // 注意这里的构造器,第一个参数表示计数器初始值
 // 第二个参数表示当计数器的值变为0的时候就触发的任务
 static CyclicBarrier cyclicBarrier = new CyclicBarrier(2, () -> {
  System.out.println("cyclicBarrier task ");
 });

 public static void main(String[] args) {
  // 新建两个线程的线程池
  ExecutorService pool = Executors.newFixedThreadPool(2);
  // 线程1放入线程池中
  pool.submit(() -> {
   try {
    System.out.println("Thread1----await-begin");
    cyclicBarrier.await();
    System.out.println("Thread1----await-end");
   } catch (Exception e) {
    e.printStackTrace();
   }
  });
  // 线程2放到线程池中
  pool.submit(() -> {
   try {
    System.out.println("Thread2----await-begin");
    cyclicBarrier.await();
    System.out.println("Thread2----await-end");
   } catch (Exception e) {
    e.printStackTrace();
   }
  });
  // 关闭线程池,此时还在执行的任务会继续执行
  pool.shutdown();
 }
}

 我们再看看CyclicBarrier的复用性,这里比如有一个任务,有三部分组成,分别是A,B,C,然后创建两个线程去执行这个任务,必须要等到两个线程都执行完成A部分,然后才能开始执行B,只有两个线程都执行完成B部分,才能执行C:

package com.example.demo.study;

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Study0216 {
 // 这里的构造器,只有一个参数,表示计数器初始值
 static CyclicBarrier cyclicBarrier = new CyclicBarrier(2);

 public static void main(String[] args) {
  // 新建两个线程的线程池
  ExecutorService pool = Executors.newFixedThreadPool(2);
  // 线程1放入线程池中
  pool.submit(() -> {
   try {
    System.out.println("Thread1----stepA-start");
    cyclicBarrier.await();

    System.out.println("Thread1----stepB-start");
    cyclicBarrier.await();

    System.out.println("Thread1----stepC-start");

   } catch (Exception e) {
    e.printStackTrace();
   }
  });
  // 线程2放到线程池中
  pool.submit(() -> {
   try {
    System.out.println("Thread2----stepA-start");
    cyclicBarrier.await();

    System.out.println("Thread2----stepB-start");
    cyclicBarrier.await();

    System.out.println("Thread2----stepC-start");
   } catch (Exception e) {
    e.printStackTrace();
   }
  });
  // 关闭线程池,此时还在执行的任务会继续执行
  pool.shutdown();
 }
}

二.基本原理

  我们看看一些重要属性:

public class CyclicBarrier {
 //这个内部类只有一个boolean值
 private static class Generation {
  boolean broken = false;
 }

 //独占锁
 private final ReentrantLock lock = new ReentrantLock();
  //条件变量
 private final Condition trip = lock.newCondition();
  //保存线程的总数
 private final int parties;
 //这是一个任务,通过构造器传递一个任务,当计数器变为0之后,就可以执行这个任务
 private final Runnable barrierCommand;
 //这类内部之后一个boolean的值,表示屏障是否被打破
 private Generation generation = new Generation();
 //计数器
 private int count;
}

  构造器:

//我们的构造器初始值设置的是parties
public CyclicBarrier(int parties) {
 this(parties, null);
}
//注意,这里开始的时候是count等于parties
//为什么要有两个变量呢?我们每次调用await方法的时候count减一,当count的值变为0之后,怎么又还原成初始值呢?
//直接就把parties的值赋值给count就行了呀,简单吧!
public CyclicBarrier(int parties, Runnable barrierAction) {
 if (parties <= 0) throw new IllegalArgumentException();
 this.parties = parties;
 this.count = parties;
 this.barrierCommand = barrierAction;
}

  然后再看看await方法:

public int await() throws InterruptedException, BrokenBarrierException {
 try {
  //调用的是dowait方法
  return dowait(false, 0L);
 } catch (TimeoutException toe) {
  throw new Error(toe); // cannot happen
 }
}

//假设count等于3,有三个线程都在调用这个方法,默认超时时间为0,那么首每次都只有一个线程可以获取锁,将count减一,不为0
//就会到下面的for循环中扔到条件队列中挂起;直到第三个线程调用这个dowait方法,count减一等于0,那么当前线程执行任务之后,
//就会唤醒条件变量中阻塞的线程,并重置count为初始值3
private int dowait(boolean timed, long nanos)throws InterruptedException, BrokenBarrierException, TimeoutException {
 //获取锁
 final ReentrantLock lock = this.lock;
 lock.lock();
 try {
  //g中只有一个boolean值
  final Generation g = generation;
  //如果g中的值为true的时候,抛错
  if (g.broken)
   throw new BrokenBarrierException();
  //如果当前线程中断,就抛错
  if (Thread.interrupted()) {
   breakBarrier();
   throw new InterruptedException();
  }
  //count减一,再赋值给index
  int index = --count;
  //如果index等于0的时候,说明所有的线程已经到屏障点了,就可以
  if (index == 0) { // tripped
   boolean ranAction = false;
   try {
    //执行当前线程的任务
    final Runnable command = barrierCommand;
    if (command != null)
     command.run();
    ranAction = true;
    //唤醒其他因为调用了await方法阻塞的线程
    nextGeneration();
    return 0;
   } finally {
    if (!ranAction)
     breakBarrier();
   }
  }
  //能到这里来,说明是count不等于0,也就是还有的线程没有到屏障点
  for (;;) {
   try {
    //wait方法有两种情况,一种是设置超时时间,一种是不设置超时时间
    //这里就是对超时时间进行的一个判断,如果设置的超时时间为0,则会在条件队列中无限的等待下去,直到被唤醒
    //设置了超时时间,那就等待该时间
    if (!timed)
     trip.await();
    else if (nanos > 0L)
     nanos = trip.awaitNanos(nanos);
   } catch (InterruptedException ie) {
    if (g == generation && ! g.broken) {
     breakBarrier();
     throw ie;
    } else {
     Thread.currentThread().interrupt();
    }
   }

   if (g.broken)
    throw new BrokenBarrierException();

   if (g != generation)
    return index;

   if (timed && nanos <= 0L) {
    breakBarrier();
    throw new TimeoutException();
   }
  }
 } finally {
  //释放锁
  lock.unlock();
 }
}

//唤醒其他因为调用了await方法阻塞的线程
private void nextGeneration() {
 //唤醒条件变量中所有线程
 trip.signalAll();
 //重置count的值
 count = parties;
 generation = new Generation();
}

private void breakBarrier() {
 generation.broken = true;
 //重置count为初始值parties
 count = parties;
 //唤醒条件队列中的所有线程
 trip.signalAll();
}

以上就是详解Java回环屏障CyclicBarrier的详细内容,更多关于Java CyclicBarrier的资料请关注我们其它相关文章!

(0)

相关推荐

  • Java中CyclicBarrier的用法分析

    复制代码 代码如下: public class TestCyclicBarrier { private static final int THREAD_NUM = 5; public static class WorkerThread implements Runnable{ CyclicBarrier barrier; public WorkerThread(CyclicBarrier b){             this.barrier = b;         } @Override 

  • java多线程开发之通过对战游戏学习CyclicBarrier

    CyclicBarrier是java.util.concurrent包下面的一个工具类,字面意思是可循环使用(Cyclic)的屏障(Barrier),通过它可以实现让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,所有被屏障拦截的线程才会继续执行. 这篇文章将介绍CyclicBarrier这个同步工具类的以下几点 通过案例分析 两种不同构造函数测试 CyclicBarrier和CountDownLatch的区别 await方法及源码分析. 需求 继上一篇CountDo

  • Java多线程下的其他组件之CyclicBarrier、Callable、Future和FutureTask详解

    CyclicBarrier 接着讲多线程下的其他组件,第一个要讲的就是CyclicBarrier.CyclicBarrier从字面理解是指循环屏障,它可以协同多个线程,让多个线程在这个屏障前等待,直到所有线程都达到了这个屏障时,再一起继续执行后面的动作.看一下CyclicBarrier的使用实例: public static class CyclicBarrierThread extends Thread { private CyclicBarrier cb; private int sleep

  • Java并发编程:CountDownLatch与CyclicBarrier和Semaphore的实例详解

    Java并发编程:CountDownLatch与CyclicBarrier和Semaphore的实例详解 在java 1.5中,提供了一些非常有用的辅助类来帮助我们进行并发编程,比如CountDownLatch,CyclicBarrier和Semaphore,今天我们就来学习一下这三个辅助类的用法. 以下是本文目录大纲: 一.CountDownLatch用法 二.CyclicBarrier用法 三.Semaphore用法 若有不正之处请多多谅解,并欢迎批评指正. 一.CountDownLatch

  • Java并发系列之CyclicBarrier源码分析

    现实生活中我们经常会遇到这样的情景,在进行某个活动前需要等待人全部都齐了才开始.例如吃饭时要等全家人都上座了才动筷子,旅游时要等全部人都到齐了才出发,比赛时要等运动员都上场后才开始.在JUC包中为我们提供了一个同步工具类能够很好的模拟这类场景,它就是CyclicBarrier类.利用CyclicBarrier类可以实现一组线程相互等待,当所有线程都到达某个屏障点后再进行后续的操作.下图演示了这一过程. 在CyclicBarrier类的内部有一个计数器,每个线程在到达屏障点的时候都会调用await

  • java并发编程专题(九)----(JUC)浅析CyclicBarrier

    上一篇我们介绍了CountDownlatch,我们知道CountDownlatch是"在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待",即CountDownLatch的作用是允许1或N个线程等待其他线程完成执行,而我们今天要介绍的CyclicBarrier则是允许N个线程相互等待. 1.CyclicBarrier简介 CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier).它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)

  • 详解java CountDownLatch和CyclicBarrier在内部实现和场景上的区别

    前言 CountDownLatch和CyclicBarrier两个同为java并发编程的重要工具类,它们在诸多多线程并发或并行场景中得到了广泛的应用.但两者就其内部实现和使用场景而言是各有所侧重的. 内部实现差异 前者更多依赖经典的AQS机制和CAS机制来控制器内部状态的更迭和计数器本身的变化,而后者更多依靠可重入Lock等机制来控制其内部并发安全性和一致性. public class { //Synchronization control For CountDownLatch. //Uses

  • Java并发实例之CyclicBarrier的使用

    最近一直整并发这块东西,顺便写点Java并发的例子,给大家做个分享,也强化下自己记忆,如果有什么错误或者不当的地方,欢迎大家斧正. CyclicBarrier是一种多线程并发控制实用工具,和CountDownLatch非常类似,它也可以实现线程间的计数等待,但是它的功能比CountDownLatch更加复杂且强大. CyclicBarrier的介绍 CyclicBarrier 的字面意思是可循环(Cyclic)使用的屏障(Barrier).它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)

  • java多线程之CyclicBarrier的使用方法

    java多线程之CyclicBarrier的使用方法 public class CyclicBarrierTest { public static void main(String[] args) { ExecutorService service = Executors.newCachedThreadPool(); final CyclicBarrier cb = new CyclicBarrier(3); for(int i=0;i<3;i++){ Runnable runnable = n

  • Java并发编程(CyclicBarrier)实例详解

    Java并发编程(CyclicBarrier)实例详解 前言: 使用JAVA编写并发程序的时候,我们需要仔细去思考一下并发流程的控制,如何让各个线程之间协作完成某项工作.有时候,我们启动N个线程去做一件事情,只有当这N个线程都达到某一个临界点的时候,我们才能继续下面的工作,就是说如果这N个线程中的某一个线程先到达预先定义好的临界点,它必须等待其他N-1线程也到达这个临界点,接下来的工作才能继续,只要这N个线程中有1个线程没有到达所谓的临界点,其他线程就算抢先到达了临界点,也只能等待,只有所有这N

  • Java并发编程之栅栏(CyclicBarrier)实例介绍

    栅栏类似闭锁,但是它们是有区别的. 1.闭锁用来等待事件,而栅栏用于等待其他线程.什么意思呢?就是说闭锁用来等待的事件就是countDown事件,只有该countDown事件执行后所有之前在等待的线程才有可能继续执行;而栅栏没有类似countDown事件控制线程的执行,只有线程的await方法能控制等待的线程执行. 2.CyclicBarrier强调的是n个线程,大家相互等待,只要有一个没完成,所有人都得等着. 场景分析:10个人去春游,规定达到一个地点后才能继续前行.代码如下 复制代码 代码如

  • java线程并发cyclicbarrier类使用示例

    复制代码 代码如下: package com.yao; import java.util.Random;import java.util.concurrent.CyclicBarrier; /** * CyclicBarrier类似于CountDownLatch也是个计数器, * 不同的是CyclicBarrier数的是调用了CyclicBarrier.await()进入等待的线程数, * 当线程数达到了CyclicBarrier初始时规定的数目时,所有进入等待状态的线程被唤醒并继续. * Cy

随机推荐