Java Semaphore信号量使用分析讲解

目录
  • 前言
  • 介绍和使用
    • API介绍
    • 基本使用
  • 原理介绍
    • 获取许可acquire()
    • 释放许可release()
  • 总结

前言

大家应该都用过synchronized 关键字加锁,用来保证某个时刻只允许一个线程运行。那么如果控制某个时刻允许指定数量的线程执行,有什么好的办法呢? 答案就是JUC提供的信号量Semaphore

介绍和使用

Semaphore(信号量)可以用来限制能同时访问共享资源的线程上限,它内部维护了一个许可的变量,也就是线程许可的数量

Semaphore的许可数量如果小于0个,就会阻塞获取,直到有线程释放许可

Semaphore是一个非重入锁

API介绍

构造方法

  • public Semaphore(int permits)permits 表示许可线程的数量
  • public Semaphore(int permits, boolean fair)fair 表示公平性,如果设为 true,表示是公平,那么等待最久的线程先执行

常用API

  • public void acquire():表示一个线程获取1个许可,那么线程许可数量相应减少一个
  • public void release():表示释放1个许可,那么线程许可数量相应会增加

其他API

  • void acquire(int permits):表示一个线程获取n个许可,这个数量由参数permits决定
  • void release(int permits):表示一个线程释放n个许可,这个数量由参数permits决定
  • int availablePermits():返回当前信号量线程许可数量
  • int getQueueLength(): 返回等待获取许可的线程数的预估值

基本使用

public static void main(String[] args) {
        // 1. 创建 semaphore 对象
        Semaphore semaphore = new Semaphore(2);
        // 2. 10个线程同时运行
        for (int i = 0; i < 8; i++) {
            new Thread(() -> {
                // 3. 获取许可
                try {
                    semaphore.acquire();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                try {
                    log.debug("running...");
                    sleep(1);
                    log.debug("end...");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    // 4. 释放许可
                    semaphore.release();
                }
            }).start();
        }
    }

运行结果:

原理介绍

上面是Semaphore的类结构图,其中FairSyncNonfairSync是它的内部类,他们共同继承了AQS类,AQS的共享模式提供了Semaphore的加锁、解锁。

为了更好的搞懂原理,我们通过一个例子来帮助我们理解。

假设Semaphorepermits为 3,这时 5 个线程来获取资源,其中Thread-1Thread-2Thread-4CAS 竞争成功,permits 变为 0,而 Thread-0 Thread-3 竞争失败。

获取许可acquire()

  • acquire()主方法会调用 sync.acquireSharedInterruptibly(1)方法
  • acquireSharedInterruptibly()方法会先调用tryAcquireShared()方法返回许可的数量,如果小于0个,调用doAcquireSharedInterruptibly()方法进入阻塞
// acquire() -> sync.acquireSharedInterruptibly(1),可中断
public final void acquireSharedInterruptibly(int arg) {
    if (Thread.interrupted())
        throw new InterruptedException();
    // 尝试获取通行证,获取成功返回 >= 0的值
    if (tryAcquireShared(arg) < 0)
        // 获取许可证失败,进入阻塞
        doAcquireSharedInterruptibly(arg);
}
  • tryAcquireShared()方法在终会调用到Sync#nonfairTryAcquireShared()方法
  • nonfairTryAcquireShared()方法中会减去获取的许可数量,返回剩余的许可数量
// tryAcquireShared() -> nonfairTryAcquireShared()
// 非公平,公平锁会在循环内 hasQueuedPredecessors()方法判断阻塞队列是否有临头节点(第二个节点)
final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        // 获取 state ,state 这里【表示通行证】
        int available = getState();
        // 计算当前线程获取通行证完成之后,通行证还剩余数量
        int remaining = available - acquires;
        // 如果许可已经用完, 返回负数, 表示获取失败,
        if (remaining < 0 ||
            // 许可证足够分配的,如果 cas 重试成功, 返回正数, 表示获取成功
            compareAndSetState(available, remaining))
            return remaining;
    }
}
  • 如果剩余的许可数量<0, 会调用doAcquireSharedInterruptibly()方法将当前线程加入到阻塞队列中阻塞
  • 方法中调用parkAndCheckInterrupt()阻塞当前线程
private void doAcquireSharedInterruptibly(int arg) {
    // 将调用 Semaphore.aquire 方法的线程,包装成 node 加入到 AQS 的阻塞队列中
    final Node node = addWaiter(Node.SHARED);
    // 获取标记
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            // 前驱节点是头节点可以再次获取许可
            if (p == head) {
                // 再次尝试获取许可,【返回剩余的许可证数量】
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    // 成功后本线程出队(AQS), 所在 Node设置为 head
                    // r 表示【可用资源数】, 为 0 则不会继续传播
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            // 不成功, 设置上一个节点 waitStatus = Node.SIGNAL, 下轮进入 park 阻塞
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        // 被打断后进入该逻辑
        if (failed)
            cancelAcquire(node);
    }
}

最终的AQS状态如下图所示:

  • Thread-1Thread-2Thread-4正常运行
  • AQS的state也就是等于0
  • Thread-0Thread-3再阻塞队列中

释放许可release()

现在Thread-4运行完毕,要释放许可,Thread-0Thread-3又是如何恢复执行的呢?

  • 调用release()方法释放许可,最终调用 Sync#releaseShared()方法
  • 如果方法tryReleaseShared(arg)尝试释放许可成功,那么调用doReleaseShared();进行唤醒
// release() -> releaseShared()
public final boolean releaseShared(int arg) {
    // 尝试释放锁
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
  • tryReleaseShared()方法主要是尝试释放许可
  • 获取当前许可数量 + 释放的数量,然后通过cas设置回去
protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        // 获取当前锁资源的可用许可证数量
        int current = getState();
        int next = current + releases;
        // 索引越界判断
        if (next < current)
            throw new Error("Maximum permit count exceeded");
        // 释放锁
        if (compareAndSetState(current, next))
            return true;
    }
}
  • 调用doReleaseShared()方法唤醒队列中的线程
  • 其中unparkSuccessor()方法是唤醒的核心操作
// 唤醒
private void doReleaseShared() {
    // 如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一个节点 unpark
    // 如果 head.waitStatus == 0 ==> Node.PROPAGATE
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                // 防止 unparkSuccessor 被多次执行
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;
                // 唤醒后继节点
                unparkSuccessor(h);
            }
            // 如果已经是 0 了,改为 -3,用来解决传播性
            else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;
        }
        if (h == head)
            break;
    }
}

最终AQS状态如下图所示:

  • 许可state变回1
  • 然后Thread-0开始竞争,如果竞争成功,如下图所示:

  • 由于Thread-0竞争成功,再次获取到许可,许可数量减1,最终又变回0
  • 然后等待队列中剩余Thread-3

总结

Semaphore信号量类基于AQS的共享锁实现,有公平锁和非公平锁两个版本,它用来限制能同时访问共享资源的线程上限,典型的应用场景是可以用来保护有限的公共资源,比如数据库连接等。

到此这篇关于Java Semaphore信号量使用分析讲解的文章就介绍到这了,更多相关Java Semaphore信号量内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • JAVA 多线程之信号量(Semaphore)实例详解

    java Semaphore 简介 信号量(Semaphore),有时被称为信号灯,是在多线程环境下使用的一种设施, 它负责协调各个线程, 以保证它们能够正确.合理的使用公共资源. 一个计数信号量.从概念上讲,信号量维护了一个许可集.如有必要,在许可可用前会阻塞每一个 acquire(),然后再获取该许可.每个 release() 添加一个许可,从而可能释放一个正在阻塞的获取者.但是,不使用实际的许可对象,Semaphore 只对可用许可的号码进行计数,并采取相应的行动.拿到信号量的线程可以进入

  • Java信号量Semaphore原理及代码实例

    Semaphore 通常用于限制可以访问某些资源(物理或逻辑的)的线程数目.自从5.0开始,jdk在java.util.concurrent包里提供了Semaphore 的官方实现,因此大家不需要自己去实现Semaphore. 下面的类使用信号量控制对内容池的访问: import java.util.concurrent.Semaphore; class Pool { private static final int MAX_AVAILABLE = 100; private final Sema

  • 详解Java信号量Semaphore的原理及使用

    目录 1.Semaphore的概述 2.Semaphore的原理 2.1 基本结构 2.2 可中断获取信号量 2.3 不可中断获取信号量 2.4 超时可中断获取信号量 2.5 尝试获取信号量 2.6 释放信号量 3.Semaphore的使用 4.Semaphore的总结 1.Semaphore的概述 public class Semaphore extends Object implements Serializable Semaphore来自于JDK1.5的JUC包,直译过来就是信号量,被作为

  • Java中Semaphore(信号量)的使用方法

    Semaphore的作用: 在java中,使用了synchronized关键字和Lock锁实现了资源的并发访问控制,在同一时间只允许唯一了线程进入临界区访问资源(读锁除外),这样子控制的主要目的是为了解决多个线程并发同一资源造成的数据不一致的问题.在另外一种场景下,一个资源有多个副本可供同时使用,比如打印机房有多个打印机.厕所有多个坑可供同时使用,这种情况下,Java提供了另外的并发访问控制--资源的多副本的并发访问控制,今天学习的信号量Semaphore即是其中的一种. Semaphore实现

  • Java并发编程之Semaphore(信号量)详解及实例

    Java并发编程之Semaphore(信号量)详解及实例 概述 通常情况下,可能有多个线程同时访问数目很少的资源,如客户端建立了若干个线程同时访问同一数据库,这势必会造成服务端资源被耗尽的地步,那么怎样能够有效的来控制不可预知的接入量呢?及在同一时刻只能获得指定数目的数据库连接,在JDK1.5 java.util.concurrent 包中引入了Semaphore(信号量),信号量是在简单上锁的基础上实现的,相当于能令线程安全执行,并初始化为可用资源个数的计数器,通常用于限制可以访问某些资源(物

  • 分析Java并发编程之信号量Semaphore

    目录 一.认识Semaphore 1.1.Semaphore 的使用场景 1.2.Semaphore 使用 1.3.Semaphore 信号量的模型 二.Semaphore 深入理解 2.1.Semaphore 基本属性 2.2.Semaphore 的公平性和非公平性 2.3.其他 Semaphore 方法 一.认识Semaphore 1.1.Semaphore 的使用场景 Semaphore 的使用场景主要用于流量控制,比如数据库连接,同时使用的数据库连接会有数量限制,数据库连接不能超过一定的

  • Java并发编程Semaphore计数信号量详解

    Semaphore 是一个计数信号量,它的本质是一个共享锁.信号量维护了一个信号量许可集.线程可以通过调用acquire()来获取信号量的许可:当信号量中有可用的许可时,线程能获取该许可:否则线程必须等待,直到有可用的许可为止. 线程可以通过release()来释放它所持有的信号量许可(用完信号量之后必须释放,不然其他线程可能会无法获取信号量). 简单示例: package me.socketthread; import java.util.concurrent.ExecutorService;

  • 详解Java 信号量Semaphore

    Semaphore也是一个同步器,和前面两篇说的CountDownLatch和CyclicBarrier不同,这是递增的,初始化的时候可以指定一个值,但是不需要知道需要同步的线程个数,只需要在同步的地方调用acquire方法时指定需要同步的线程个数: 一.简单使用 同步两个子线程,只有其中两个子线程执行完毕,主线程才会执行: package com.example.demo.study; import java.util.concurrent.ExecutorService; import ja

  • Java 信号量Semaphore的实现

    近日于LeetCode看题遇1114 按序打印,获悉一解法使用了Semaphore,顺势研究,记心得于此. 此解视Semaphore为锁,以保证同一时刻单线程的顺序执行.在此原题上,我作出如下更改. package test; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; public class

  • Java Semaphore信号量使用分析讲解

    目录 前言 介绍和使用 API介绍 基本使用 原理介绍 获取许可acquire() 释放许可release() 总结 前言 大家应该都用过synchronized 关键字加锁,用来保证某个时刻只允许一个线程运行.那么如果控制某个时刻允许指定数量的线程执行,有什么好的办法呢? 答案就是JUC提供的信号量Semaphore. 介绍和使用 Semaphore(信号量)可以用来限制能同时访问共享资源的线程上限,它内部维护了一个许可的变量,也就是线程许可的数量 Semaphore的许可数量如果小于0个,就

  • Java 数组高频考点分析讲解

    目录 1.数组理论基础 2.常见考点 1.二分查找 2.移除元素 1.数组理论基础 数组是存放在连续内存空间上的相同类型数据的集合,可以通过下标索引的方式获取到下标下对应的数据. 举个栗子(字符数组)~ 可以看到: 1.数组的下标从0开始 2.数组在内存中的地址是连续的 所以在删除元素时,只能用覆盖的方式进行. 例如,要删除下标为2的元素~ 就需要将从2之后的元素依次移到前一个,覆盖掉要删除的元素. 所以删除元素并不是将该元素的空间释放了,而是将后面的元素移到前面,覆盖掉要删除的元素,然后将数组

  • Java @GlobalLock注解详细分析讲解

    目录 GlobalLock的作用 全局锁 为什么要使用GlobalLock 工作原理 GlobalLock的作用 对于某条数据进行更新操作,如果全局事务正在进行,当某个本地事务需要更新该数据时,需要使用@GlobalLock确保其不会对全局事务正在操作的数据进行修改.防止的本地事务对全局事务的数据脏写.如果和select for update组合使用,还可以起到防止脏读的效果. 全局锁 首先我们知道,seata的AT模式是二段提交的,而且AT模式能够做到事务ACID四种特性中的A原子性和D持久性

  • Java并发系列之Semaphore源码分析

    Semaphore(信号量)是JUC包中比较常用到的一个类,它是AQS共享模式的一个应用,可以允许多个线程同时对共享资源进行操作,并且可以有效的控制并发数,利用它可以很好的实现流量控制.Semaphore提供了一个许可证的概念,可以把这个许可证看作公共汽车车票,只有成功获取车票的人才能够上车,并且车票是有一定数量的,不可能毫无限制的发下去,这样就会导致公交车超载.所以当车票发完的时候(公交车以满载),其他人就只能等下一趟车了.如果中途有人下车,那么他的位置将会空闲出来,因此如果这时其他人想要上车

  • Java 栈与队列超详细分析讲解

    目录 一.栈(Stack) 1.什么是栈? 2.栈的常见方法 3.自己实现一个栈(底层用一个数组实现) 二.队列(Queue) 1.什么是队列? 2.队列的常见方法 3.队列的实现(单链表实现) 4.循环队列 一.栈(Stack) 1.什么是栈? 栈其实就是一种数据结构 - 先进后出(先入栈的数据后出来,最先入栈的数据会被压入栈底) 什么是java虚拟机栈? java虚拟机栈只是JVM当中的一块内存,该内存一般用来存放 例如:局部变量当调用函数时,我们会为函数开辟一块内存,叫做 栈帧,在 jav

  • Java详细分析讲解自动装箱自动拆箱与Integer缓存的使用

    目录 1. 前言 2. 包装类 3. 自动装箱与自动拆箱 4. Interger缓存 5. 回答题目 1. 前言 自动装箱和自动拆箱是什么?Integer缓存是什么?它们之间有什么关系? 先来看一道题目. Integer a = new Integer(1); Integer b = new Integer(1); System.out.println(a==b); Integer c = 1; Integer d = 1; System.out.println(c==d); Integer e

  • Java详细分析讲解泛型

    目录 1.泛型概念 2.泛型的使用 2.1泛型类语法 2.2泛型方法语法 2.3泛型接口语法 2.4泛型在main方法中的使用 3.擦除机制 4.泛型的上界 5.通配符 5.1通配符的上界 5.2通配符的下界 6.包装类 6.1装箱和拆箱 1.泛型概念 泛型就是将类型参数化 所谓类型参数化就是将类型定义成参数的形式,然后在使用此类型的时候的时候再传入具体的类型 到这我们可以看出来:泛型在定义的时候是不知道具体类型的,需要在使用的时候传入具体的类型,泛型可以用在类.接口和方法中,这样做的好处是一个

  • 分析讲解Java Random类里的种子问题

    可以说在现在的计算机语言里面,以及普通用户这里,都是没有办法获得真正的 随机数的.真正的随机数现在也只有在量子计算机当中才有获得. 所以我们现在所说的随机数,也可以称为伪随机数,伪随机数是通过一种算法并且结合当下一个有规律其不断改变的数(比如时间)获得的结果. 我们将这种通过算法集合时间作为初始获得随机数的基准的数据称为种子. Java当中创建随机数的方式是有两种的: 第以种就是直接创建Random对象 Random random = new Random(); 在底层就会调用这个方法来得到种子

  • Java超详细分析讲解哈希表

    目录 哈希表概念 哈希函数的构造 平均数取中法 折叠法 保留余数法 哈希冲突问题以及解决方法 开放地址法 再哈希函数法 公共溢出区法 链式地址法 哈希表的填充因子 代码实现 哈希函数 添加数据 删除数据 判断哈希表是否为空 遍历哈希表 获得哈希表已存键值对个数 哈希表概念 散列表,又称为哈希表(Hash table),采用散列技术将记录存储在一块连续的存储空间中. 在散列表中,我们通过某个函数f,使得存储位置 = f(关键字),这样我们可以不需要比较关键字就可获得需要的记录的存储位置. 散列技术

  • Java分析讲解序列化与字典功能的序列化

    目录 两种解决方案 字典注解定义 字典序列化与返序列化器的实现 字典序列化与反序列工具类 字典转换服务类 字典缓存服务 两种解决方案 前端查询字典数据然后前端转码 后端查询字典值,然后再转码返回给前段. 本文及时针对方案2 进行的改进 目标: 在需要返回给前段的字段上添加指定的注解例如:@DictDesc 则根据该字段定义的值结合注解配置生成 xxxDesc字段并自动赋值为注解属性值所对应的字典描述: 具体使用的技术涉及到jackson序列化与反序列化,其他JSON工具包也类型的效果; 字典注解

随机推荐