Java AQS信号量Semaphore的使用

目录
  • 一.什么是Semaphore
  • 二.Semaphore的使用
  • 三.Semaphore源码分析

一.什么是Semaphore

Semaphore,俗称信号量,它是操作系统中PV操作的原语在java的实现,它也是基于AbstractQueuedSynchronizer实现的。

Semaphore的功能非常强大,大小为1的信号量就类似于互斥锁,通过同时只能有一个线程获取信号量实现。大小为n(n>0)的信号量可以实现限流的功能,它可以实现只能有n个线程同时获取信号量。

PV操作是操作系统一种实现进程互斥与同步的有效方法。PV操作与信号量(S)的处理相关,P表示通过的意思,V表示释放的意思。用PV操作来管理共享资源时,首先要确保PV操作自身执行的正确性。
P操作的主要动作是:
①S减1;
②若S减1后仍大于或等于0,则进程继续执行;
③若S减1后小于0,则该进程被阻塞后放入等待该信号量的等待队列中,然后转进程调度。
V操作的主要动作是:
①S加1;
②若相加后结果大于0,则进程继续执行;
③若相加后结果小于或等于0,则从该信号的等待队列中释放一个等待进程,然后再返回原进程继续执行或转进程调度。

二.Semaphore的使用

构造器

  public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }
  public Semaphore(int permits, boolean fair) {
       sync = fair ? new FairSync(permits) : new NonfairSync(permits);
   }

permits 表示许可证的数量(资源数)

fair 表示公平性,如果这个设为 true 的话,下次执行的线程会是等待最久的线程

常用方法

public void acquire() throws InterruptedException
public boolean tryAcquire()
public void release()
public int availablePermits()
public final int getQueueLength()
public final boolean hasQueuedThreads()
protected void reducePermits(int reduction)

  • acquire() 表示阻塞并获取许可
  • tryAcquire() 方法在没有许可的情况下会立即返回 false,要获取许可的线程不会阻塞
  • release() 表示释放许可
  • int availablePermits():返回此信号量中当前可用的许可证数。
  • int getQueueLength():返回正在等待获取许可证的线程数。
  • boolean hasQueuedThreads():是否有线程正在等待获取许可证。
  • void reducePermit(int reduction):减少 reduction 个许可证
  • Collection getQueuedThreads():返回所有等待获取许可证的线程集合

应用场景

可以用于做流量控制,特别是公用资源有限的应用场景

限流

/**
     * 实现一个同时只能处理5个请求的限流器
     */
    private static Semaphore semaphore = new Semaphore(5);
    /**
     * 定义一个线程池
     */
    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 50, 60, TimeUnit.SECONDS, new LinkedBlockingDeque<>(200));
    public static void exec() {
        try {
            semaphore.acquire(1);
            // 模拟真实方法执行
            System.out.println("执行exec方法" );
            Thread.sleep(2000);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            semaphore.release(1);
        }
    }
    public static void main(String[] args) throws InterruptedException {
        {
            for (; ; ) {
                Thread.sleep(100);
                // 模拟请求以10个/s的速度
                executor.execute(() -> exec());
            }
        }
    }

三.Semaphore源码分析

主要关注 Semaphore的加锁解锁(共享锁)逻辑实现,线程竞争锁失败入队阻塞逻辑和获取锁的线程释放锁唤醒阻塞线程竞争锁的逻辑实现

	public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
            //尝试获取共享锁,大于等于0则直接获取锁成功,小于0时则共享锁阻塞
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

tryAcquireShared的实现

   final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                // 当减一之后的值小于0 或者
                // compareAndSetState成功,把state变为remaining,即将状态减一
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

入队阻塞

 private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        //入队,创建节点 使用共享模式
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
            //获取当前节点的前躯节点
                final Node p = node.predecessor();
                //如果节点为head节点
                if (p == head) {
                	//阻塞动作比较重,通常会再尝试获取资源,没有获取到返回负数
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                //判断是否可以阻塞
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

入队操作

private Node addWaiter(Node mode) {
		//构建节点,模式是共享模式
        Node node = new Node(Thread.currentThread(), mode);
        Node pred = tail;
        if (pred != null) {
        	//设置前一节点为tail
            node.prev = pred;
            //设置当前节点为尾节点
            if (compareAndSetTail(pred, node)) {
            // 前一节点的next为当前节点
                pred.next = node;
                return node;
            }
        }
        //创建队列
        enq(node);
        return node;
    }

创建队列,经典的for循环创建双向链表

 private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) {
            	//节点为空 则new一个节点 设置头节点
                if (compareAndSetHead(new Node()))
                //把这个节点的头节点赋值给尾节点
                    tail = head;
            } else {
            // 如果尾节点存在 就将该节点的前节点指向tail
                node.prev = t;
                //设置当前节点为tail
                if (compareAndSetTail(t, node)) {
                //前一个节点的next指向当前节点,入队操作就完成了
                    t.next = node;
                    return t;
                }
            }
        }
    }

设置waitStatus状态及获取waitStatus状态,waitStatus为-1时可以唤醒后续节点

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * 状态是-1 就可以唤醒后续节点
             *
             */
            return true;
        if (ws > 0) {
            /*
             * 前置任务已取消,删掉节点
             *
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * cas设置waitstatus状态,设置为-1
             *
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

阻塞 调用LockSupport.park

 private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

释放锁的逻辑

	public void release() {
        sync.releaseShared(1);
    }
	public final boolean releaseShared(int arg) {
		//cas成功则进行释放共享锁
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

state状态+1操作,cas成功,返回true

		protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }
private void doReleaseShared() {
        for (;;) {
            Node h = head;
            //头节点不为空并且不是尾节点
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                //waitstatus为-1
                if (ws == Node.SIGNAL) {
                //将SIGNAL置为0
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;
                    //唤醒
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;
            }
            if (h == head)
                break;
        }
    }

唤醒操作

 private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        //ws小于0就将其设置为0
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        Node s = node.next;
        //当前节点的下一个节点为空或者ws大于0
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        //s不为空 则进行唤醒
        if (s != null)
            LockSupport.unpark(s.thread);
    }

唤醒下一个线程之后,要把上一个节点移除队列

 private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                //下一个线程进来,如果前置节点是头节点,则将前置节点出队
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    //cas获取资源成功
                    if (r >= 0) {
                    	//出队操作
                        setHeadAndPropagate(node, r);
                        //将p.next移除
                        p.next = null;
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

出队操作

private void setHeadAndPropagate(Node node, int propagate) {
		//设置当前节点为head节点,前一节点的head属性被删除
        Node h = head;
        setHead(node);
        //如果是传播属性
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
           //并且是共享模式,可以持续唤醒下一个,
           //只要资源数充足 就可以一直往下唤醒,提高并发量
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }
 private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }

至此,线程的阻塞唤醒核心逻辑就这么多,共享锁与独占锁的区别是可以唤醒后续的线程,如果资源数充足的话,可以一直往下唤醒,提高了并发量。

到此这篇关于Java AQS信号量Semaphore的使用的文章就介绍到这了,更多相关Java信号量Semaphore内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Java AQS中ReentrantReadWriteLock读写锁的使用

    目录 一. 简介 二. 接口及实现类 三.使用 四. 应用场景 五. 锁降级 六.源码解析 七.总结 一. 简介 为什么会使用读写锁? 日常大多数见到的对共享资源有读和写的操作,写操作并没有读操作那么频繁(读多写少),在没有写操作的时候,多个线程同时读一个资源没有任何问题,所以应该允许多个线程同时读取共享资源(读读可以并发):但是如果一个线程想去写这些共享资源,就不应该允许其他线程对该资源进行读和写操作了(读写,写读,写写互斥).在读多于写的情况下,读写锁能够提供比排它锁更好的并发性和吞吐量.

  • Java AQS中闭锁CountDownLatch的使用

    目录 一. 简介 二. 使用 三. 应用场景 四. 底层原理 五. CountDownLatch与Thread.join的区别 一. 简介 CountDownLatch(闭锁)是一个同步协助类,允许一个或多个线程等待,直到其他线程完成操作集. CountDownLatch使用给定的计数值(count)初始化.await方法会阻塞直到当前的计数值(count)由于countDown方法的调用达到0,count为0之后所有等待的线程都会被释放,并且随后对await方法的调用都会立即返回.这是一个一次

  • Java AQS中ReentrantLock条件锁的使用

    目录 一.什么是AQS 1.定义 2.特性 3.属性 4.资源共享方式 5.两种队列 6.队列节点状态 7.实现方法 二.等待队列 1.同步等待队列 2.条件等待队列 三.condition接口 四.ReentrantLock 五.源码解析 一.什么是AQS 1.定义 java.util.concurrent包中的大多数同步器实现都是围绕着共同的基础行为,比如等待队列.条件队列.独占获取.共享获取等,而这些行为的抽象就是基于AbstractQueuedSynchronizer(简称AQS)实现的

  • AQS加锁机制Synchronized相似点详解

    目录 正文 1. Synchronized加锁流程 2. AQS加锁原理 3. 总结 正文 在并发多线程的情况下,为了保证数据安全性,一般我们会对数据进行加锁,通常使用Synchronized或者ReentrantLock同步锁.Synchronized是基于JVM实现,而ReentrantLock是基于Java代码层面实现的,底层是继承的AQS. AQS全称 AbstractQueuedSynchronizer ,即抽象队列同步器,是一种用来构建锁和同步器的框架. 我们常见的并发锁Reentr

  • AQS核心流程解析cancelAcquire方法

    目录 引出问题 更新正常节点的链表 当前取消节点是tail节点的情况 当前取消节点是非tail节点的情况 引出问题 首先,先考虑一个问题,什么条件会触发cancelAcquire()方法? cancelAcquire()方法的反向查找 可以清楚的看到在互斥锁和共享锁的拿锁过程中都是有调用此方法的,而cancelAcquire()方法是写在finally代码块中,并且使用failed标志位来控制cancelAcquire()方法的执行.可以得出,在触发异常的情况下会执行cancelAcquire(

  • 详解Java 信号量Semaphore

    Semaphore也是一个同步器,和前面两篇说的CountDownLatch和CyclicBarrier不同,这是递增的,初始化的时候可以指定一个值,但是不需要知道需要同步的线程个数,只需要在同步的地方调用acquire方法时指定需要同步的线程个数: 一.简单使用 同步两个子线程,只有其中两个子线程执行完毕,主线程才会执行: package com.example.demo.study; import java.util.concurrent.ExecutorService; import ja

  • 分析Java并发编程之信号量Semaphore

    目录 一.认识Semaphore 1.1.Semaphore 的使用场景 1.2.Semaphore 使用 1.3.Semaphore 信号量的模型 二.Semaphore 深入理解 2.1.Semaphore 基本属性 2.2.Semaphore 的公平性和非公平性 2.3.其他 Semaphore 方法 一.认识Semaphore 1.1.Semaphore 的使用场景 Semaphore 的使用场景主要用于流量控制,比如数据库连接,同时使用的数据库连接会有数量限制,数据库连接不能超过一定的

  • 详解Java信号量Semaphore的原理及使用

    目录 1.Semaphore的概述 2.Semaphore的原理 2.1 基本结构 2.2 可中断获取信号量 2.3 不可中断获取信号量 2.4 超时可中断获取信号量 2.5 尝试获取信号量 2.6 释放信号量 3.Semaphore的使用 4.Semaphore的总结 1.Semaphore的概述 public class Semaphore extends Object implements Serializable Semaphore来自于JDK1.5的JUC包,直译过来就是信号量,被作为

  • Java并发编程Semaphore计数信号量详解

    Semaphore 是一个计数信号量,它的本质是一个共享锁.信号量维护了一个信号量许可集.线程可以通过调用acquire()来获取信号量的许可:当信号量中有可用的许可时,线程能获取该许可:否则线程必须等待,直到有可用的许可为止. 线程可以通过release()来释放它所持有的信号量许可(用完信号量之后必须释放,不然其他线程可能会无法获取信号量). 简单示例: package me.socketthread; import java.util.concurrent.ExecutorService;

  • JAVA 多线程之信号量(Semaphore)实例详解

    java Semaphore 简介 信号量(Semaphore),有时被称为信号灯,是在多线程环境下使用的一种设施, 它负责协调各个线程, 以保证它们能够正确.合理的使用公共资源. 一个计数信号量.从概念上讲,信号量维护了一个许可集.如有必要,在许可可用前会阻塞每一个 acquire(),然后再获取该许可.每个 release() 添加一个许可,从而可能释放一个正在阻塞的获取者.但是,不使用实际的许可对象,Semaphore 只对可用许可的号码进行计数,并采取相应的行动.拿到信号量的线程可以进入

  • Java信号量Semaphore原理及代码实例

    Semaphore 通常用于限制可以访问某些资源(物理或逻辑的)的线程数目.自从5.0开始,jdk在java.util.concurrent包里提供了Semaphore 的官方实现,因此大家不需要自己去实现Semaphore. 下面的类使用信号量控制对内容池的访问: import java.util.concurrent.Semaphore; class Pool { private static final int MAX_AVAILABLE = 100; private final Sema

  • Java 信号量Semaphore的实现

    近日于LeetCode看题遇1114 按序打印,获悉一解法使用了Semaphore,顺势研究,记心得于此. 此解视Semaphore为锁,以保证同一时刻单线程的顺序执行.在此原题上,我作出如下更改. package test; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; public class

  • Java中的Semaphore如何使用

    目录 简介 简述实现原理 方法介绍 案例分析 适用场景 简介 semaphore中文意思既是信号量,它的主要功能就是用来控制某个资源同时被访问的线程数. 为了控制某块资源的并发访问量时,可以使用Semaphore对象中的acquire()方法获取访问令牌,如果Semaphore对象访问令牌已发完,那么当前获取令牌的线程将会进入阻塞,带其他线程进行release()释放令牌时,当前线程才有机会获得令牌从而拥有访问权限. 简述实现原理 Semaphore实际上是一种共享锁,因为它允许多个线程并发获取

  • java线程并发semaphore类示例

    复制代码 代码如下: package com.yao; import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Semaphore; /** * Java 5.0里新加了4个协调线程间进程的同步装置,它们分别是: * Semaphore, CountDownLatch, CyclicBarrier和Exchanger. * 本例主要介

随机推荐