java JUC信号量Semaphore原理及使用介绍

目录
  • 前言
  • 介绍和使用
    • API介绍
    • 基本使用
  • 原理介绍
    • 获取许可acquire()
    • 释放许可release()
  • 总结

前言

大家应该都用过synchronized 关键字加锁,用来保证某个时刻只允许一个线程运行。

那么如果控制某个时刻允许指定数量的线程执行,有什么好的办法呢? 答案就是JUC提供的信号量Semaphore

介绍和使用

  • Semaphore(信号量)可以用来限制能同时访问共享资源的线程上限,它内部维护了一个许可的变量,也就是线程许可的数量
  • Semaphore的许可数量如果小于0个,就会阻塞获取,直到有线程释放许可
  • Semaphore是一个非重入锁

API介绍

  • 构造方法
  • public Semaphore(int permits)permits 表示许可线程的数量
  • public Semaphore(int permits, boolean fair)fair 表示公平性,如果设为 true,表示是公平,那么等待最久的线程先执行
  • 常用API
  • public void acquire():表示一个线程获取1个许可,那么线程许可数量相应减少一个
  • public void release():表示释放1个许可,那么线程许可数量相应会增加
  • 其他API
  • void acquire(int permits):表示一个线程获取n个许可,这个数量由参数permits决定
  • void release(int permits):表示一个线程释放n个许可,这个数量由参数permits决定
  • int availablePermits():返回当前信号量线程许可数量
  • int getQueueLength(): 返回等待获取许可的线程数的预估值

基本使用

public static void main(String[] args) {
        // 1. 创建 semaphore 对象
        Semaphore semaphore = new Semaphore(2);
        // 2. 10个线程同时运行
        for (int i = 0; i < 8; i++) {
            new Thread(() -> {
                // 3. 获取许可
                try {
                    semaphore.acquire();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                try {
                    log.debug("running...");
                    sleep(1);
                    log.debug("end...");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    // 4. 释放许可
                    semaphore.release();
                }
            }).start();
        }
    }

运行结果:

原理介绍

上面是Semaphore的类结构图,其中FairSyncNonfairSync是它的内部类,他们共同继承了AQS类,AQS的共享模式提供了Semaphore的加锁、解锁。

如果对AQS不了解的请移步深入浅出理解Java并发AQS的共享锁模式

为了更好的搞懂原理,我们通过一个例子来帮助我们理解。

假设Semaphorepermits为 3,这时 5 个线程来获取资源,其中Thread-1Thread-2Thread-4CAS 竞争成功,permits 变为 0,而 Thread-0 Thread-3 竞争失败。

获取许可acquire()

  • acquire()主方法会调用 sync.acquireSharedInterruptibly(1)方法
  • acquireSharedInterruptibly()方法会先调用tryAcquireShared()方法返回许可的数量,如果小于0个,调用doAcquireSharedInterruptibly()方法进入阻塞
// acquire() -> sync.acquireSharedInterruptibly(1),可中断
public final void acquireSharedInterruptibly(int arg) {
    if (Thread.interrupted())
        throw new InterruptedException();
    // 尝试获取通行证,获取成功返回 >= 0的值
    if (tryAcquireShared(arg) < 0)
        // 获取许可证失败,进入阻塞
        doAcquireSharedInterruptibly(arg);
}
  • tryAcquireShared()方法在终会调用到Sync#nonfairTryAcquireShared()方法
  • nonfairTryAcquireShared()方法中会减去获取的许可数量,返回剩余的许可数量
// tryAcquireShared() -> nonfairTryAcquireShared()
// 非公平,公平锁会在循环内 hasQueuedPredecessors()方法判断阻塞队列是否有临头节点(第二个节点)
final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        // 获取 state ,state 这里【表示通行证】
        int available = getState();
        // 计算当前线程获取通行证完成之后,通行证还剩余数量
        int remaining = available - acquires;
        // 如果许可已经用完, 返回负数, 表示获取失败,
        if (remaining < 0 ||
            // 许可证足够分配的,如果 cas 重试成功, 返回正数, 表示获取成功
            compareAndSetState(available, remaining))
            return remaining;
    }
}
  • 如果剩余的许可数量<0, 会调用doAcquireSharedInterruptibly()方法将当前线程加入到阻塞队列中阻塞
  • 方法中调用parkAndCheckInterrupt()阻塞当前线程
private void doAcquireSharedInterruptibly(int arg) {
    // 将调用 Semaphore.aquire 方法的线程,包装成 node 加入到 AQS 的阻塞队列中
    final Node node = addWaiter(Node.SHARED);
    // 获取标记
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            // 前驱节点是头节点可以再次获取许可
            if (p == head) {
                // 再次尝试获取许可,【返回剩余的许可证数量】
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    // 成功后本线程出队(AQS), 所在 Node设置为 head
                    // r 表示【可用资源数】, 为 0 则不会继续传播
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            // 不成功, 设置上一个节点 waitStatus = Node.SIGNAL, 下轮进入 park 阻塞
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        // 被打断后进入该逻辑
        if (failed)
            cancelAcquire(node);
    }
}

最终的AQS状态如下图所示:

  • Thread-1Thread-2Thread-4正常运行
  • AQS的state也就是等于0
  • Thread-0Thread-3再阻塞队列中

释放许可release()

现在Thread-4运行完毕,要释放许可,Thread-0Thread-3又是如何恢复执行的呢?

  • 调用release()方法释放许可,最终调用 Sync#releaseShared()方法
  • 如果方法tryReleaseShared(arg)尝试释放许可成功,那么调用doReleaseShared();进行唤醒
// release() -> releaseShared()
public final boolean releaseShared(int arg) {
    // 尝试释放锁
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
  • tryReleaseShared()方法主要是尝试释放许可
  • 获取当前许可数量 + 释放的数量,然后通过cas设置回去
protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        // 获取当前锁资源的可用许可证数量
        int current = getState();
        int next = current + releases;
        // 索引越界判断
        if (next < current)
            throw new Error("Maximum permit count exceeded");
        // 释放锁
        if (compareAndSetState(current, next))
            return true;
    }
}
  • 调用doReleaseShared()方法唤醒队列中的线程
  • 其中unparkSuccessor()方法是唤醒的核心操作
// 唤醒
private void doReleaseShared() {
    // 如果 head.waitStatus == Node.SIGNAL ==> 0 成功, 下一个节点 unpark
    // 如果 head.waitStatus == 0 ==> Node.PROPAGATE
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                // 防止 unparkSuccessor 被多次执行
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;
                // 唤醒后继节点
                unparkSuccessor(h);
            }
            // 如果已经是 0 了,改为 -3,用来解决传播性
            else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;
        }
        if (h == head)
            break;
    }
}

最终AQS状态如下图所示:

  • 许可state变回1
  • 然后Thread-0开始竞争,如果竞争成功,如下图所示:

  • 由于Thread-0竞争成功,再次获取到许可,许可数量减1,最终又变回0
  • 然后等待队列中剩余Thread-3

总结

Semaphore信号量类基于AQS的共享锁实现,有公平锁和非公平锁两个版本,它用来限制能同时访问共享资源的线程上限,典型的应用场景是可以用来保护有限的公共资源,比如数据库连接等。

以上就是java JUC信号量Semaphore原理及使用介绍的详细内容,更多关于java JUC信号量Semaphore的资料请关注我们其它相关文章!

(0)

相关推荐

  • java并发编程包JUC线程同步CyclicBarrier语法示例

    目录 1.创建CyclicBarrier障碍 2.在CyclicBarrier障碍处等待 3.CyclicBarrierAction 4.CyclicBarrier例子 在之前的文章中已经为大家介绍了java并发编程的工具:BlockingQueue接口.ArrayBlockingQueue.DelayQueue.LinkedBlockingQueue.PriorityBlockingQueue.SynchronousQueue.BlockingDeque接口.ConcurrentHashMap

  • java并发包JUC同步器框架AQS框架原文翻译

    目录 摘要 1.背景介绍 2需求 2.1功能 2.2性能目标 3设计与实现 3.1同步状态 3.2阻塞 3.3队列 3.4条件队列 4用法 4.1公平调度的控制 4.2同步器 5性能 5.1开销 5.2吞吐量 6总结 7致谢 参考文献 摘要 在J2SE 1.5的java.util.concurrent包(下称j.u.c包)中,大部分的同步器(例如锁,屏障等等)都是基于AbstractQueuedSynchronizer类(下称AQS类),这个简单的框架而构建的.这个框架为同步状态的原子性管理.线

  • java并发JUC工具包AtomicInteger原子整型语法基础

    目录 1.AtomicInteger基础用法 2.什么时候需要使用AtomicInteger 2.1.原子计数器场景 2.2.数值比对及交换操作 3.总结 AtomicInteger 类底层存储一个int值,并提供方法对该int值进行原子操作.AtomicInteger 作为java.util.concurrent.atomic包的一部分,从Java 1.5开始引入. 1. AtomicInteger基础用法 通过下文的AtomicInteger构造方法,可以创建一个AtomicInteger对

  • java并发包JUC诞生及详细内容

    目录 前言 关于JCP和JSR DougLea和他的JSR-166 Lock接口的原型 CountDownLatch的原型 AbstractQueuedSynchronizer抽象类的原型 JSR-166的详细内容 1.请描述拟议的规范: 2.什么是目标Java平台? 3.拟议规范将解决Java社区的哪些需求? 4.为什么现有规范不满足这种需求? 5.请简要介绍基础技术或技术: 6.API规范是否有建议的包名? 7.建议的规范是否与您知道的特定操作系统,CPU或I/O设备有任何依赖关系? 8.当

  • java并发数据包Exchanger线程间的数据交换器

    java.util.concurrent.Exchanger可以用来进行数据交换,或者被称为“数据交换器”.两个线程可以使用Exchanger交换数据,下图用来说明Exchanger的作用 在下面的代码中 首先我们定义了一个Exchanger,用于数据交换 然后定义了两个线程对象bookExchanger1和bookExchanger2,两个线程都持有Exchanger交换器对象用于数据交换 两个线程中的每个线程都有自己的数据,比如下面代码中的String[] 书籍数组. public stat

  • Java开发JUC交换器Exchanger使用详解

    目录 前言 Exchanger介绍 API介绍 Exchanger使用 实现机制 总结 前言 JDK中提供了不少的同步工具,现在分享一个相对比较冷门的同步工具——交换器(Exchanger).你知道Exchanger的作用是什么吗?实现机制是什么?可以用来做什么呢? Exchanger介绍 交换器(Exchanger),顾名思义,用于两个线程之间进行数据交换的. 简单来说,就是一个线程在完成一定的事务后想与另一个线程交换数据,则第一个先拿出数据的线程会一直等待第二个线程,直到第二个线程拿着数据到

  • Java信号量Semaphore原理及代码实例

    Semaphore 通常用于限制可以访问某些资源(物理或逻辑的)的线程数目.自从5.0开始,jdk在java.util.concurrent包里提供了Semaphore 的官方实现,因此大家不需要自己去实现Semaphore. 下面的类使用信号量控制对内容池的访问: import java.util.concurrent.Semaphore; class Pool { private static final int MAX_AVAILABLE = 100; private final Sema

  • 详解Java信号量Semaphore的原理及使用

    目录 1.Semaphore的概述 2.Semaphore的原理 2.1 基本结构 2.2 可中断获取信号量 2.3 不可中断获取信号量 2.4 超时可中断获取信号量 2.5 尝试获取信号量 2.6 释放信号量 3.Semaphore的使用 4.Semaphore的总结 1.Semaphore的概述 public class Semaphore extends Object implements Serializable Semaphore来自于JDK1.5的JUC包,直译过来就是信号量,被作为

  • java并发编程专题(六)----浅析(JUC)Semaphore

    半路开始看的朋友可以回顾一下前几篇 java并发编程专题(一)----线程基础知识 java并发编程专题(二)----如何创建并运行java线程 java并发编程专题(三)----详解线程的同步 java并发编程专题(四)----浅谈(JUC)Lock锁 java并发编程专题(五)----详解(JUC)ReentrantLock Semaphore,从字面意义上我们知道他是信号量的意思.在java中,一个计数信号量维护了一个许可集.Semaphore 只对可用许可的号码进行计数,并采取相应的行动

  • 详解Java 信号量Semaphore

    Semaphore也是一个同步器,和前面两篇说的CountDownLatch和CyclicBarrier不同,这是递增的,初始化的时候可以指定一个值,但是不需要知道需要同步的线程个数,只需要在同步的地方调用acquire方法时指定需要同步的线程个数: 一.简单使用 同步两个子线程,只有其中两个子线程执行完毕,主线程才会执行: package com.example.demo.study; import java.util.concurrent.ExecutorService; import ja

  • Java并发编程Semaphore计数信号量详解

    Semaphore 是一个计数信号量,它的本质是一个共享锁.信号量维护了一个信号量许可集.线程可以通过调用acquire()来获取信号量的许可:当信号量中有可用的许可时,线程能获取该许可:否则线程必须等待,直到有可用的许可为止. 线程可以通过release()来释放它所持有的信号量许可(用完信号量之后必须释放,不然其他线程可能会无法获取信号量). 简单示例: package me.socketthread; import java.util.concurrent.ExecutorService;

  • JAVA 多线程之信号量(Semaphore)实例详解

    java Semaphore 简介 信号量(Semaphore),有时被称为信号灯,是在多线程环境下使用的一种设施, 它负责协调各个线程, 以保证它们能够正确.合理的使用公共资源. 一个计数信号量.从概念上讲,信号量维护了一个许可集.如有必要,在许可可用前会阻塞每一个 acquire(),然后再获取该许可.每个 release() 添加一个许可,从而可能释放一个正在阻塞的获取者.但是,不使用实际的许可对象,Semaphore 只对可用许可的号码进行计数,并采取相应的行动.拿到信号量的线程可以进入

  • Java多线程Condition接口原理介绍

    Condition接口提供了类似Object的监视器方法,与Lock配合可以实现等待/通知模式,但是这两者在使用方式以及功能特性上还是有差别的 Condition接口详解 Condition定义了等待/通知两种类型的方法,当前线程调用这些方法时,需要提前获取到Condition对象关联的锁.Condition对象是由Lock对象(调用Lock对象的newCondition()方法)创建出来的,换句话说,Condition是依赖Lock对象的. Lock lock = new ReentrantL

  • 分析Java并发编程之信号量Semaphore

    目录 一.认识Semaphore 1.1.Semaphore 的使用场景 1.2.Semaphore 使用 1.3.Semaphore 信号量的模型 二.Semaphore 深入理解 2.1.Semaphore 基本属性 2.2.Semaphore 的公平性和非公平性 2.3.其他 Semaphore 方法 一.认识Semaphore 1.1.Semaphore 的使用场景 Semaphore 的使用场景主要用于流量控制,比如数据库连接,同时使用的数据库连接会有数量限制,数据库连接不能超过一定的

  • Java 信号量Semaphore的实现

    近日于LeetCode看题遇1114 按序打印,获悉一解法使用了Semaphore,顺势研究,记心得于此. 此解视Semaphore为锁,以保证同一时刻单线程的顺序执行.在此原题上,我作出如下更改. package test; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; public class

  • Java中synchronized实现原理详解

    记得刚刚开始学习Java的时候,一遇到多线程情况就是synchronized,相对于当时的我们来说synchronized是这么的神奇而又强大,那个时候我们赋予它一个名字"同步",也成为了我们解决多线程情况的百试不爽的良药.但是,随着我们学习的进行我们知道synchronized是一个重量级锁,相对于Lock,它会显得那么笨重,以至于我们认为它不是那么的高效而慢慢摒弃它. 诚然,随着Javs SE 1.6对synchronized进行的各种优化后,synchronized并不会显得那么

随机推荐