Java多线程之Semaphore实现信号灯

目录
  • 1 Semaphore的主要方法
  • 2 实例讲解
    • 实现单例模式
  • 3 源码解析
    • 构造方法
    • 获取许可
    • 释放许可
    • 减小许可数量
    • 获取剩余许可数量

前言:

Semaphore是计数信号量。Semaphore管理一系列许可证。每个acquire方法阻塞,直到有一个许可证可以获得然后拿走一个许可证;每个release方法增加一个许可证,这可能会释放一个阻塞的acquire方法。然而,其实并没有实际的许可证这个对象,Semaphore只是维持了一个可获得许可证的数量。
Semaphore可以维护当前访问自身的线程个数,并提供了同步机制。使用Semaphore可以控制同时访问资源的线程个数,例如,实现一个文件允许的并发访问数。

1 Semaphore的主要方法

Semaphore(int permits):构造方法,创建具有给定许可数的计数信号量并设置为非公平信号量。

Semaphore(int permits,boolean fair):构造方法,当fair等于true时,创建具有给定许可数的计数信号量并设置为公平信号量。

void acquire():当前线程尝试去阻塞的获取1个许可证。

此过程是阻塞的,它会一直等待许可证,直到发生以下任意一件事:

  • 当前线程获取了1个可用的许可证,则会停止等待,继续执行。
  • 当前线程被中断,则会抛出InterruptedException异常,并停止等待,继续执行。

void acquire(int n):从此信号量获取给定数目许可,在提供这些许可前一直将线程阻塞。
当前线程尝试去阻塞的获取多个许可证。

此过程是阻塞的,它会一直等待许可证,直到发生以下任意一件事:

  • 当前线程获取了n个可用的许可证,则会停止等待,继续执行。
  • 当前线程被中断,则会抛出InterruptedException异常,并停止等待,继续执行。

void release():释放一个许可,将其返回给信号量。

void release(int n):释放n个许可。

int availablePermits():当前可用的许可数。
void acquierUninterruptibly():当前线程尝试去阻塞的获取1个许可证(不可中断的)。

此过程是阻塞的,它会一直等待许可证,直到发生以下任意一件事:

  • 当前线程获取了1个可用的许可证,则会停止等待,继续执行。

void acquireUninterruptibly(permits):当前线程尝试去阻塞的获取多个许可证。

此过程是阻塞的,它会一直等待许可证,直到发生以下任意一件事:

  • 当前线程获取了n个可用的许可证,则会停止等待,继续执行。

boolean tryAcquire()当前线程尝试去获取1个许可证。

  • 此过程是非阻塞的,它只是在方法调用时进行一次尝试。
  • 如果当前线程获取了1个可用的许可证,则会停止等待,继续执行,并返回true
  • 如果当前线程没有获得这个许可证,也会停止等待,继续执行,并返回false

boolean tryAcquire(permits):当前线程尝试去获取多个许可证。

  • 此过程是非阻塞的,它只是在方法调用时进行一次尝试。
  • 如果当前线程获取了permits个可用的许可证,则会停止等待,继续执行,并返回true
  • 如果当前线程没有获得permits个许可证,也会停止等待,继续执行,并返回false

boolean tryAcquire(timeout,TimeUnit):当前线程在限定时间内,阻塞的尝试去获取1个许可证。

此过程是阻塞的,它会一直等待许可证,直到发生以下任意一件事:

  • 当前线程获取了可用的许可证,则会停止等待,继续执行,并返回true
  • 当前线程等待时间timeout超时,则会停止等待,继续执行,并返回false
  • 当前线程在timeout时间内被中断,则会抛出InterruptedException一次,并停止等待,继续执行。

boolean tryAcquire(permits,timeout,TimeUnit):当前线程在限定时间内,阻塞的尝试去获取permits个许可证。

此过程是阻塞的,它会一直等待许可证,直到发生以下任意一件事:

  • 当前线程获取了可用的permits个许可证,则会停止等待,继续执行,并返回true
  • 当前线程等待时间timeout超时,则会停止等待,继续执行,并返回false
  • 当前线程在timeout时间内被中断,则会抛出InterruptedException一次,并停止等待,继续执行。

2 实例讲解

public class SemaphoreTest {

    private static final Semaphore semaphore = new Semaphore(3);

    public static void main(String[] args) {
        Executor executor = Executors.newCachedThreadPool();

        String[] name = {"Jack", "Pony", "Larry", "Martin", "James", "ZhangSan","Tree"};

        int[] age = {21,22,23,24,25,26,27};

        for(int i=0;i<7;i++)
        {
            Thread t1=new InformationThread(name[i],age[i]);
            executor.execute(t1);
        }
    }

    private static class InformationThread extends Thread {
        private final String name;
        private final int age;

        public InformationThread(String name, int age) {
            this.name = name;
            this.age = age;
        }

        @Override
        public void run() {
            try {
                semaphore.acquire();
                System.out.println(Thread.currentThread().getName()
                        + ":大家好,我是" + name + "我今年" + age +
                        "当前时间段为:" + System.currentTimeMillis());
                Thread.sleep(1000);
                System.out.println(name + "要准备释放许可证了,当前时间为:" + System.currentTimeMillis());
                System.out.println("当前可使用的许可数为:" + semaphore.availablePermits());
                System.out.println("是否有正在等待许可证的线程:" + semaphore.hasQueuedThreads());
                System.out.println("正在等待许可证的队列长度(线程数量):" + semaphore.getQueueLength());
                semaphore.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    }
}

pool-1-thread-1:大家好,我是Jack我今年21当前时间段为:1543498535306
pool-1-thread-3:大家好,我是Larry我今年23当前时间段为:1543498535306
pool-1-thread-2:大家好,我是Pony我今年22当前时间段为:1543498535306
Pony要准备释放许可证了,当前时间为:1543498536310
Jack要准备释放许可证了,当前时间为:1543498536310
当前可使用的许可数为:0
Larry要准备释放许可证了,当前时间为:1543498536310
是否有正在等待许可证的线程:true
当前可使用的许可数为:0
是否有正在等待许可证的线程:true
正在等待许可证的队列长度(线程数量):4
正在等待许可证的队列长度(线程数量):4
当前可使用的许可数为:0
pool-1-thread-4:大家好,我是Martin我今年24当前时间段为:1543498536311
是否有正在等待许可证的线程:true
pool-1-thread-5:大家好,我是James我今年25当前时间段为:1543498536311
正在等待许可证的队列长度(线程数量):2
pool-1-thread-6:大家好,我是ZhangSan我今年26当前时间段为:1543498536312
James要准备释放许可证了,当前时间为:1543498537315
Martin要准备释放许可证了,当前时间为:1543498537315
当前可使用的许可数为:0
是否有正在等待许可证的线程:true
正在等待许可证的队列长度(线程数量):1
当前可使用的许可数为:0
是否有正在等待许可证的线程:false
pool-1-thread-7:大家好,我是Tree我今年27当前时间段为:1543498537316
正在等待许可证的队列长度(线程数量):0
ZhangSan要准备释放许可证了,当前时间为:1543498537317
当前可使用的许可数为:1
是否有正在等待许可证的线程:false
正在等待许可证的队列长度(线程数量):0
Tree要准备释放许可证了,当前时间为:1543498538319
当前可使用的许可数为:2
是否有正在等待许可证的线程:false
正在等待许可证的队列长度(线程数量):0

以上是非公平信号量,将建立Semaphore对象的语句改为如下语句:

private static final Semaphore semaphore=new Semaphore(3,true);

pool-1-thread-1:大家好,我是Jack我今年21当前时间段为:1543498810563
pool-1-thread-3:大家好,我是Larry我今年23当前时间段为:1543498810564
pool-1-thread-2:大家好,我是Pony我今年22当前时间段为:1543498810563
Jack要准备释放许可证了,当前时间为:1543498811564
当前可使用的许可数为:0
是否有正在等待许可证的线程:true
正在等待许可证的队列长度(线程数量):4
pool-1-thread-4:大家好,我是Martin我今年24当前时间段为:1543498811564
Larry要准备释放许可证了,当前时间为:1543498811568
当前可使用的许可数为:0
Pony要准备释放许可证了,当前时间为:1543498811568
是否有正在等待许可证的线程:true
当前可使用的许可数为:0
正在等待许可证的队列长度(线程数量):3
是否有正在等待许可证的线程:true
pool-1-thread-5:大家好,我是James我今年25当前时间段为:1543498811568
正在等待许可证的队列长度(线程数量):2
pool-1-thread-6:大家好,我是ZhangSan我今年26当前时间段为:1543498811568
Martin要准备释放许可证了,当前时间为:1543498812566
当前可使用的许可数为:0
是否有正在等待许可证的线程:true
正在等待许可证的队列长度(线程数量):1
pool-1-thread-7:大家好,我是Tree我今年27当前时间段为:1543498812566
James要准备释放许可证了,当前时间为:1543498812572
当前可使用的许可数为:0
是否有正在等待许可证的线程:false
正在等待许可证的队列长度(线程数量):0
ZhangSan要准备释放许可证了,当前时间为:1543498812572
当前可使用的许可数为:1
是否有正在等待许可证的线程:false
正在等待许可证的队列长度(线程数量):0
Tree要准备释放许可证了,当前时间为:1543498813568
当前可使用的许可数为:2
是否有正在等待许可证的线程:false
正在等待许可证的队列长度(线程数量):0

实现单例模式

将创建信号量对象语句修改如下:

private static final Semaphore semaphore=new Semaphore(1);

运行程序,结果如下:

pool-1-thread-1:大家好,我是Jack我今年21当前时间段为:1543499053898
Jack要准备释放许可证了,当前时间为:1543499054903
当前可使用的许可数为:0
是否有正在等待许可证的线程:true
正在等待许可证的队列长度(线程数量):6
pool-1-thread-2:大家好,我是Pony我今年22当前时间段为:1543499054904
Pony要准备释放许可证了,当前时间为:1543499055907
当前可使用的许可数为:0
是否有正在等待许可证的线程:true
正在等待许可证的队列长度(线程数量):5
pool-1-thread-3:大家好,我是Larry我今年23当前时间段为:1543499055907
Larry要准备释放许可证了,当前时间为:1543499056909
当前可使用的许可数为:0
是否有正在等待许可证的线程:true
正在等待许可证的队列长度(线程数量):4
pool-1-thread-4:大家好,我是Martin我今年24当前时间段为:1543499056909
Martin要准备释放许可证了,当前时间为:1543499057913
当前可使用的许可数为:0
是否有正在等待许可证的线程:true
正在等待许可证的队列长度(线程数量):3
pool-1-thread-5:大家好,我是James我今年25当前时间段为:1543499057913
James要准备释放许可证了,当前时间为:1543499058914
当前可使用的许可数为:0
是否有正在等待许可证的线程:true
正在等待许可证的队列长度(线程数量):2
pool-1-thread-6:大家好,我是ZhangSan我今年26当前时间段为:1543499058915
ZhangSan要准备释放许可证了,当前时间为:1543499059919
当前可使用的许可数为:0
是否有正在等待许可证的线程:true
正在等待许可证的队列长度(线程数量):1
pool-1-thread-7:大家好,我是Tree我今年27当前时间段为:1543499059919
Tree要准备释放许可证了,当前时间为:1543499060923
当前可使用的许可数为:0
是否有正在等待许可证的线程:false
正在等待许可证的队列长度(线程数量):0

如上可知,如果将给定许可数设置为1,就如同一个单例模式,即单个停车位,只有一辆车进,然后这辆车出来后,下一辆车才能进。

3 源码解析

Semaphore有两种模式,公平模式和非公平模式。公平模式就是调用acquire的顺序就是获取许可证的顺序,遵循FIFO;而非公平模式是抢占式的,也就是有可能一个新的获取线程恰好在一个许可证释放时得到了这个许可证,而前面还有等待的线程。

构造方法

Semaphore有两个构造方法,如下:

 public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

获取许可

先从获取一个许可看起,并且先看非公平模式下的实现。首先看acquire方法,acquire方法有几个重载,但主要是下面这个方法

  public void acquire(int permits) throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireSharedInterruptibly(permits);
    }

从上面可以看到,调用了SyncacquireSharedInterruptibly方法,该方法在父类AQS中,如下:

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted()) //如果线程被中断了,抛出异常
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0) //获取许可失败,将线程加入到等待队列中
            doAcquireSharedInterruptibly(arg);
    }

AQS子类如果要使用共享模式的话,需要实现tryAcquireShared方法,下面看NonfairSync的该方法实现:

protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }

该方法调用了父类中的nonfairTyAcquireShared方法,如下:

final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                //获取剩余许可数量
                int available = getState();
                //计算给完这次许可数量后的个数
                int remaining = available - acquires;
                //如果许可不够或者可以将许可数量重置的话,返回
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

从上面可以看到,只有在许可不够时返回值才会小于0,其余返回的都是剩余许可数量,这也就解释了,一旦许可不够,后面的线程将会阻塞。看完了非公平的获取,再看下公平的获取,

代码如下:

protected int tryAcquireShared(int acquires) {
            for (;;) {
                //如果前面有线程再等待,直接返回-1
                if (hasQueuedPredecessors())
                    return -1;
                //后面与非公平一样
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

从上面可以看到,FairSyncNonFairSync的区别就在于会首先判断当前队列中有没有线程在等待,如果有,就老老实实进入到等待队列;而不像NonfairSync一样首先试一把,说不定就恰好获得了一个许可,这样就可以插队了。
看完了获取许可后,再看一下释放许可。

释放许可

释放许可也有几个重载方法,但都会调用下面这个带参数的方法,

public void release(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.releaseShared(permits);
    }

releaseShared方法在AQS中,如下:

public final boolean releaseShared(int arg) {
        //如果改变许可数量成功
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

AQS子类实现共享模式的类需要实现tryReleaseShared类来判断是否释放成功,

实现如下:

protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                //获取当前许可数量
                int current = getState();
                //计算回收后的数量
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                //CAS改变许可数量成功,返回true
                if (compareAndSetState(current, next))
                    return true;
            }
        }

从上面可以看到,一旦CAS改变许可数量成功,那么就会调用doReleaseShared()方法释放阻塞的线程。

减小许可数量

Semaphore还有减小许可数量的方法,该方法可以用于用于当资源用完不能再用时,这时就可以减小许可证。代码如下:

protected void reducePermits(int reduction) {
        if (reduction < 0) throw new IllegalArgumentException();
        sync.reducePermits(reduction);
    }

可以看到,委托给了Sync,Sync的reducePermits方法如下:

  final void reducePermits(int reductions) {
            for (;;) {
                //得到当前剩余许可数量
                int current = getState();
                //得到减完之后的许可数量
                int next = current - reductions;
                if (next > current) // underflow
                    throw new Error("Permit count underflow");
                //如果CAS改变成功
                if (compareAndSetState(current, next))
                    return;
            }
        }

从上面可以看到,就是CAS改变AQS中的state变量,因为该变量代表许可证的数量。

获取剩余许可数量

Semaphore还可以一次将剩余的许可数量全部取走,该方法是drain方法,

如下:

public int drainPermits() {
        return sync.drainPermits();
    }

Sync的实现如下:

 final int drainPermits() {
            for (;;) {
                int current = getState();
                if (current == 0 || compareAndSetState(current, 0))
                    return current;
            }
        }

可以看到,就是CAS将许可数量置为0。

4、总结

Semaphore是信号量,用于管理一组资源。其内部是基于AQS的共享模式,AQS的状态表示许可证的数量,在许可证数量不够时,线程将会被挂起;而一旦有一个线程释放一个资源,那么就有可能重新唤醒等待队列中的线程继续执行。

到此这篇关于Java多线程之Semaphore实现信号灯的文章就介绍到这了,更多相关Java多线程 Semaphore实现信号灯内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • JAVA 多线程之信号量(Semaphore)实例详解

    java Semaphore 简介 信号量(Semaphore),有时被称为信号灯,是在多线程环境下使用的一种设施, 它负责协调各个线程, 以保证它们能够正确.合理的使用公共资源. 一个计数信号量.从概念上讲,信号量维护了一个许可集.如有必要,在许可可用前会阻塞每一个 acquire(),然后再获取该许可.每个 release() 添加一个许可,从而可能释放一个正在阻塞的获取者.但是,不使用实际的许可对象,Semaphore 只对可用许可的号码进行计数,并采取相应的行动.拿到信号量的线程可以进入

  • 分析Java并发编程之信号量Semaphore

    目录 一.认识Semaphore 1.1.Semaphore 的使用场景 1.2.Semaphore 使用 1.3.Semaphore 信号量的模型 二.Semaphore 深入理解 2.1.Semaphore 基本属性 2.2.Semaphore 的公平性和非公平性 2.3.其他 Semaphore 方法 一.认识Semaphore 1.1.Semaphore 的使用场景 Semaphore 的使用场景主要用于流量控制,比如数据库连接,同时使用的数据库连接会有数量限制,数据库连接不能超过一定的

  • Java使用Semaphore对单接口进行限流

    目录 一.实战说明 1.1 效果说明 1.2 核心知识点 二. 环境搭建 三.限流演示 3.1 并发请求工具 3.2 效果示例图 一.实战说明 1.1 效果说明 本篇主要讲如何使用Semaphore对单接口进行限流,例如有如下场景 a. A系统的有a接口主要给B系统调用,现在希望对B系统进行限流,例如处理峰值在100,超过100的请求快速失败 b. 接口作为总闸入口,希望限制所有外来访问,例如某个房间只能同时100个玩家在线,只有前面的处理完后面的才能继续请求 c. 其他类型场景,也就是资源固定

  • java线程并发semaphore类示例

    复制代码 代码如下: package com.yao; import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Semaphore; /** * Java 5.0里新加了4个协调线程间进程的同步装置,它们分别是: * Semaphore, CountDownLatch, CyclicBarrier和Exchanger. * 本例主要介

  • Java多线程之Semaphore实现信号灯

    目录 1 Semaphore的主要方法 2 实例讲解 实现单例模式 3 源码解析 构造方法 获取许可 释放许可 减小许可数量 获取剩余许可数量 前言: Semaphore是计数信号量.Semaphore管理一系列许可证.每个acquire方法阻塞,直到有一个许可证可以获得然后拿走一个许可证:每个release方法增加一个许可证,这可能会释放一个阻塞的acquire方法.然而,其实并没有实际的许可证这个对象,Semaphore只是维持了一个可获得许可证的数量. Semaphore可以维护当前访问自

  • java多线程之CyclicBarrier的使用方法

    java多线程之CyclicBarrier的使用方法 public class CyclicBarrierTest { public static void main(String[] args) { ExecutorService service = Executors.newCachedThreadPool(); final CyclicBarrier cb = new CyclicBarrier(3); for(int i=0;i<3;i++){ Runnable runnable = n

  • Java多线程之readwritelock读写分离的实现代码

    在多线程开发中,经常会出现一种情况,我们希望读写分离.就是对于读取这个动作来说,可以同时有多个线程同时去读取这个资源,但是对于写这个动作来说,只能同时有一个线程来操作,而且同时,当有一个写线程在操作这个资源的时候,其他的读线程是不能来操作这个资源的,这样就极大的发挥了多线程的特点,能很好的将多线程的能力发挥出来. 在Java中,ReadWriteLock这个接口就为我们实现了这个需求,通过他的实现类ReentrantReadWriteLock我们可以很简单的来实现刚才的效果,下面我们使用一个例子

  • Java并发编程之Semaphore(信号量)详解及实例

    Java并发编程之Semaphore(信号量)详解及实例 概述 通常情况下,可能有多个线程同时访问数目很少的资源,如客户端建立了若干个线程同时访问同一数据库,这势必会造成服务端资源被耗尽的地步,那么怎样能够有效的来控制不可预知的接入量呢?及在同一时刻只能获得指定数目的数据库连接,在JDK1.5 java.util.concurrent 包中引入了Semaphore(信号量),信号量是在简单上锁的基础上实现的,相当于能令线程安全执行,并初始化为可用资源个数的计数器,通常用于限制可以访问某些资源(物

  • Java多线程之volatile关键字及内存屏障实例解析

    前面一篇文章在介绍Java内存模型的三大特性(原子性.可见性.有序性)时,在可见性和有序性中都提到了volatile关键字,那这篇文章就来介绍volatile关键字的内存语义以及实现其特性的内存屏障. volatile是JVM提供的一种最轻量级的同步机制,因为Java内存模型为volatile定义特殊的访问规则,使其可以实现Java内存模型中的两大特性:可见性和有序性.正因为volatile关键字具有这两大特性,所以我们可以使用volatile关键字解决多线程中的某些同步问题. volatile

  • java多线程之Phaser的使用详解

    前面的文章中我们讲到了CyclicBarrier.CountDownLatch的使用,这里再回顾一下CountDownLatch主要用在一个线程等待多个线程执行完毕的情况,而CyclicBarrier用在多个线程互相等待执行完毕的情况. Phaser是java 7 引入的新的并发API.他引入了新的Phaser的概念,我们可以将其看成一个一个的阶段,每个阶段都有需要执行的线程任务,任务执行完毕就进入下一个阶段.所以Phaser特别适合使用在重复执行或者重用的情况. 基本使用 在CyclicBar

  • java多线程之Future和FutureTask使用实例

    Executor框架使用Runnable 作为其基本的任务表示形式.Runnable是一种有局限性的抽象,然后可以写入日志,或者共享的数据结构,但是他不能返回一个值. 许多任务实际上都是存在延迟计算的:执行数据库查询,从网络上获取资源,或者某个复杂耗时的计算.对于这种任务,Callable是一个更好的抽象,他能返回一个值,并可能抛出一个异常.Future表示一个任务的周期,并提供了相应的方法来判断是否已经完成或者取消,以及获取任务的结果和取消任务. public interface Callab

  • Java多线程之Park和Unpark原理

    一.基本使用 它们是 LockSupport 类中的方法 // 暂停当前线程 LockSupport.park(); // 恢复某个线程的运行 LockSupport.unpark(暂停线程对象) 应用:先 park 再 unpark Thread t1 = new Thread(() -> { log.debug("start..."); sleep(1); log.debug("park..."); LockSupport.park(); log.debu

  • Java多线程之Disruptor入门

    一.Disruptor简介 Disruptor目前是世界上最快的单机消息队列,由英国外汇交易公司LMAX开发,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级).基于Disruptor开发的系统单线程能支撑每秒600万订单,2010年在QCon演讲后,获得了业界关注.2011年,企业应用软件专家Martin Fowler专门撰写长文介绍.同年它还获得了Oracle官方的Duke大奖.目前,包括Apache Storm.Camel.Log4j 2在内的很多知名项

  • Java多线程之synchronized关键字的使用

    一.使用在非静态方法上 public synchronized void syzDemo(){ System.out.println(System.currentTimeMillis()); System.out.println("进入synchronized锁:syzDemo"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } 二.使用在静态方法上 publi

随机推荐