JUC之Semaphore源码分析

Semaphore 主要用于限量控制并发执行代码的工具类, 其内部通过 一个 permit 来进行定义并发执行的数量。

/**
 * 使用非公平版本构件 Semaphore
 */
public KSemaphore(int permits){
 sync = new NonfairSync(permits);
}

/**
 * 指定版本构件 Semaphore
 */
public KSemaphore(int permits, boolean fair){
 sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
/** AQS 的子类主要定义获取释放 lock */
abstract static class Sync extends KAbstractQueuedSynchronizer{
 private static final long serialVersionUID = 1192457210091910933L;

 /**
  * 指定 permit 初始化 Semaphore
  */
 Sync(int permits){
  setState(permits);
 }

 /**
  * 返回剩余 permit
  */
 final int getPermits(){
  return getState();
 }

 /**
  * 获取 permit
  */
 final int nonfairTryAcquireShared(int acquires){
  for(;;){
   int available = getState();
   int remaining = available - acquires; // 判断获取 acquires 的剩余 permit 数目
   if(remaining < 0 ||
     compareAndSetState(available, remaining)){ // cas改变 state
    return remaining;
   }
  }
 }

 /**
  * 释放 lock
  */
 protected final boolean tryReleaseShared(int releases){
  for(;;){
   int current = getState();
   int next = current + releases;
   if(next < current){ // overflow
    throw new Error(" Maximum permit count exceeded");
   }
   if(compareAndSetState(current, next)){ // cas改变 state
    return true;
   }
  }
 }

 final void reducePermits(int reductions){ // 减少 permits
  for(;;){
   int current = getState();
   int next = current - reductions;
   if(next > current){ // underflow
    throw new Error(" Permit count underflow ");
   }
   if(compareAndSetState(current, next)){
    return;
   }
  }
 }

 /** 将 permit 置为 0 */
 final int drainPermits(){
  for(;;){
   int current = getState();
   if(current == 0 || compareAndSetState(current, 0)){
    return current;
   }
  }
 }

}
/**
 * 调用 acquireSharedInterruptibly 响应中断的方式获取 permit
 */
public void acquire() throws InterruptedException{
 sync.acquireSharedInterruptibly(1);
}

/**
 * 调用 acquireUninterruptibly 非响应中断的方式获取 permit
 */
public void acquireUninterruptibly(){
 sync.acquireShared(1);
}

/**
 * 尝试获取 permit
 */
public boolean tryAcquire(){
 return sync.nonfairTryAcquireShared(1) >= 0;
}

/**
 * 尝试的获取 permit, 支持超时与中断
 */
public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException{
 return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

/**
 * 支持中断的获取permit
 */
public void acquire(int permits) throws InterruptedException{
 if(permits < 0){
  throw new IllegalArgumentException();
 }
 sync.acquireSharedInterruptibly(permits);
}

/**
 * 不响应中断的获取 permit
 */
public void acquireUninterruptibly(int permits){
 if(permits < 0) throw new IllegalArgumentException();
 sync.acquireShared(permits);
}

/**
 * 尝试获取 permit
 */
public boolean tryAcquire(int permits){
 if(permits < 0) throw new IllegalArgumentException();
 return sync.nonfairTryAcquireShared(permits) >= 0;
}

/**
 * 尝试 支持超时机制, 支持中断 的获取 permit
 */
public boolean tryAcquire(int permits, long timout, TimeUnit unit) throws InterruptedException{
 if(permits < 0) throw new IllegalArgumentException();
 return sync.tryAcquireSharedNanos(permits, unit.toNanos(timout));
}
/**
 * 释放 permit
 */
public void release(){
 sync.releaseShared(1);
}

/**
 * 释放 permit
 */
public void release(int permits){
 if(permits < 0) throw new IllegalArgumentException();
 sync.releaseShared(permits);
}
/**
 * 返回可用的 permit
 */
public int availablePermits(){
 return sync.getPermits();
}

/**
 * 消耗光 permit
 */
public int drainPermits(){
 return sync.drainPermits();
}

/**
 * 减少 reduction 个permit
 */
protected void reducePermits(int reduction){
 if(reduction < 0) throw new IllegalArgumentException();
 sync.reducePermits(reduction);
}

/**
 * 判断是否是公平版本
 */
public boolean isFair(){
 return sync instanceof FairSync;
}

/**
 * 返回 AQS 中 Sync Queue 里面的等待线程
 */
public final boolean hasQueuedThreads(){
 return sync.hasQueuedThreads();
}

/**
 * 返回 AQS 中 Sync Queue 里面的等待线程长度
 */
public final int getQueueLength(){
 return sync.getQueueLength();
}

/**
 * 返回 AQS 中 Sync Queue 里面的等待线程
 */
protected Collection<Thread> getQueueThreads(){
 return sync.getQueuedThreads();
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • Java编程线程同步工具Exchanger的使用实例解析

    本文研究的主要是Java编程线程同步工具Exchanger的使用,下面看看具体内容. 如果两个线程在运行过程中需要交换彼此的信息,比如一个数据或者使用的空间,就需要用到Exchanger这个类,Exchanger为线程交换信息提供了非常方便的途径,它可以作为两个线程交换对象的同步点,只有当每个线程都在进入 exchange ()方法并给出对象时,才能接受其他线程返回时给出的对象. 每次只能两个线程交换数据,如果有多个线程,也只有两个能交换数据.下面看个通俗的例子:一手交钱一首交货! public

  • 通俗易懂学习java并发工具类-Semaphore,Exchanger

    1. 控制资源并发访问--Semaphore Semaphore可以理解为信号量,用于控制资源能够被并发访问的线程数量,以保证多个线程能够合理的使用特定资源. Semaphore就相当于一个许可证,线程需要先通过acquire方法获取该许可证,该线程才能继续往下执行,否则只能在该方法出阻塞等待.当执行完业务功能后,需要通过release()方法将许可证归还,以便其他线程能够获得许可证继续执行. Semaphore可以用于做流量控制,特别是公共资源有限的应用场景,比如数据库连接.假如有多个线程读取

  • Java多线程编程之使用Exchanger数据交换实例

    用于实现两个人之间的数据交换,每个人在完成一定的事务后想与对方交换数据,第一个先拿出数据的人将一直等待第二个人拿着数据到来时,才能彼此交换数据. 复制代码 代码如下: package com.ljq.test.thread;   import java.util.concurrent.Exchanger; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;   public cla

  • JUC之Semaphore源码分析

    Semaphore 主要用于限量控制并发执行代码的工具类, 其内部通过 一个 permit 来进行定义并发执行的数量. /** * 使用非公平版本构件 Semaphore */ public KSemaphore(int permits){ sync = new NonfairSync(permits); } /** * 指定版本构件 Semaphore */ public KSemaphore(int permits, boolean fair){ sync = fair ? new Fair

  • Java并发系列之Semaphore源码分析

    Semaphore(信号量)是JUC包中比较常用到的一个类,它是AQS共享模式的一个应用,可以允许多个线程同时对共享资源进行操作,并且可以有效的控制并发数,利用它可以很好的实现流量控制.Semaphore提供了一个许可证的概念,可以把这个许可证看作公共汽车车票,只有成功获取车票的人才能够上车,并且车票是有一定数量的,不可能毫无限制的发下去,这样就会导致公交车超载.所以当车票发完的时候(公交车以满载),其他人就只能等下一趟车了.如果中途有人下车,那么他的位置将会空闲出来,因此如果这时其他人想要上车

  • Java并发系列之CyclicBarrier源码分析

    现实生活中我们经常会遇到这样的情景,在进行某个活动前需要等待人全部都齐了才开始.例如吃饭时要等全家人都上座了才动筷子,旅游时要等全部人都到齐了才出发,比赛时要等运动员都上场后才开始.在JUC包中为我们提供了一个同步工具类能够很好的模拟这类场景,它就是CyclicBarrier类.利用CyclicBarrier类可以实现一组线程相互等待,当所有线程都到达某个屏障点后再进行后续的操作.下图演示了这一过程. 在CyclicBarrier类的内部有一个计数器,每个线程在到达屏障点的时候都会调用await

  • Java并发系列之AbstractQueuedSynchronizer源码分析(概要分析)

    学习Java并发编程不得不去了解一下java.util.concurrent这个包,这个包下面有许多我们经常用到的并发工具类,例如:ReentrantLock, CountDownLatch, CyclicBarrier, Semaphore等.而这些类的底层实现都依赖于AbstractQueuedSynchronizer这个类,由此可见这个类的重要性.所以在Java并发系列文章中我首先对AbstractQueuedSynchronizer这个类进行分析,由于这个类比较重要,而且代码比较长,为了

  • Java并发系列之CountDownLatch源码分析

    CountDownLatch(闭锁)是一个很有用的工具类,利用它我们可以拦截一个或多个线程使其在某个条件成熟后再执行.它的内部提供了一个计数器,在构造闭锁时必须指定计数器的初始值,且计数器的初始值必须大于0.另外它还提供了一个countDown方法来操作计数器的值,每调用一次countDown方法计数器都会减1,直到计数器的值减为0时就代表条件已成熟,所有因调用await方法而阻塞的线程都会被唤醒.这就是CountDownLatch的内部机制,看起来很简单,无非就是阻塞一部分线程让其在达到某个条

  • Java并发系列之ConcurrentHashMap源码分析

    我们知道哈希表是一种非常高效的数据结构,设计优良的哈希函数可以使其上的增删改查操作达到O(1)级别.Java为我们提供了一个现成的哈希结构,那就是HashMap类,在前面的文章中我曾经介绍过HashMap类,知道它的所有方法都未进行同步,因此在多线程环境中是不安全的.为此,Java为我们提供了另外一个HashTable类,它对于多线程同步的处理非常简单粗暴,那就是在HashMap的基础上对其所有方法都使用synchronized关键字进行加锁.这种方法虽然简单,但导致了一个问题,那就是在同一时间

  • Java并发系列之AbstractQueuedSynchronizer源码分析(共享模式)

    通过上一篇的分析,我们知道了独占模式获取锁有三种方式,分别是不响应线程中断获取,响应线程中断获取,设置超时时间获取.在共享模式下获取锁的方式也是这三种,而且基本上都是大同小异,我们搞清楚了一种就能很快的理解其他的方式.虽然说AbstractQueuedSynchronizer源码有一千多行,但是重复的也比较多,所以读者不要刚开始的时候被吓到,只要耐着性子去看慢慢的自然能够渐渐领悟.就我个人经验来说,阅读AbstractQueuedSynchronizer源码有几个比较关键的地方需要弄明白,分别是

  • Java并发系列之ReentrantLock源码分析

    在Java5.0之前,协调对共享对象的访问可以使用的机制只有synchronized和volatile.我们知道synchronized关键字实现了内置锁,而volatile关键字保证了多线程的内存可见性.在大多数情况下,这些机制都能很好地完成工作,但却无法实现一些更高级的功能,例如,无法中断一个正在等待获取锁的线程,无法实现限定时间的获取锁机制,无法实现非阻塞结构的加锁规则等.而这些更灵活的加锁机制通常都能够提供更好的活跃性或性能.因此,在Java5.0中增加了一种新的机制:Reentrant

  • Java并发系列之AbstractQueuedSynchronizer源码分析(条件队列)

    通过前面三篇的分析,我们深入了解了AbstractQueuedSynchronizer的内部结构和一些设计理念,知道了AbstractQueuedSynchronizer内部维护了一个同步状态和两个排队区,这两个排队区分别是同步队列和条件队列.我们还是拿公共厕所做比喻,同步队列是主要的排队区,如果公共厕所没开放,所有想要进入厕所的人都得在这里排队.而条件队列主要是为条件等待设置的,我们想象一下如果一个人通过排队终于成功获取锁进入了厕所,但在方便之前发现自己没带手纸,碰到这种情况虽然很无奈,但是它

  • Java并发 结合源码分析AQS原理

    前言: 如果说J.U.C包下的核心是什么?那我想答案只有一个就是AQS.那么AQS是什么呢?接下来让我们一起揭开AQS的神秘面纱 AQS是什么? AQS是AbstractQueuedSynchronizer的简称.为什么说它是核心呢?是因为它提供了一个基于FIFO的队列和state变量来构建锁和其他同步装置的基础框架.下面是其底层的数据结构. AQS的特点 1.其内使用Node实现FIFO(FirstInFirstOut)队列.可用于构建锁或者其他同步装置的基础框架 2.且利用了一个int类表示

随机推荐