详解java CountDownLatch和CyclicBarrier在内部实现和场景上的区别

前言

CountDownLatch和CyclicBarrier两个同为java并发编程的重要工具类,它们在诸多多线程并发或并行场景中得到了广泛的应用。但两者就其内部实现和使用场景而言是各有所侧重的。

内部实现差异

前者更多依赖经典的AQS机制和CAS机制来控制器内部状态的更迭和计数器本身的变化,而后者更多依靠可重入Lock等机制来控制其内部并发安全性和一致性。

 public class {
   //Synchronization control For CountDownLatch.
   //Uses AQS state to represent count.
  private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;

    Sync(int count) {
      setState(count);
    }

    int getCount() {
      return getState();
    }

    protected int tryAcquireShared(int acquires) {
      return (getState() == 0) ? 1 : -1;
    }

    protected boolean tryReleaseShared(int releases) {
      // Decrement count; signal when transition to zero
      for (;;) {
        int c = getState();
        if (c == 0)
          return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
          return nextc == 0;
      }
    }
  }

  private final Sync sync;
  ... ...//
 }
 public class CyclicBarrier {
  /**
   * Each use of the barrier is represented as a generation instance.
   * The generation changes whenever the barrier is tripped, or
   * is reset. There can be many generations associated with threads
   * using the barrier - due to the non-deterministic way the lock
   * may be allocated to waiting threads - but only one of these
   * can be active at a time (the one to which {@code count} applies)
   * and all the rest are either broken or tripped.
   * There need not be an active generation if there has been a break
   * but no subsequent reset.
   */
  private static class Generation {
    boolean broken = false;
  }

  /** The lock for guarding barrier entry */
  private final ReentrantLock lock = new ReentrantLock();
  /** Condition to wait on until tripped */
  private final Condition trip = lock.newCondition();
  /** The number of parties */
  private final int parties;
  /* The command to run when tripped */
  private final Runnable barrierCommand;
  /** The current generation */
  private Generation generation = new Generation();

  /**
   * Number of parties still waiting. Counts down from parties to 0
   * on each generation. It is reset to parties on each new
   * generation or when broken.
   */
  private int count;

  /**
   * Updates state on barrier trip and wakes up everyone.
   * Called only while holding lock.
   */
  private void nextGeneration() {
    // signal completion of last generation
    trip.signalAll();
    // set up next generation
    count = parties;
    generation = new Generation();
  }

  /**
   * Sets current barrier generation as broken and wakes up everyone.
   * Called only while holding lock.
   */
  private void breakBarrier() {
    generation.broken = true;
    count = parties;
    trip.signalAll();
  }

  /**
   * Main barrier code, covering the various policies.
   */
  private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
        TimeoutException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
      final Generation g = generation;

      if (g.broken)
        throw new BrokenBarrierException();

      if (Thread.interrupted()) {
        breakBarrier();
        throw new InterruptedException();
      }

      int index = --count;
      if (index == 0) { // tripped
        boolean ranAction = false;
        try {
          final Runnable command = barrierCommand;
          if (command != null)
            command.run();
          ranAction = true;
          nextGeneration();
          return 0;
        } finally {
          if (!ranAction)
            breakBarrier();
        }
      }

      // loop until tripped, broken, interrupted, or timed out
      for (;;) {
        try {
          if (!timed)
            trip.await();
          else if (nanos > 0L)
            nanos = trip.awaitNanos(nanos);
        } catch (InterruptedException ie) {
          if (g == generation && ! g.broken) {
            breakBarrier();
            throw ie;
          } else {
            // We're about to finish waiting even if we had not
            // been interrupted, so this interrupt is deemed to
            // "belong" to subsequent execution.
            Thread.currentThread().interrupt();
          }
        }

        if (g.broken)
          throw new BrokenBarrierException();

        if (g != generation)
          return index;

        if (timed && nanos <= 0L) {
          breakBarrier();
          throw new TimeoutException();
        }
      }
    } finally {
      lock.unlock();
    }
  }
  ... ... //
 }

实战 - 展示各自的使用场景

/**
 *类说明:共5个初始化子线程,6个闭锁扣除点,扣除完毕后,主线程和业务线程才能继续执行
 */
public class UseCountDownLatch {

  static CountDownLatch latch = new CountDownLatch(6);

  /*初始化线程*/
  private static class InitThread implements Runnable{

    public void run() {
      System.out.println("Thread_"+Thread.currentThread().getId()
         +" ready init work......");
      latch.countDown();
      for(int i =0;i<2;i++) {
        System.out.println("Thread_"+Thread.currentThread().getId()
           +" ........continue do its work");
      }
    }
  }

  /*业务线程等待latch的计数器为0完成*/
  private static class BusiThread implements Runnable{

    public void run() {
      try {
        latch.await();
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      for(int i =0;i<3;i++) {
        System.out.println("BusiThread_"+Thread.currentThread().getId()
           +" do business-----");
      }
    }
  }

  public static void main(String[] args) throws InterruptedException {
    new Thread(new Runnable() {
      public void run() {
        SleepTools.ms(1);
        System.out.println("Thread_"+Thread.currentThread().getId()
           +" ready init work step 1st......");
        latch.countDown();
        System.out.println("begin step 2nd.......");
        SleepTools.ms(1);
        System.out.println("Thread_"+Thread.currentThread().getId()
           +" ready init work step 2nd......");
        latch.countDown();
      }
    }).start();
    new Thread(new BusiThread()).start();
    for(int i=0;i<=3;i++){
      Thread thread = new Thread(new InitThread());
      thread.start();
    }
    latch.await();
    System.out.println("Main do ites work........");
  }
}
/**
 *类说明:共4个子线程,他们全部完成工作后,交出自己结果,
 *再被统一释放去做自己的事情,而交出的结果被另外的线程拿来拼接字符串
 */
class UseCyclicBarrier {
  private static CyclicBarrier barrier
      = new CyclicBarrier(4,new CollectThread());

  //存放子线程工作结果的容器
  private static ConcurrentHashMap<String,Long> resultMap
      = new ConcurrentHashMap<String,Long>();

  public static void main(String[] args) {
    for(int i=0;i<4;i++){
      Thread thread = new Thread(new SubThread());
      thread.start();
    }

  }

  /*汇总的任务*/
  private static class CollectThread implements Runnable{

    @Override
    public void run() {
      StringBuilder result = new StringBuilder();
      for(Map.Entry<String,Long> workResult:resultMap.entrySet()){
        result.append("["+workResult.getValue()+"]");
      }
      System.out.println(" the result = "+ result);
      System.out.println("do other business........");
    }
  }

  /*相互等待的子线程*/
  private static class SubThread implements Runnable{
    @Override
    public void run() {
      long id = Thread.currentThread().getId();
      resultMap.put(Thread.currentThread().getId()+"",id);
      try {
          Thread.sleep(1000+id);
          System.out.println("Thread_"+id+" ....do something ");
        barrier.await();
        Thread.sleep(1000+id);
        System.out.println("Thread_"+id+" ....do its business ");
        barrier.await();
      } catch (Exception e) {
        e.printStackTrace();
      }
    }
  }
}

 两者总结

1. Cyclicbarrier结果汇总的Runable线程可以重复被执行,通过多次触发await()方法,countdownlatch可以调用await()方法多次;cyclicbarrier若没有结果汇总,则调用一次await()就够了;

2. New cyclicbarrier(threadCount)的线程数必须与实际的用户线程数一致;

3. 协调线程同时运行:countDownLatch协调工作线程执行,是由外面线程协调;cyclicbarrier是由工作线程之间相互协调运行;

4. 从构造函数上看出:countDownlatch控制运行的计数器数量和线程数没有关系;cyclicbarrier构造中传入的线程数等于实际执行线程数;

5. countDownLatch在不能基于执行子线程的运行结果做处理,而cyclicbarrier可以;

6. 就使用场景而言,countdownlatch 更适用于框架加载前的一系列初始化工作等场景; cyclicbarrier更适用于需要多个用户线程执行后,将运行结果汇总再计算等典型场景;

到此这篇关于详解java CountDownLatch和CyclicBarrier在内部实现和场景上的区别的文章就介绍到这了,更多相关java CountDownLatch和CyclicBarrier区别内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Java并发编程:CountDownLatch与CyclicBarrier和Semaphore的实例详解

    Java并发编程:CountDownLatch与CyclicBarrier和Semaphore的实例详解 在java 1.5中,提供了一些非常有用的辅助类来帮助我们进行并发编程,比如CountDownLatch,CyclicBarrier和Semaphore,今天我们就来学习一下这三个辅助类的用法. 以下是本文目录大纲: 一.CountDownLatch用法 二.CyclicBarrier用法 三.Semaphore用法 若有不正之处请多多谅解,并欢迎批评指正. 一.CountDownLatch

  • 详解java CountDownLatch和CyclicBarrier在内部实现和场景上的区别

    前言 CountDownLatch和CyclicBarrier两个同为java并发编程的重要工具类,它们在诸多多线程并发或并行场景中得到了广泛的应用.但两者就其内部实现和使用场景而言是各有所侧重的. 内部实现差异 前者更多依赖经典的AQS机制和CAS机制来控制器内部状态的更迭和计数器本身的变化,而后者更多依靠可重入Lock等机制来控制其内部并发安全性和一致性. public class { //Synchronization control For CountDownLatch. //Uses

  • 详解Java中super的几种用法并与this的区别

    1. 子类的构造函数如果要引用super的话,必须把super放在函数的首位 复制代码 代码如下: class Base { Base() { System.out.println("Base"); } } public class Checket extends Base { Checket() { super();//调用父类的构造方法,一定要放在方法的首个语句 System.out.println("Checket"); } public static voi

  • 详解Java读取Jar中资源文件及示例代码

    详解Java读取Jar中资源文件及实现代码 直接上代码,文章的注释部分说的比较清楚,大家可以参考下, 工具类源代码: ResourceLoadFromJarUtil.java 实现代码: import java.io.IOException; import java.io.InputStream; import java.net.JarURLConnection; import java.net.MalformedURLException; import java.net.URL; import

  • 详解Java回环屏障CyclicBarrier

    上一篇说的CountDownLatch是一个计数器,类似线程的join方法,但是有一个缺陷,就是当计数器的值到达0之后,再调用CountDownLatch的await和countDown方法就会立刻返回,就没有作用了,那么反正是一个计数器,为什么不能重复使用呢?于是就出现了这篇说的CyclicBarrier,它的状态可以被重用: 一.简单例子 用法其实和CountDownLatch差不多,也就是一个计数器,当计数器的值变为0之后,就会把阻塞的线程唤醒: package com.example.d

  • 详解Java线程同步器CountDownLatch

    Java程序有的时候在主线程中会创建多个线程去执行任务,然后在主线程执行完毕之前,把所有线程的任务进行汇总,以前可以用线程的join方法,但是这个方法不够灵活,我们可以使用CountDownLatch类,实现更优雅,而且使用线程池的话,可没有办法调用线程的join方法的呀! 一.简单使用CountDownLatch 直接使用线程: package com.example.demo.study; import java.util.concurrent.CountDownLatch; public

  • 详解Java中CountDownLatch异步转同步工具类

    使用场景 由于公司业务需求,需要对接socket.MQTT等消息队列. 众所周知 socket 是双向通信,socket的回复是人为定义的,客户端推送消息给服务端,服务端的回复是两条线.无法像http请求有回复. 下发指令给硬件时,需要校验此次数据下发是否成功. 用户体验而言,点击按钮就要知道此次的下发成功或失败. 如上图模型, 第一种方案使用Tread.sleep 优点:占用资源小,放弃当前cpu资源 缺点: 回复速度快,休眠时间过长,仍然需要等待休眠结束才能返回,响应速度是固定的,无法及时响

  • 详解Java 信号量Semaphore

    Semaphore也是一个同步器,和前面两篇说的CountDownLatch和CyclicBarrier不同,这是递增的,初始化的时候可以指定一个值,但是不需要知道需要同步的线程个数,只需要在同步的地方调用acquire方法时指定需要同步的线程个数: 一.简单使用 同步两个子线程,只有其中两个子线程执行完毕,主线程才会执行: package com.example.demo.study; import java.util.concurrent.ExecutorService; import ja

  • 详解Java并发包基石AQS

    一.概述 AQS是一个用来构建锁和同步器的框架,使用AQS能简单且高效地构造出应用广泛的大量的同步器,比如我们提到的ReentrantLock,Semaphore,其他的诸如ReentrantReadWriteLock,SynchronousQueue,FutureTask等等皆是基于AQS的.当然,我们自己也能利用AQS非常轻松容易地构造出符合我们自己需求的同步器. 本章我们就一起探究下这个神奇的东东,并对其实现原理进行剖析理解 二.基本实现原理 AQS使用一个int成员变量来表示同步状态,通

  • 一文详解Java线程中的安全策略

    目录 一.不可变对象 二.线程封闭 三.线程不安全类与写法 四.线程安全-同步容器 1. ArrayList -> Vector, Stack 2. HashMap -> HashTable(Key, Value都不能为null) 3. Collections.synchronizedXXX(List.Set.Map) 五.线程安全-并发容器J.U.C 1. ArrayList -> CopyOnWriteArrayList 2.HashSet.TreeSet -> CopyOnW

  • 详解Java信号量Semaphore的原理及使用

    目录 1.Semaphore的概述 2.Semaphore的原理 2.1 基本结构 2.2 可中断获取信号量 2.3 不可中断获取信号量 2.4 超时可中断获取信号量 2.5 尝试获取信号量 2.6 释放信号量 3.Semaphore的使用 4.Semaphore的总结 1.Semaphore的概述 public class Semaphore extends Object implements Serializable Semaphore来自于JDK1.5的JUC包,直译过来就是信号量,被作为

随机推荐