详解Java 信号量Semaphore

  Semaphore也是一个同步器,和前面两篇说的CountDownLatch和CyclicBarrier不同,这是递增的,初始化的时候可以指定一个值,但是不需要知道需要同步的线程个数,只需要在同步的地方调用acquire方法时指定需要同步的线程个数;

一.简单使用

  同步两个子线程,只有其中两个子线程执行完毕,主线程才会执行:

package com.example.demo.study;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class Study0217 {
  //创建一个信号量的实例,信号量初始值为0
  static Semaphore semaphore = new Semaphore(0);

  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(3);
    pool.submit(()->{
      System.out.println("Thread1---start");
      //信号量加一
      semaphore.release();
    });

    pool.submit(()->{
      System.out.println("Thread2---start");
      //信号量加一
      semaphore.release();
    });
    pool.submit(()->{
      System.out.println("Thread3---start");
      //信号量加一
      semaphore.release();
    });
    //等待两个子线程执行完毕就放过,必须要信号量等于2才放过
    semaphore.acquire(2);
    System.out.println("两个子线程执行完毕");

    //关闭线程池,正在执行的任务继续执行
    pool.shutdown();

  }

}

这个信号量也可以复用,类似CyclicBarrier:

package com.example.demo.study;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class Study0217 {
  //创建一个信号量的实例,信号量初始值为0
  static Semaphore semaphore = new Semaphore(0);

  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(3);
    pool.submit(()->{
      System.out.println("Thread1---start");
      //信号量加一
      semaphore.release();
    });

    pool.submit(()->{
      System.out.println("Thread2---start");
      //信号量加一
      semaphore.release();
    });

    //等待两个子线程执行完毕就放过,必须要信号量等于2才放过
    semaphore.acquire(2);
    System.out.println("子线程1,2执行完毕");

    pool.submit(()->{
      System.out.println("Thread3---start");
      //信号量加一
      semaphore.release();
    });
    pool.submit(()->{
      System.out.println("Thread4---start");
      //信号量加一
      semaphore.release();
    });

    semaphore.acquire(2);
    System.out.println("子线程3,4执行完毕");

    //关闭线程池,正在执行的任务继续执行
    pool.shutdown();

  }

}

二.信号量原理 

  看看下面这个图,可以知道信号量Semaphore还是根据AQS实现的,内部有个Sync工具类操作AQS,还分为公平策略和非公平策略;

构造器:

//默认是非公平策略
public Semaphore(int permits) {
  sync = new NonfairSync(permits);
}
//可以根据第二个参数选择是公平策略还是非公平策略
public Semaphore(int permits, boolean fair) {
  sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

acquire(int permits)方法:

public void acquire(int permits) throws InterruptedException {
  if (permits < 0) throw new IllegalArgumentException();
  sync.acquireSharedInterruptibly(permits);
}

//AQS中的方法
public final void acquireSharedInterruptibly(int arg)
    throws InterruptedException {
  if (Thread.interrupted()) throw new InterruptedException();
  //这里根据子类是公平策略还是非公平策略
  if (tryAcquireShared(arg) < 0)
    //获取失败会进入这里,将线程放入阻塞队列,然后再尝试,还是失败的话就调用park方法挂起当前线程
    doAcquireSharedInterruptibly(arg);
}
//非公平策略
protected int tryAcquireShared(int acquires) {
  return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
  //一个无限循环,获取state剩余的信号量,因为每调用一次release()方法的话,信号量就会加一,这里将
  //最新的信号量减去传进来的参数比较,比如有两个线程,其中一个线程已经调用了release方法,然后调用acquire(2)方法,那么
  //这里remaining的值就是-1,返回-1,然后当前线程就会被丢到阻塞队列中去了;如果另外一个线程也调用了release方法,
  //那么此时的remaining==0,所以在这里的if中会调用CAS将0设置到state
  //
  for (;;) {
    int available = getState();
    int remaining = available - acquires;
    if (remaining < 0 || compareAndSetState(available, remaining))
      return remaining;
  }
}
//公平策略
//和上面非公平差不多,只不过这里会查看阻塞队列中当前节点前面有没有前驱节点,有的话直接返回-1,
//就会把当前线程丢到阻塞队列中阻塞去了,没有前驱节点的话,就跟非公平模式一样的了
protected int tryAcquireShared(int acquires) {
  for (;;) {
    if (hasQueuedPredecessors())
      return -1;
    int available = getState();
    int remaining = available - acquires;
    if (remaining < 0 ||compareAndSetState(available, remaining))
      return remaining;
  }
}

再看看release(int permits)方法:

//这个方法的作用就是将信号量加一
public void release(int permits) {
  if (permits < 0) throw new IllegalArgumentException();
  sync.releaseShared(permits);
}
//AQS中方法
public final boolean releaseShared(int arg) {
  //tryReleaseShared尝试释放资源
  if (tryReleaseShared(arg)) {
    //释放资源成功就调用park方法唤醒唤醒AQS队列中最前面的节点中的线程
    doReleaseShared();
    return true;
  }
  return false;
}

protected final boolean tryReleaseShared(int releases) {
  //一个无限循环,获取state,然后加上传进去的参数,如果新的state的值小于旧的state,说明已经超过了state的最大值,溢出了
  //没有溢出的话,就用CAS更新state的值
  for (;;) {
    int current = getState();
    int next = current + releases;
    if (next < current) // overflow
      throw new Error("Maximum permit count exceeded");
    if (compareAndSetState(current, next))
      return true;
  }
}

private void doReleaseShared() {

  for (;;) {
    Node h = head;
    if (h != null && h != tail) {
      int ws = h.waitStatus;
      //ws==Node.SIGNAL表示节点中线程需要被唤醒
      if (ws == Node.SIGNAL) {
        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
          continue;      // loop to recheck cases
        //调用阻塞队列中线程的unpark方法唤醒线程
        unparkSuccessor(h);
      }
      //ws == 0表示节点中线程是初始状态
      else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
        continue;        // loop on failed CAS
    }

    if (h == head)          // loop if head changed
      break;
  }
}

  以最上面的例子简单说一下,其实不是很难,首先线程1和线程2分别去调用release方法,这个方法里面会将AQS中的state加一,但是在执行这个操作之前,主线程肯定会先到acquire(2),在这个方法里面,假如默认使用非公平策略,首先获取当前的信号量state(state的初始值是0),用当前信号量减去2,如果小于0,那么当前主线程就会丢到AQS队列中阻塞;

  这个时候线程1的release方法执行了,于是就把信号量state加一(此时state==1),CAS更新state为一,成功的话,就调用doReleaseShared()方法唤醒AQS阻塞队列中最先挂起的线程(这里就是因为调用acquire方法而阻塞的主线程),主线程唤醒之后又会去获取最新的信号量,与2比较,发现还是小于0,于是又会阻塞;

  线程2此时的release方法执行完成,重复线程一的操作,主线程唤醒之后(此时state==2),又去获取最新的信号量发现是2,减去acquire方法的参数2等于0,于是就用CAS更新state的值,然后acquire方法也就执行完毕,主线程继续执行后面的代码;

  其实信号量还是很有意思的,记得在项目里,有人利用信号量实现了一个故障隔离,什么时候我可以把整理之后的代码贴出来分享一下,还是很有意思的,就跟springcloud的熔断机制差不多,场景是:比如你在service的一个方法调用第三方的接口,你不知道调不调得通,而且你不希望每次前端过来都会去调用,比如当调用失败的次数超过100次,那么五分钟之后才会再去实际调用这个第三方服务!这五分钟内前调用这个服务,就会触发我们这个故障隔离的机制,向前端返回一个特定的错误码和错误信息!

以上就是详解Java 信号量Semaphore的详细内容,更多关于Java 信号量Semaphore的资料请关注我们其它相关文章!

(0)

相关推荐

  • Java 信号量Semaphore的实现

    近日于LeetCode看题遇1114 按序打印,获悉一解法使用了Semaphore,顺势研究,记心得于此. 此解视Semaphore为锁,以保证同一时刻单线程的顺序执行.在此原题上,我作出如下更改. package test; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; public class

  • Java并发编程Semaphore计数信号量详解

    Semaphore 是一个计数信号量,它的本质是一个共享锁.信号量维护了一个信号量许可集.线程可以通过调用acquire()来获取信号量的许可:当信号量中有可用的许可时,线程能获取该许可:否则线程必须等待,直到有可用的许可为止. 线程可以通过release()来释放它所持有的信号量许可(用完信号量之后必须释放,不然其他线程可能会无法获取信号量). 简单示例: package me.socketthread; import java.util.concurrent.ExecutorService;

  • JAVA 多线程之信号量(Semaphore)实例详解

    java Semaphore 简介 信号量(Semaphore),有时被称为信号灯,是在多线程环境下使用的一种设施, 它负责协调各个线程, 以保证它们能够正确.合理的使用公共资源. 一个计数信号量.从概念上讲,信号量维护了一个许可集.如有必要,在许可可用前会阻塞每一个 acquire(),然后再获取该许可.每个 release() 添加一个许可,从而可能释放一个正在阻塞的获取者.但是,不使用实际的许可对象,Semaphore 只对可用许可的号码进行计数,并采取相应的行动.拿到信号量的线程可以进入

  • Java中Semaphore(信号量)的使用方法

    Semaphore的作用: 在java中,使用了synchronized关键字和Lock锁实现了资源的并发访问控制,在同一时间只允许唯一了线程进入临界区访问资源(读锁除外),这样子控制的主要目的是为了解决多个线程并发同一资源造成的数据不一致的问题.在另外一种场景下,一个资源有多个副本可供同时使用,比如打印机房有多个打印机.厕所有多个坑可供同时使用,这种情况下,Java提供了另外的并发访问控制--资源的多副本的并发访问控制,今天学习的信号量Semaphore即是其中的一种. Semaphore实现

  • Java并发编程之Semaphore(信号量)详解及实例

    Java并发编程之Semaphore(信号量)详解及实例 概述 通常情况下,可能有多个线程同时访问数目很少的资源,如客户端建立了若干个线程同时访问同一数据库,这势必会造成服务端资源被耗尽的地步,那么怎样能够有效的来控制不可预知的接入量呢?及在同一时刻只能获得指定数目的数据库连接,在JDK1.5 java.util.concurrent 包中引入了Semaphore(信号量),信号量是在简单上锁的基础上实现的,相当于能令线程安全执行,并初始化为可用资源个数的计数器,通常用于限制可以访问某些资源(物

  • 详解Java 信号量Semaphore

    Semaphore也是一个同步器,和前面两篇说的CountDownLatch和CyclicBarrier不同,这是递增的,初始化的时候可以指定一个值,但是不需要知道需要同步的线程个数,只需要在同步的地方调用acquire方法时指定需要同步的线程个数: 一.简单使用 同步两个子线程,只有其中两个子线程执行完毕,主线程才会执行: package com.example.demo.study; import java.util.concurrent.ExecutorService; import ja

  • 详解Java信号量Semaphore的原理及使用

    目录 1.Semaphore的概述 2.Semaphore的原理 2.1 基本结构 2.2 可中断获取信号量 2.3 不可中断获取信号量 2.4 超时可中断获取信号量 2.5 尝试获取信号量 2.6 释放信号量 3.Semaphore的使用 4.Semaphore的总结 1.Semaphore的概述 public class Semaphore extends Object implements Serializable Semaphore来自于JDK1.5的JUC包,直译过来就是信号量,被作为

  • 详解Java Slipped Conditions

    所谓Slipped conditions,就是说, 从一个线程检查某一特定条件到该线程操作此条件期间,这个条件已经被其它线程改变,导致第一个线程在该条件上执行了错误的操作.这里有一个简单的例子: public class Lock { private boolean isLocked = true; public void lock(){ synchronized(this){ while(isLocked){ try{ this.wait(); } catch(InterruptedExcep

  • 详解Java并发包基石AQS

    一.概述 AQS是一个用来构建锁和同步器的框架,使用AQS能简单且高效地构造出应用广泛的大量的同步器,比如我们提到的ReentrantLock,Semaphore,其他的诸如ReentrantReadWriteLock,SynchronousQueue,FutureTask等等皆是基于AQS的.当然,我们自己也能利用AQS非常轻松容易地构造出符合我们自己需求的同步器. 本章我们就一起探究下这个神奇的东东,并对其实现原理进行剖析理解 二.基本实现原理 AQS使用一个int成员变量来表示同步状态,通

  • 一文详解Java线程中的安全策略

    目录 一.不可变对象 二.线程封闭 三.线程不安全类与写法 四.线程安全-同步容器 1. ArrayList -> Vector, Stack 2. HashMap -> HashTable(Key, Value都不能为null) 3. Collections.synchronizedXXX(List.Set.Map) 五.线程安全-并发容器J.U.C 1. ArrayList -> CopyOnWriteArrayList 2.HashSet.TreeSet -> CopyOnW

  • 详解Java ReentrantReadWriteLock读写锁的原理与实现

    目录 概述 原理概述 加锁原理 图解过程 源码解析 解锁原理 图解过程 源码解析 概述 ReentrantReadWriteLock读写锁是使用AQS的集大成者,用了独占模式和共享模式.本文和大家一起理解下ReentrantReadWriteLock读写锁的实现原理.在这之前建议大家阅读下下面3篇关联文章: 深入浅出理解Java并发AQS的独占锁模式 深入浅出理解Java并发AQS的共享锁模式 通俗易懂读写锁ReentrantReadWriteLock的使用 原理概述 上图是ReentrantR

  • 详解Java中@Override的作用

    详解Java中@Override的作用 @Override是伪代码,表示重写(当然不写也可以),不过写上有如下好处: 1.可以当注释用,方便阅读: 2.编译器可以给你验证@Override下面的方法名是否是你父类中所有的,如果没有则报错.例如,你如果没写@Override,而你下面的方法名又写错了,这时你的编译器是可以编译通过的,因为编译器以为这个方法是你的子类中自己增加的方法. 举例:在重写父类的onCreate时,在方法前面加上@Override 系统可以帮你检查方法的正确性. @Overr

  • 详解Java中多线程异常捕获Runnable的实现

    详解Java中多线程异常捕获Runnable的实现 1.背景: Java 多线程异常不向主线程抛,自己处理,外部捕获不了异常.所以要实现主线程对子线程异常的捕获. 2.工具: 实现Runnable接口的LayerInitTask类,ThreadException类,线程安全的Vector 3.思路: 向LayerInitTask中传入Vector,记录异常情况,外部遍历,判断,抛出异常. 4.代码: package step5.exception; import java.util.Vector

  • 详解Java编写并运行spark应用程序的方法

    我们首先提出这样一个简单的需求: 现在要分析某网站的访问日志信息,统计来自不同IP的用户访问的次数,从而通过Geo信息来获得来访用户所在国家地区分布状况.这里我拿我网站的日志记录行示例,如下所示: 121.205.198.92 - - [21/Feb/2014:00:00:07 +0800] "GET /archives/417.html HTTP/1.1" 200 11465 "http://shiyanjun.cn/archives/417.html/" &qu

  • 详解java 中Spring jsonp 跨域请求的实例

    详解java 中Spring jsonp 跨域请求的实例 jsonp介绍 JSONP(JSON with Padding)是JSON的一种"使用模式",可用于解决主流浏览器的跨域数据访问的问题.由于同源策略,一般来说位于 server1.example.com 的网页无法与不是 server1.example.com的服务器沟通,而 HTML 的<script> 元素是一个例外.利用 <script> 元素的这个开放策略,网页可以得到从其他来源动态产生的 JSO

随机推荐