AQS同步组件Semaphore信号量案例剖析

目录
  • 基本概念
  • 作用和使用场景
  • 源码分析
    • 构造函数
    • 常用方法
  • 使用案例
    • acquire()获取单个许可
    • acquire(int permits)获取多个许可
    • tryAcquire()获取许可
    • tryAcquire(long timeout, TimeUnit unit)

基本概念

Semaphore也是一个线程同步的辅助类,可以维护当前访问自身的线程个数,并提供了同步机制。使用Semaphore可以控制并发访问资源的线程个数。

例如排队买票的情况,如果只有三个窗口,那么同一时间最多也只能有三个人买票。第四个人来了之后就必须在后面等着,只有其他人买好了,才可以去相应的窗口进行买票 。

作用和使用场景

  • 用于保证同一时间并发访问线程的数目。
  • 信号量在操作系统中是很重要的概念,Java并发库里的Semaphore就可以很轻松的完成类似操作系统信号量的控制。Semaphore可以很容易控制系统中某个资源被同时访问的线程个数。
  • 在数据结构中我们学过链表,链表正常是可以保存无限个节点的,而Semaphore可以实现有限大小的列表。

使用场景:仅能提供有限访问的资源。比如数据库连接。

源码分析

构造函数

/**
*接受一个整型的数字,表示可用的许可证数量。Semaphore(10)表*示允许10个线程获取许可证,
*也就是最大并发数是10。
*
* @param permits 可用许可证的初始数量。
**/
public Semaphore(int permits) {
        sync = new NonfairSync(permits);
}
/**
 * 使用给定的许可数量和给定的公平性设置
 *
 * @param permits 可用许可证的初始数量。
 *
 * @param fair 指定是公平模式还是非公平模式,默认非公平模式 . 公平模式:先启动的线程优先得到
 * 许可。 非公平模式:先启动的线程并不一定先获得许可,谁抢到谁就获得许可。
 */
public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

常用方法

acquire() 获取一个许可

acquire(int permits) 获取指定个数的许可

tryAcquire()方法尝试获取1个许可证

tryAcquire(long timeout, TimeUnit unit) 最大等待许可的时间

tryAcquire(int permits) 获取指定个数的许可

tryAcquire(int permits, long timeout, TimeUnit unit) 最大等待许可的时间

availablePermits() : 返回此信号量中当前可用的许可证数

release() 释放许可

release(int permits) 释放指定个数的许可

int getQueueLength() 返回正在等待获取许可证的线程数。

boolean hasQueuedThreads() 是否有线程正在等待获取许可证。

void reducePermits(int reduction) 减少reduction个许可证。是个protected方法。

Collection getQueuedThreads() 返回所有等待获取许可证的线程集合。是个protected方法。

使用案例

acquire()获取单个许可

/**
     * 线程数量
     */
    private final static int threadCount = 15;
    public static void main(String[] args) throws Exception {
        ExecutorService exec = Executors.newCachedThreadPool();
        final Semaphore semaphore = new Semaphore(3);
        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    //获取一个许可
                    semaphore.acquire();
                    test(threadNum);
                    //释放一个许可
                    semaphore.release();
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        exec.shutdown();
    }
    private static void test(int threadNum) throws Exception {
        // 模拟请求的耗时操作
        Thread.sleep(1000);
        log.info("{}", threadNum);
    }

输出结果:

根据输出结果的时间可以看出来同一时间最多只能3个线程执行,符合预期

acquire(int permits)获取多个许可

/**
     * 线程数量
     */
    private final static int threadCount = 15;
    public static void main(String[] args) throws Exception {
        ExecutorService exec = Executors.newCachedThreadPool();
        //信号量设置为3,也就是最大并发量为3,同时只允许3个线程获得许可
        final Semaphore semaphore = new Semaphore(3);
        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    //获取多个许可
                    semaphore.acquire(3);
                    test(threadNum);
                    //释放多个许可
                    semaphore.release(3);
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        exec.shutdown();
    }
    private static void test(int threadNum) throws Exception {
        // 模拟请求的耗时操作
        Thread.sleep(1000);
        log.info("{}", threadNum);
    }

输出结果:

设置了3个许可,每个线程每次获取3个许可,因此同一时间只能有1个线程执行 。

tryAcquire()获取许可

tryAcquire()尝试获取一个许可,如果未获取到,不等待,将直接丢弃该线程不执行

/**
     * 线程数量
     */
    private final static int threadCount = 15;
    public static void main(String[] args) throws Exception {
        ExecutorService exec = Executors.newCachedThreadPool();
        //信号量设置为3,也就是最大并发量为3,同时只允许3个线程获得许可
        final Semaphore semaphore = new Semaphore(3);
        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    //尝试获取一个许可,如果未获取到,不等待,将直接丢弃该线程不执行
                    if(semaphore.tryAcquire()) {
                        test(threadNum);
                        //释放许可
                        semaphore.release();
                    }
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        exec.shutdown();
    }
    private static void test(int threadNum) throws Exception {
        // 模拟请求的耗时操作
        Thread.sleep(1000);
        log.info("{}", threadNum);
    }

输出结果:

从输出可以看到,在3个线程获取到3个许可后,因为每个线程调用的方法要执行1秒中,最早的一个许可也要在1S后释放,剩下的17个线程未获取到许可,使用了semaphore.tryAcquire()方法,没有设置等待时间,所以便直接被丢弃,不执行了。

tryAcquire(long timeout, TimeUnit unit)

tryAcquire(long timeout, TimeUnit unit)未获取到许可,设置等待时长

/**
     * 线程数量
     */
    private final static int threadCount = 15;
    public static void main(String[] args) throws Exception {
        ExecutorService exec = Executors.newCachedThreadPool();
        //信号量设置为3,也就是最大并发量为3,同时只允许3个线程获得许可
        final Semaphore semaphore = new Semaphore(3);
        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    //设置了获取许可等待时间为2秒,如果两秒后还是未获得许可的线程便得不到执行
                    if(semaphore.tryAcquire(2000, TimeUnit.MILLISECONDS)) {
                        test(threadNum);
                        //释放许可
                        semaphore.release();
                    }
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        exec.shutdown();
    }
    private static void test(int threadNum) throws Exception {
        // 模拟请求的耗时操作
        Thread.sleep(1000);
        log.info("{}", threadNum);
    }

输出结果:

tryAcquire通过参数指定了2秒的等待时间。 上述代码中同一时间最多执行3个。第4个线程因前3个线程执行需要耗时一秒未释放许可,因此需要等待。

但是由于设置了2秒的等待时间,所以在5秒内等待到了释放的许可,继续执行,循环往复。

但是15个线程 ,每秒并发3个,2S是执行不完的。所以上面执行到第6个(0开始,显示是5)就结束了,【每次执行结果会有差异,取决于CPU】,并没有全部执行完15个线程。

以上就是AQS同步组件Semaphore信号量案例剖析的详细内容,更多关于AQS同步组件Semaphore的资料请关注我们其它相关文章!

(0)

相关推荐

  • Java信号量Semaphore原理及代码实例

    Semaphore 通常用于限制可以访问某些资源(物理或逻辑的)的线程数目.自从5.0开始,jdk在java.util.concurrent包里提供了Semaphore 的官方实现,因此大家不需要自己去实现Semaphore. 下面的类使用信号量控制对内容池的访问: import java.util.concurrent.Semaphore; class Pool { private static final int MAX_AVAILABLE = 100; private final Sema

  • Java中的AQS同步队列问题详解

    目录 AQS 同步队列 1.AQS 介绍 1.1.类图关系 1.2.节点剖析 2.AQS 实现原理 2.1.队列初始化 2.2.追加节点 3.AQS 唤醒动作 AQS 同步队列 1.AQS 介绍 AQS 是 AbstractQueuedSynchronizer 的缩写,他是一个抽象同步类,为 JUC 包下的大多数同步工具提供了核心实现,例如 ReentrantLock 的底层就是使用同步队列.AQS 提供一套基础的机制来实现线程的同步.阻塞与唤醒.等待队列等功能,也就是想要深入学习线程工具类,这

  • 一文读懂go中semaphore(信号量)源码

    运行时信号量机制 semaphore 前言 最近在看源码,发现好多地方用到了这个semaphore. 本文是在go version go1.13.15 darwin/amd64上进行的 作用是什么 下面是官方的描述 // Semaphore implementation exposed to Go. // Intended use is provide a sleep and wakeup // primitive that can be used in the contended case /

  • AQS同步组件CyclicBarrier循环屏障用例剖析

    目录 CyclicBarrier原理 源码分析 使用案例 await() await(long timeout, TimeUnit unit) CyclicBarrier(int parties, Runnable barrierAction) CyclicBarrier和CountDownLatch的区别 CyclicBarrier原理 CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier).它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,

  • 详解Java 信号量Semaphore

    Semaphore也是一个同步器,和前面两篇说的CountDownLatch和CyclicBarrier不同,这是递增的,初始化的时候可以指定一个值,但是不需要知道需要同步的线程个数,只需要在同步的地方调用acquire方法时指定需要同步的线程个数: 一.简单使用 同步两个子线程,只有其中两个子线程执行完毕,主线程才会执行: package com.example.demo.study; import java.util.concurrent.ExecutorService; import ja

  • AQS(AbstractQueuedSynchronizer)抽象队列同步器及工作原理解析

    目录 前言 AQS是什么? 用银行办理业务的案例模拟AQS如何进行线程管理和通知机制 结语 前言 AQS 绝对是JUC的重要基石,也是面试中经常被问到的,所以我们要搞清楚这个AQS到底是什么?骑工作原理是什么? AQS是什么? AQS,AbstractQueuedSynchronizer,即队列同步器.它是构建锁或者其他同步组件的基础框架(如ReentrantLock.ReentrantReadWriteLock.Semaphore等),JUC并发包的作者(Doug Lea)期望它能够成为实现大

  • 详解Java信号量Semaphore的原理及使用

    目录 1.Semaphore的概述 2.Semaphore的原理 2.1 基本结构 2.2 可中断获取信号量 2.3 不可中断获取信号量 2.4 超时可中断获取信号量 2.5 尝试获取信号量 2.6 释放信号量 3.Semaphore的使用 4.Semaphore的总结 1.Semaphore的概述 public class Semaphore extends Object implements Serializable Semaphore来自于JDK1.5的JUC包,直译过来就是信号量,被作为

  • AQS同步组件Semaphore信号量案例剖析

    目录 基本概念 作用和使用场景 源码分析 构造函数 常用方法 使用案例 acquire()获取单个许可 acquire(int permits)获取多个许可 tryAcquire()获取许可 tryAcquire(long timeout, TimeUnit unit) 基本概念 Semaphore也是一个线程同步的辅助类,可以维护当前访问自身的线程个数,并提供了同步机制.使用Semaphore可以控制并发访问资源的线程个数. 例如排队买票的情况,如果只有三个窗口,那么同一时间最多也只能有三个人

  • Java并发编程之JUC并发核心AQS同步队列原理剖析

    目录 一.AQS介绍 二.AQS中的队列 1.同步等待队列 2.条件等待队列 3.AQS队列节点Node 三.同步队列源码分析 1.同步队列分析 2.同步队列--独占模式源码分析 3.同步队列--共享模式源码分析 一.AQS介绍 队列同步器AbstractQueuedSynchronizer(简称AQS),AQS定义了一套多线程访问共享资源的同步器框架,是用来构建锁或者其他同步组件的基础框架,是一个依赖状态(state)的同步器.Java并发编程的核心在java.util.concurrent(

  • Java Semaphore信号量使用分析讲解

    目录 前言 介绍和使用 API介绍 基本使用 原理介绍 获取许可acquire() 释放许可release() 总结 前言 大家应该都用过synchronized 关键字加锁,用来保证某个时刻只允许一个线程运行.那么如果控制某个时刻允许指定数量的线程执行,有什么好的办法呢? 答案就是JUC提供的信号量Semaphore. 介绍和使用 Semaphore(信号量)可以用来限制能同时访问共享资源的线程上限,它内部维护了一个许可的变量,也就是线程许可的数量 Semaphore的许可数量如果小于0个,就

  • Vue组件通信方法案例总结

    目录 一.父组件向子组件传值(props) 二.子组件向父组件传值($emit) 三.兄弟组件传值(EventBus) 1. 初始化(new Vue()) 2. 发送事件($emit()) 3. 接收事件($on()) 4. 移除事件监听者 四.Vuex 一.父组件向子组件传值(props) 步骤: 在父组件中通过 v-bind 将数据传给子组件 在子组件中通过 props 接收父组件传递过来的数据 <div id="app"> <!-- 1.通过 v-bind 将数

  • Java中Semaphore(信号量)的使用方法

    Semaphore的作用: 在java中,使用了synchronized关键字和Lock锁实现了资源的并发访问控制,在同一时间只允许唯一了线程进入临界区访问资源(读锁除外),这样子控制的主要目的是为了解决多个线程并发同一资源造成的数据不一致的问题.在另外一种场景下,一个资源有多个副本可供同时使用,比如打印机房有多个打印机.厕所有多个坑可供同时使用,这种情况下,Java提供了另外的并发访问控制--资源的多副本的并发访问控制,今天学习的信号量Semaphore即是其中的一种. Semaphore实现

  • VUE 动态组件的应用案例分析

    本文实例讲述了VUE 动态组件的应用.分享给大家供大家参考,具体如下: 业务场景 我们在开发表单的过程中会遇到这样的问题,我们选择一个控件进行配置,控件有很多中类型,比如文本框,下来框等,这些配置都不同,因此需要不同的配置组件来实现. 较常规的方法是使用v-if 来实现,这样界面看上去比较复杂,而且需要进行修改主页面. 解决方案 可以使用动态组件来实现,为了体现动态组件的特性,我们简化实现方式,编写两个简单的组件来测试一下这个功能. 文本组件配置: <template> <div>

  • vue中拆分组件的实战案例

    目录 一.组件化诞生的历史 二.为什么业务组件越开发越难维护 人的问题 技术问题 2.1 项目现状 2.2 理想目标 三.举一个实际的例子 3.1 需求背景 3.2 开发之前: 前端设计文档 数据流向图 目录结构 逻辑控制 拆分的原则 3.3 受控组件和非受控组件 3.4 开发进行: 逻辑变量和UI变量 四.持续的优化 五.可能的问题 五.实践是学习前端的捷径 总结 组件化是一种思维的表现,这种技能映射到人的本质是,一个人是否有能力把一个复杂的问题拆解.简单化的能力. 一.组件化诞生的历史 我们

随机推荐