Java并发编程Semaphore计数信号量详解

Semaphore 是一个计数信号量,它的本质是一个共享锁。信号量维护了一个信号量许可集。线程可以通过调用acquire()来获取信号量的许可;当信号量中有可用的许可时,线程能获取该许可;否则线程必须等待,直到有可用的许可为止。 线程可以通过release()来释放它所持有的信号量许可(用完信号量之后必须释放,不然其他线程可能会无法获取信号量)。

简单示例:

package me.socketthread;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class SemaphoreLearn {
  //信号量总数
  private static final int SEM_MAX = 12;
  public static void main(String[] args) {
    Semaphore sem = new Semaphore(SEM_MAX);
    //创建线程池
    ExecutorService threadPool = Executors.newFixedThreadPool(3);
    //在线程池中执行任务
    threadPool.execute(new MyThread(sem, 7));
    threadPool.execute(new MyThread(sem, 4));
    threadPool.execute(new MyThread(sem, 2));
    //关闭池
    threadPool.shutdown();
  }
}
  class MyThread extends Thread {
    private volatile Semaphore sem;  // 信号量
    private int count;    // 申请信号量的大小  

    MyThread(Semaphore sem, int count) {
      this.sem = sem;
      this.count = count;
    }
    public void run() {
      try {
       // 从信号量中获取count个许可
        sem.acquire(count);
        Thread.sleep(2000);
        System.out.println(Thread.currentThread().getName() + " acquire count="+count);
      } catch (InterruptedException e) {
        e.printStackTrace();
      } finally {
        // 释放给定数目的许可,将其返回到信号量。
        sem.release(count);
        System.out.println(Thread.currentThread().getName() + " release " + count + "");
      }
    }
  } 

执行结果:

pool-1-thread-2 acquire count=4
pool-1-thread-1 acquire count=7
pool-1-thread-1 release 7
pool-1-thread-2 release 4
pool-1-thread-3 acquire count=2
pool-1-thread-3 release 2

线程1和线程2会并发执行,因为两者的信号量和没有超过总信号量,当前两个线程释放掉信号量之后线程3才能继续执行。

源码分析:

1、构造函数

在构造函数中会初始化信号量值,这值最终是作为锁标志位state的值

Semaphore sem = new Semaphore(12);//简单来说就是给锁标识位state赋值为12 

2、Semaphore.acquire(n);简单理解为获取锁资源,如果获取不到线程阻塞

Semaphore.acquire(n);//从锁标识位state中获取n个信号量,简单来说是state = state-n 此时state大于0表示可以获取信号量,如果小于0则将线程阻塞 
public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    //获取锁
    sync.acquireSharedInterruptibly(permits);
  } 

acquireSharedInterruptibly中的操作是获取锁资源,如果可以获取则将state= state-permits,否则将线程阻塞

public final void acquireSharedInterruptibly(int arg)
      throws InterruptedException {
    if (Thread.interrupted())
      throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)//tryAcquireShared中尝试获取锁资源
      doAcquireSharedInterruptibly(arg); //将线程阻塞
  } 

tryAcquireShared中的操作是尝试获取信号量值,简单来说就是state=state-acquires ,如果此时小于0则返回负值,否则返回大于新值,再判断是否将当线程线程阻塞

protected int tryAcquireShared(int acquires) {
      for (;;) {
        if (hasQueuedPredecessors())
          return -1;
      //获取state值
        int available = getState();
      //从state中获取信号量
        int remaining = available - acquires;
        if (remaining < 0 ||
          compareAndSetState(available, remaining))
        //如果信号量小于0则直接返回,表示无法获取信号量,否则将state值修改为新值
          return remaining;
      }
    } 

doAcquireSharedInterruptibly中的操作简单来说是将当前线程添加到FIFO队列中并将当前线程阻塞。

/会将线程添加到FIFO队列中,并阻塞
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    //将线程添加到FIFO队列中
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
      for (;;) {
        final Node p = node.predecessor();
        if (p == head) {
          int r = tryAcquireShared(arg);
          if (r >= 0) {
            setHeadAndPropagate(node, r);
            p.next = null; // help GC
            failed = false;
            return;
          }
        }
        //parkAndCheckInterrupt完成线程的阻塞操作
        if (shouldParkAfterFailedAcquire(p, node) &&
          parkAndCheckInterrupt())
          throw new InterruptedException();
      }
    } finally {
      if (failed)
        cancelAcquire(node);
    }
  } 

3、Semaphore.release(int permits),这个函数的实现操作是将state = state+permits并唤起处于FIFO队列中的阻塞线程。

public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
  //state = state+permits,并将FIFO队列中的阻塞线程唤起
    sync.releaseShared(permits);
  } 

releaseShared中的操作是将state = state+permits,并将FIFO队列中的阻塞线程唤起。

public final boolean releaseShared(int arg) {
    //tryReleaseShared将state设置为state = state+arg
    if (tryReleaseShared(arg)) {
      //唤起FIFO队列中的阻塞线程
      doReleaseShared();
      return true;
    }
    return false;
  } 

tryReleaseShared将state设置为state = state+arg

protected final boolean tryReleaseShared(int releases) {
      for (;;) {
        int current = getState();
        int next = current + releases;
        if (next < current) // overflow
          throw new Error("Maximum permit count exceeded");
        //将state值设置为state=state+releases
        if (compareAndSetState(current, next))
          return true;
      }
    } 

doReleaseShared()唤起FIFO队列中的阻塞线程

private void doReleaseShared() {  

    for (;;) {
      Node h = head;
      if (h != null && h != tail) {
        int ws = h.waitStatus;
        if (ws == Node.SIGNAL) {
          if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
            continue;      // loop to recheck cases
          //完成阻塞线程的唤起操作
          unparkSuccessor(h);
        }
        else if (ws == 0 &&
             !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
          continue;        // loop on failed CAS
      }
      if (h == head)          // loop if head changed
        break;
    }
  }  

总结:Semaphore简单来说设置了一个信号量池state,当线程执行时会从state中获取值,如果可以获取则线程执行,并且在执行后将获取的资源返回到信号量池中,并唤起其他阻塞线程;如果信号量池中的资源无法满足某个线程的需求则将此线程阻塞。

Semaphore源码:

public class Semaphore implements java.io.Serializable {
  private static final long serialVersionUID = -3222578661600680210L;
  private final Sync sync;
  abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 1192457210091910933L;
    //设置锁标识位state的初始值
    Sync(int permits) {
      setState(permits);
    }
    //获取锁标识位state的值,如果state值大于其需要的值则表示锁可以获取
    final int getPermits() {
      return getState();
    }
    //获取state值减去acquires后的值,如果大于等于0则表示锁可以获取
    final int nonfairTryAcquireShared(int acquires) {
      for (;;) {
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
          compareAndSetState(available, remaining))
          return remaining;
      }
    }
    //释放锁
    protected final boolean tryReleaseShared(int releases) {
      for (;;) {
        int current = getState();
        //将state值加上release值
        int next = current + releases;
        if (next < current) // overflow
          throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next))
          return true;
      }
    }
    //将state的值减去reductions
    final void reducePermits(int reductions) {
      for (;;) {
        int current = getState();
        int next = current - reductions;
        if (next > current) // underflow
          throw new Error("Permit count underflow");
        if (compareAndSetState(current, next))
          return;
      }
    }
    final int drainPermits() {
      for (;;) {
        int current = getState();
        if (current == 0 || compareAndSetState(current, 0))
          return current;
      }
    }
  }
  //非公平锁
  static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -2694183684443567898L;
    NonfairSync(int permits) {
      super(permits);
    }
    protected int tryAcquireShared(int acquires) {
      return nonfairTryAcquireShared(acquires);
    }
  }
  //公平锁
  static final class FairSync extends Sync {
    private static final long serialVersionUID = 2014338818796000944L;
    FairSync(int permits) {
      super(permits);
    }
    protected int tryAcquireShared(int acquires) {
      for (;;) {
        if (hasQueuedPredecessors())
          return -1;
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
          compareAndSetState(available, remaining))
          return remaining;
      }
    }
  }
  //设置信号量
  public Semaphore(int permits) {
    sync = new NonfairSync(permits);
  }
  public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
  }
  //获取锁
  public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
  }
  public void acquireUninterruptibly() {
    sync.acquireShared(1);
  }
  public boolean tryAcquire() {
    return sync.nonfairTryAcquireShared(1) >= 0;
  }
  public boolean tryAcquire(long timeout, TimeUnit unit)
    throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
  }
  public void release() {
    sync.releaseShared(1);
  }
  //获取permits值锁
  public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
  }
  public void acquireUninterruptibly(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireShared(permits);
  }
  public boolean tryAcquire(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    return sync.nonfairTryAcquireShared(permits) >= 0;
  }
  public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
    throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
  }
  //释放
  public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);
  }
  public int availablePermits() {
    return sync.getPermits();
  }
  public int drainPermits() {
    return sync.drainPermits();
  }
  protected void reducePermits(int reduction) {
    if (reduction < 0) throw new IllegalArgumentException();
    sync.reducePermits(reduction);
  }
  public boolean isFair() {
    return sync instanceof FairSync;
  }
  public final boolean hasQueuedThreads() {
    return sync.hasQueuedThreads();
  }
  public final int getQueueLength() {
    return sync.getQueueLength();
  }
  protected Collection<Thread> getQueuedThreads() {
    return sync.getQueuedThreads();
  }
  public String toString() {
    return super.toString() + "[Permits = " + sync.getPermits() + "]";
  }
} 

总结

以上就是本文关于Java并发编程Semaphore计数信号量详解的全部内容,希望对大家有所帮助。感兴趣的朋友可以继续参阅本站:Java并发编程之重入锁与读写锁、Java系统的高并发解决方法详解、java高并发锁的3种实现示例代码等,有什么问题,可以留言交流讨论。感谢朋友们对本站的支持!

(0)

相关推荐

  • 详解java中的互斥锁信号量和多线程等待机制

    互斥锁和信号量都是操作系统中为并发编程设计基本概念,互斥锁和信号量的概念上的不同在于,对于同一个资源,互斥锁只有0和1 的概念,而信号量不止于此.也就是说,信号量可以使资源同时被多个线程访问,而互斥锁同时只能被一个线程访问 互斥锁在java中的实现就是 ReetranLock , 在访问一个同步资源时,它的对象需要通过方法 tryLock() 获得这个锁,如果失败,返回 false,成功返回true.根据返回的信息来判断是否要访问这个被同步的资源.看下面的例子 public class Reen

  • java信号量控制线程打印顺序的示例分享

    复制代码 代码如下: import java.util.concurrent.Semaphore; public class ThreeThread { public static void main(String[] args) throws InterruptedException {  Semaphore sempA = new Semaphore(1);  Semaphore sempB = new Semaphore(0);  Semaphore sempC = new Semapho

  • JAVA 多线程之信号量(Semaphore)实例详解

    java Semaphore 简介 信号量(Semaphore),有时被称为信号灯,是在多线程环境下使用的一种设施, 它负责协调各个线程, 以保证它们能够正确.合理的使用公共资源. 一个计数信号量.从概念上讲,信号量维护了一个许可集.如有必要,在许可可用前会阻塞每一个 acquire(),然后再获取该许可.每个 release() 添加一个许可,从而可能释放一个正在阻塞的获取者.但是,不使用实际的许可对象,Semaphore 只对可用许可的号码进行计数,并采取相应的行动.拿到信号量的线程可以进入

  • Java并发编程Semaphore计数信号量详解

    Semaphore 是一个计数信号量,它的本质是一个共享锁.信号量维护了一个信号量许可集.线程可以通过调用acquire()来获取信号量的许可:当信号量中有可用的许可时,线程能获取该许可:否则线程必须等待,直到有可用的许可为止. 线程可以通过release()来释放它所持有的信号量许可(用完信号量之后必须释放,不然其他线程可能会无法获取信号量). 简单示例: package me.socketthread; import java.util.concurrent.ExecutorService;

  • Java并发编程总结——慎用CAS详解

    一.CAS和synchronized适用场景 1.对于资源竞争较少的情况,使用synchronized同步锁进行线程阻塞和唤醒切换以及用户态内核态间的切换操作额外浪费消耗cpu资源:而CAS基于硬件实现,不需要进入内核,不需要切换线程,操作自旋几率较少,因此可以获得更高的性能. 2.对于资源竞争严重的情况,CAS自旋的概率会比较大,从而浪费更多的CPU资源,效率低于synchronized.以java.util.concurrent.atomic包中AtomicInteger类为例,其getAn

  • Java并发编程预防死锁过程详解

    这篇文章主要介绍了Java并发编程预防死锁过程详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 在java并发编程领域已经有技术大咖总结出了发生死锁的条件,只有四个条件都发生时才会出现死锁: 1.互斥,共享资源X和Y只能被一个线程占用 2.占有且等待,线程T1已经取得共享资源X,在等待共享资源Y的时候,不释放共享资源X 3.不可抢占,其他线程不能强行抢占线程T1占有的资源 4.循环等待,线程T1等待线程T2占有的资源,线程T2等待线程T1占有

  • Java并发编程之阻塞队列详解

    1.什么是阻塞队列? 队列是一种数据结构,它有两个基本操作:在队列尾部加入一个元素,从队列头部移除一个元素.阻塞队里与普通的队列的区别在于,普通队列不会对当前线程产生阻塞,在面对类似消费者-生产者模型时,就必须额外的实现同步策略以及线程间唤醒策略.使用阻塞队列,就会对当前线程产生阻塞,当队列是空时,从队列中获取元素的操作将会被阻塞,当队列是满时,往队列里添加元素的操作也会被阻塞. 2.主要的阻塞队列及其方法 java.util.concurrent包下提供主要的几种阻塞队列,主要有以下几个: 1

  • Java并发编程之Exchanger方法详解

    简介 Exchanger是一个用于线程间数据交换的工具类,它提供一个公共点,在这个公共点,两个线程可以交换彼此的数据. 当一个线程调用exchange方法后将进入等待状态,直到另外一个线程调用exchange方法,双方完成数据交换后继续执行. Exchanger的使用 方法介绍 exchange(V x):阻塞当前线程,直到另外一个线程调用exchange方法或者当前线程被中断. x : 需要交换的对象. exchange(V x, long timeout, TimeUnit unit):阻塞

  • java异步编程CompletableFuture使用示例详解

    目录 一.简单介绍 二.常见操作 1.使用默认线程池 2.使用自定义线程池 3.获取线程的执行结果 三.处理异步结算的结果 四.异常处理 五.组合 CompletableFuture 六.并行运行多个 CompletableFuture 七.案例 1.从多个平台获取书价格 2.从任意一个平台获取结果就返回 一.简单介绍 CompletableFuture 同时实现了 Future 和 CompletionStage 接口. public class CompletableFuture<T> i

  • Java并发编程之LockSupport类详解

    一.LockSupport类的属性 private static final sun.misc.Unsafe UNSAFE; // 表示内存偏移地址 private static final long parkBlockerOffset; // 表示内存偏移地址 private static final long SEED; // 表示内存偏移地址 private static final long PROBE; // 表示内存偏移地址 private static final long SEC

  • Java并发编程之Executors类详解

    一.Executors的理解 Executors类属于java.util.concurrent包: 线程池的创建分为两种方式:ThreadPoolExecutor 和 Executors: Executors(静态Executor工厂)用于创建线程池: 工厂和工具方法Executor , ExecutorService , ScheduledExecutorService , ThreadFactory和Callable在此包中定义的类: jdk1.8API中的解释如下: 二.Executors

  • Java并发编程之Volatile变量详解分析

    目录 一.volatile变量的特性 1.1.保证可见性,不保证原子性 1.2.禁止指令重排 二.内存屏障 三.happens-before Volatile关键字是Java提供的一种轻量级的同步机制.Java 语言包含两种内在的同步机制:同步块(或方法)和 volatile 变量, 相比synchronized(synchronized通常称为重量级锁),volatile更轻量级,因为它不会引起线程上下文的切换和调度. 但是volatile 变量的同步性较差(有时它更简单并且开销更低),而且其

  • Java GUI编程菜单组件实例详解

    前面讲解了如果构建GUI界面,其实就是把一些GUI的组件,按照一定的布局放入到容器中展示就可以了.在实际开发中,除了主界面,还有一类比较重要的内容就是菜单相关组件,可以通过菜单相关组件很方便的使用特定的功能,在AWT中,菜单相关组件的使用和之前学习的组件是一模一样的,只需要把菜单条.菜单.菜单项组合到一起,按照一定的布局,放入到容器中即可. 下表中给出常见的菜单相关组件: 菜单组件名称 功能 MenuBar 菜单条 , 菜单的容器 . Menu 菜单组件 , 菜单项的容器 . 它也是Menult

随机推荐