一文搞懂Java并发AQS的共享锁模式
目录
- 概述
- 自定义共享锁例子
- 核心原理机制
- 源码解析
- 成员变量
- 共享锁获取acquireShared(int)
- 共享释放releaseShared(int)
概述
这篇文章深入浅出理解Java并发AQS的独占锁模式讲解了AQS的独占锁实现原理,那么本篇文章在阐述AQS另外一个重要模式,共享锁模式,那什么是共享锁呢?
共享锁可以由多个线程同时获取, 比较典型的就是读锁,读操作并不会产生副作用,所以可以允许多个线程同时对数据进行读操作而不会有线程安全问题,jdk中的很多并发工具比如ReadWriteLock和CountdownLatch就是依赖AQS的共享锁实现的。
本文重点讲解下AQS是如何实现共享锁的。
自定义共享锁例子
首先我们通过AQS实现一个非常最最最轻量简单的共享锁例子,帮助大家对共享锁有一个整体的感知。
@Slf4j public class ShareLock { /** * 共享锁帮助类 */ private static class ShareSync extends AbstractQueuedSynchronizer { private int lockCount; /** * 创建共享锁帮助类,最多有count把共享锁,超过了则阻塞 * * @param count 共享锁数量 */ public ShareSync(int count) { this.lockCount = count; } /** * 尝试获取共享锁 * * @param arg 每次获取锁的数量 * @return 返回正数,表示后续其他线程获取共享锁可能成功; 返回0,表示后续其他线程无法获取共享锁;返回负数,表示当前线程获取共享锁失败 */ @Override protected int tryAcquireShared(int arg) { // 自旋 for (;;) { int c = getState(); // 如果持有锁的数量大于指定数量,返回-1,线程进入阻塞 if(c >= lockCount) { return -1; } int nextc = c + 1; // cas设置成功,返回1,获取到共享锁 if (compareAndSetState(c, nextc)) { return 1; } } } /** * 尝试释放共享锁 * * @param arg 释放锁的数量 * @return 如果释放后允许唤醒后续等待结点返回true,否则返回false */ @Override protected boolean tryReleaseShared(int arg) { // 自旋操作 for (; ; ) { int c = getState(); // 如果没有锁了 if (c == 0) { return false; } // 否则锁量-1 int nextc = c - 1; // cas修改状态 if (compareAndSetState(c, nextc)) { return true; } } } } private final ShareSync sync; public ShareLock(int count) { this.sync = new ShareSync(count); } /** * 加共享锁 */ public void lockShare() { sync.acquireShared(1); } /** * 释放共享锁 */ public void releaseShare() { sync.releaseShared(1); } }
创建内部类共享帮助锁ShareSync
类,继承自AbstractQueuedSynchronizer
类,实现了共享锁相关的方法tryAcquireShared()
和tryReleaseShared()
。
创建ShareLock
,提供了lockShare()
加锁和releaseShare()
两个API。
验证:
public static void main(String[] args) throws InterruptedException { ShareLock shareLock = new ShareLock(3); for (int i = 0; i < 5; i++) { new Thread(() -> { shareLock.lockShare(); try { log.info("lock success"); Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } finally { shareLock.releaseShare(); log.info("release success"); } }, "thread-" + i).start(); } Thread.sleep(10000); }
- 一共创建最多共同有3个线程共享的共享锁。
- 创建5个线程去竞争共享锁。
运行结果:
- 运行结果显示每次最多只有3个
lock success
,说明同时只有3个线程共享。 - 只有在释放共享锁以后,其他线程才能获取锁。
下面对它的实现原理一探究竟。
核心原理机制
共享模式也是由AQS提供的,首先我们关注下AQS的数据结构。
AQS内部维护了一个volatile int state(代表共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列)。
AQS作为一个抽象方法,提供了加锁、和释放锁的框架,这里采用的模板方模式,在上面中提到的tryAcquireShared
、tryReleaseShared
就是和共享模式相关的模板方法。
方法名 | 描述 |
---|---|
protected int tryAcquireShared(int arg) | 共享方式。arg为获取锁的次数,尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。 |
protected boolean tryReleaseShared(int arg) | 共享方式。arg为释放锁的次数,尝试释放资源,如果释放后允许唤醒后续等待结点返回True,否则返回False。 |
共享模式的入口方法如下:
方法名 | 描述 |
---|---|
void acquireShared(int arg) | 共享模式获取锁,不响应中断。 |
void acquireSharedInterruptibly(int arg) | 共享模式获取锁,响应中断。 |
tryAcquireSharedNanos(int arg, long nanosTimeout) | 尝试在共享模式下获取锁,如果中断则中止,如果超过给定超时则失败。 |
boolean releaseShared(int arg) | 共享模式下释放锁。 |
源码解析
上图是AQS的类结构图,其中标红部分是组成AQS的重要成员变量。
成员变量
1.state共享变量
AQS中里一个很重要的字段state,表示同步状态,是由volatile修饰的,用于展示当前临界资源的获锁情况。通过getState(),setState(),compareAndSetState()三个方法进行维护。
关于state的几个要点:
- 使用volatile修饰,保证多线程间的可见性。
- getState()、setState()、compareAndSetState()使用final修饰,限制子类不能对其重写。
- compareAndSetState()采用乐观锁思想的CAS算法,保证原子性操作。
2.CLH队列(FIFO队列)
AQS里另一个重要的概念就是CLH队列,它是一个双向链表队列,其内部由head和tail分别记录头结点和尾结点,队列的元素类型是Node。
private transient volatile Node head; private transient volatile Node tail;
Node的结构如下:
static final class Node { //共享模式下的等待标记 static final Node SHARED = new Node(); //独占模式下的等待标记 static final Node EXCLUSIVE = null; //表示当前结点已取消调度。当timeout或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的结点将不会再变化。 static final int CANCELLED = 1; //表示后继结点在等待当前结点唤醒。后继结点入队时,会将前继结点的状态更新为SIGNAL。 static final int SIGNAL = -1; //表示结点等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。 static final int CONDITION = -2; //共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。 static final int PROPAGATE = -3; //状态,包括上面的四种状态值,初始值为0,一般是节点的初始状态 volatile int waitStatus; //上一个节点的引用 volatile Node prev; //下一个节点的引用 volatile Node next; //保存在当前节点的线程引用 volatile Thread thread; //condition队列的后续节点 Node nextWaiter; }
注意,waitSstatus负值表示结点处于有效等待状态,而正值表示结点已被取消。所以源码中很多地方用>0、<0来判断结点的状态是否正常。
3.exclusiveOwnerThread
AQS通过继承AbstractOwnableSynchronizer类,拥有的属性。表示独占模式下同步器持有的线程。
共享锁获取acquireShared(int)
acquireShared(int)是共享锁模式下线程获取共享资源的入口方法,它会获取指定量的资源,获取成功则直接返回,获取失败则进入等待队列,直到获取到资源为止,整个过程无法响应中断。
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
方法的整体流程如下:
- tryAcquireShared()尝试获取资源,需要自定义同步器去实现,返回负值代表获取失败;0代表获取成功,但没有剩余资源;正数表示获取成功,还有剩余资源,其他线程还可以去获取。
- 如果失败则通过doAcquireShared()进入等待队列,直到获取到资源为止才返回。
doAcquireShared(int)
此方法用于将当前线程加入等待队列尾部休息,直到其他线程释放资源唤醒自己,自己成功拿到相应量的资源后才返回。
private void doAcquireShared(int arg) { //封装线程为共享Node 加入队列尾部 final Node node = addWaiter(Node.SHARED); //是否成功标志 boolean failed = true; try { //等待过程中是否被中断过的标志 boolean interrupted = false; // 自旋操作 for (;;) { // 获取前驱节点 final Node p = node.predecessor(); //如果到head的下一个,因为head是拿到资源的线程,此时node被唤醒,很可能是head用完资源来唤醒自己的 if (p == head) { //尝试获取资源 int r = tryAcquireShared(arg); //成功 if (r >= 0) { //将head指向自己,还有剩余资源可以再唤醒之后的线程 setHeadAndPropagate(node, r); p.next = null; // help GC //如果等待过程中被打断过,此时将中断补上。 if (interrupted) selfInterrupt(); failed = false; return; } } //判断状态,寻找安全点,进入waiting状态,等着被unpark()或interrupt() if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
doAcquireShared
方法的实现和获取独占锁中的acquireQueued
方法很类似,但是主要有一点不同,那就是线程在被唤醒后,若成功获取到了共享锁,还需要判断共享锁是否还能被其他线程获取,若可以,则继续向后唤醒它的下一个节点对应的线程。
setHeadAndPropagate(Node, int)
该方法主要将当前节点设置为头节点,同时判断条件是否符合(比如还有剩余资源),还会去唤醒后继结点,毕竟是共享模式。
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; //head指向自己 setHead(node); //如果还有剩余量,继续唤醒下一个邻居线程 if (propagate > 0 || h == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) // 唤醒操作 doReleaseShared(); } }
共享释放releaseShared(int)
releaseShared(int)
是共享模式下线程释放共享资源的入口,它会释放指定量的资源,如果成功释放且允许唤醒等待线程,它会唤醒等待队列里的其他线程来获取资源。
public final boolean releaseShared(int arg) { //尝试释放资源 if (tryReleaseShared(arg)) { //唤醒后继结点 doReleaseShared(); return true; } return false; }
方法的整体流程如下:
- tryReleaseShared尝试释放锁,这由自定义同步器去实现, 返回true表示释放成功。
- doReleaseShared唤醒后续队列中等待的节点,
doReleaseShared()
此方法主要用于唤醒队列中等待的共享节点。
private void doReleaseShared() { // 自旋操作 for (;;) { // 获取头节点 Node h = head; if (h != null && h != tail) { // 获取节点的等待状态 int ws = h.waitStatus; // 如果节点等待状态是-1, -1表示有责任唤醒后续节点的状态 if (ws == Node.SIGNAL) { // cas修改当前节点的等待状态为0 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; //唤醒后续节点 unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } if (h == head)// head发生变化 break; } }
逻辑是一个死循环,每次循环中重新读取一次head,然后保存在局部变量h中,再配合if(h == head) break;
,这样,循环检测到head没有变化时就会退出循环。注意,head变化一定是因为:acquire thread被唤醒,之后它成功获取锁,然后setHead设置了新head。而且注意,只有通过if(h == head) break;
即head不变才能退出循环,不然会执行多次循环。
if (h != null && h != tail)
判断队列是否至少有两个node,如果队列从来没有初始化过(head为null),或者head就是tail,那么中间逻辑直接不走,直接判断head是否变化了。
如果队列中有两个或以上个node,那么检查局部变量h的状态:
- 如果状态为SIGNAL,说明h的后继是需要被通知的。通过对CAS操作结果取反,将
compareAndSetWaitStatus(h, Node.SIGNAL, 0)
和unparkSuccessor(h)
绑定在了一起。说明了只要head成功得从SIGNAL修改为0,那么head的后继的代表线程肯定会被唤醒了。 - 如果状态为0,说明h的后继所代表的线程已经被唤醒或即将被唤醒,并且这个中间状态即将消失,要么由于acquire thread获取锁失败再次设置head为SIGNAL并再次阻塞,要么由于acquire thread获取锁成功而将自己(head后继)设置为新head并且只要head后继不是队尾,那么新head肯定为SIGNAL。所以设置这种中间状态的head的status为PROPAGATE,让其status又变成负数,这样可能被被唤醒线程检测到。
如果状态为PROPAGATE,直接判断head是否变化。
两个continue保证了进入那两个分支后,只有当CAS操作成功后,才可能去执行if(h == head) break;,才可能退出循环。
if(h == head) break;保证了,只要在某个循环的过程中有线程刚获取了锁且设置了新head,就会再次循环。目的当然是为了再次执行unparkSuccessor(h),即唤醒队列中第一个等待的线程。
以上就是一文搞懂Java并发AQS的共享锁模式的详细内容,更多关于Java AQS共享锁模式的资料请关注我们其它相关文章!