Java中ReentrantLock和ReentrantReadWriteLock的原理
目录
- ReentrantLock 原理
- 概念
- 核心变量和构造器
- 核心方法
- ReentrantReadWriteLock 原理
- 用例
- 核心变量和构造器
- Sync类
- tryAcquire获取写锁的流程
- tryAcquireShared获取读锁的流程获取写锁的流程
- fullTryAcquireShared完全获取读锁流程
- tryRelease释放写锁的流程
- tryReleaseShared释放读锁的流程
- readerShouldBlock和writerShouldBlock模板方法公平锁实现
- readerShouldBlock和writerShouldBlock模板方法非公平锁实现
ReentrantLock 原理
概念
基于AQS实现的可重入锁实现类。
核心变量和构造器
public class ReentrantLock implements Lock, java.io.Serializable { private final Sync sync; public ReentrantLock() { // 默认为非公平锁。为何默认为非公平锁?因为通过大量测试下来,发现非公平锁的性能优于公平锁 sync = new NonfairSync(); } public ReentrantLock(boolean fair) { // 由fair变量来表明选择锁类型 sync = fair ? new FairSync() : new NonfairSync(); } abstract static class Sync extends AbstractQueuedSynchronizer { abstract void lock(); // 非公平锁标准获取锁方法 final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); // 当执行到这里时,正好获取所得线程释放了锁,那么可以尝试抢锁 if (c == 0) { // 继续抢锁,不看有没有线程排队 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 当前线程就是持有锁的线程,表明锁重入 else if (current == getExclusiveOwnerThread()) { // 利用state整形变量进行次数记录 int nextc = c + acquires; // 如果超过了int表示范围,表明符号溢出,所以抛出异常0111 1111 + 1 = 1000 0000 if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } // 返回false 表明需要AQS来将当前线程放入阻塞队列,然后进行阻塞操作等待唤醒获取锁 return false; } // 公平锁和非公平锁公用方法,因为在释放锁的时候,并不区分是否公平 protected final boolean tryRelease(int releases) { int c = getState() - releases; // 如果当前线程不是上锁的那个线程 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; // 不是重入锁,那么当前线程一定是释放锁了,然后我们把当前AQS用于保存当前锁对象的变量ExclusiveOwnerThread设置为null,表明释放锁成功 if (c == 0) { free = true; setExclusiveOwnerThread(null); } // 注意:此时state全局变量没有改变,也就意味着在setState之前,没有别的线程能够获取锁,这时保证了以上的操作原子性 setState(c); // 告诉AQS,我当前释放锁成功了,你可以去唤醒正在等待锁的线程了 return free; } protected final boolean isHeldExclusively() { return getExclusiveOwnerThread() == Thread.currentThread(); } final ConditionObject newCondition() { return new ConditionObject(); } } static final class NonfairSync extends Sync { // 由ReentrantLock调用获取锁 final void lock() { // 非公平锁,直接抢锁,不管有没有线程排队 if (compareAndSetState(0, 1)) // 上锁成功,那么标识当前线程为获取锁的线程 setExclusiveOwnerThread(Thread.currentThread()); else // 抢锁失败,进入AQS的标准获取锁流程 acquire(1); } protected final boolean tryAcquire(int acquires) { // 使用父类提供的获取非公平锁的方法来获取锁 return nonfairTryAcquire(acquires); } } static final class FairSync extends Sync { // 由ReentrantLock调用 final void lock() { // 没有尝试抢锁,直接进入AQS标准获取锁流程 acquire(1); } // AQS调用,子类自己实现获取锁的流程 protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); // 此时有可能正好获取锁的线程释放了锁,也有可能本身就没有线程获取锁 if (c == 0) { // 注意:这里和非公平锁的区别在于:hasQueuedPredecessors看看队列中是否有线程正在排队,没有的话再通过CAS抢锁 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { // 抢锁成功 setExclusiveOwnerThread(current); return true; } } // 当前线程就是获取锁的线程,那么这里是锁重入,和非公平锁操作一模一样 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } // 返回false 表明需要AQS来将当前线程放入阻塞队列,然后进行阻塞操作等待唤醒获取锁 return false; } } }
核心方法
获取锁操作:
public void lock() { // 直接通过sync同步器上锁 sync.lock(); }
释放锁操作:
public void unlock() { sync.release(1); }
ReentrantReadWriteLock 原理
用例
将原来的锁,分割为两把锁:读锁、写锁。适用于读多写少的场景,读锁可以并发,写锁与其他锁互斥。写写互斥、写读互斥、读读兼容。
public class ThreadDemo { static volatile int a; public static void readA() { System.out.println(a); } public static void writeA() { a++; } public static void main(String[] args) { ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock(); ReentrantReadWriteLock.ReadLock readLock = reentrantReadWriteLock.readLock(); ReentrantReadWriteLock.WriteLock writeLock = reentrantReadWriteLock.writeLock(); Thread readThread1 = new Thread(() -> { readLock.lock(); try { readA(); } finally { readLock.unlock(); } }); Thread readThread2 = new Thread(() -> { readLock.lock(); try { readA(); } finally { readLock.unlock(); } }); Thread writeThread = new Thread(() -> { writeLock.lock(); try { writeA(); } finally { writeLock.unlock(); } }); readThread1.start(); readThread2.start(); writeThread.start(); } }
核心变量和构造器
该接口用于获取读锁和写锁对象
public interface ReadWriteLock { // 用于获取读锁 Lock readLock(); // 用于获取写锁 Lock writeLock(); }
readerLock和writerLock变量用于支撑以上描述的ReadWriteLock接口的读锁和写锁方法。通过构造方法得知,读写锁对象的创建和用例均依赖于公平锁或者非公平锁同步器。
public class ReentrantReadWriteLock implements ReadWriteLock { // 读锁对象 private final ReentrantReadWriteLock.ReadLock readerLock; // 写锁对象 private final ReentrantReadWriteLock.WriteLock writerLock; // 同步器 final Sync sync; // 默认构造器,创建了非公平锁 public ReentrantReadWriteLock() { this(false); } // 根据fair变量,来选择创建不同的锁:公平锁 FairSync 和非公平锁 NonfairSync public ReentrantReadWriteLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); // 用同步器来创建读写锁对象 readerLock = new ReadLock(this); writerLock = new WriteLock(this); } public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; } public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; } }
Sync类
核心变量和构造器
我们说读锁可以多个线程同时持有,而写锁只允许一个线程持有,此时我们称 读锁-----共享锁 写锁------互斥锁(排他锁)。然后我们在AQS中了解到一个变量state,它是32位的值,那么我们这里将其切割为高16位和低16位。
abstract static class Sync extends AbstractQueuedSynchronizer { // 高16位用于表示读锁 static final int SHARED_SHIFT = 16; // 用于对高16位操作:加1 减1 static final int SHARED_UNIT = (1 << SHARED_SHIFT); // 最大读锁量 static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; // 用于获取低16位的值。例如 获取低八位:0000 0000 1111 1111 static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; /** 获取当前持有读锁的线程数量 */ static int sharedCount(int c) { return c >>> SHARED_SHIFT; } /** 获取当前持有写锁的线程数量 */ static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; } // 高16位为所有读锁获取,那么我想知道每个线程对于读锁重入的次数?采用ThreadLocal来进行统计,每个线程自己统计自己的 static final class HoldCounter { int count = 0; final long tid = getThreadId(Thread.currentThread()); } // 继承自ThreadLocal,重写了其中的initialValue方法,该方法将在线程第一次获取该变量时调用初始化HoldCounter计数器 static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> { public HoldCounter initialValue() { return new HoldCounter(); } } // 创建ThreadLocal对象 private transient ThreadLocalHoldCounter readHolds; // 缓存最后一个线程获取的读锁数量 private transient HoldCounter cachedHoldCounter; // 保存获取到该锁的第一个读锁线程 private transient Thread firstReader = null; // 保存第一个该锁的第一个读锁线程获取到的读锁数量 private transient int firstReaderHoldCount; Sync() { // 构造器中初始化ThreadLocalHoldCounter ThreadLocal对象 readHolds = new ThreadLocalHoldCounter(); // 用于保证可见性,使用了state变量的volatile语义 setState(getState()); } }
tryAcquire获取写锁的流程
由AQS调用,用于子类实现自己的上锁逻辑,和原有获取互斥锁保持一致,
protected final boolean tryAcquire(int acquires) { // 获取当前线程 Thread current = Thread.currentThread(); // 获取当前状态值和互斥锁的数量 int c = getState(); int w = exclusiveCount(c); // 状态值有效 if (c != 0) { // 有线程获取到了读锁或者当前线程不是持有互斥锁的线程 if (w == 0 || // 有线程获取到了读锁 current != getExclusiveOwnerThread()) // 有线程获取到了写锁 // 返回false 让AQS执行阻塞操作 return false; // 写锁重入,而又由于写锁的数量保存在低16位,所以直接加就行了 if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); setState(c + acquires); return true; } // 既没有读锁,也没有写锁 if (writerShouldBlock() || // 由子类实现判断当前线程是否应该获取写锁 !compareAndSetState(c, c + acquires)) // 通过CAS抢写锁 return false; // 获取写锁成功,那么将当前线程标识为获取互斥锁的线程对象 setExclusiveOwnerThread(current); return true; }
tryAcquireShared获取读锁的流程获取写锁的流程
protected final int tryAcquireShared(int unused) { // 获取到当前线程对象 Thread current = Thread.currentThread(); // 获取到当前状态值 int c = getState(); if (exclusiveCount(c) != 0 && // 有没有线程持有写锁 getExclusiveOwnerThread() != current) // 如果有线程获取到了互斥锁,那么进一步看看是不是当前线程 // 不是当前线程,那么直接返回-1,告诉AQS获取共享锁失败 return -1; // 获取到读锁的持有数量 int r = sharedCount(c); if (!readerShouldBlock() && // 让子类来判定当前获取读锁的线程是否应该被阻塞 r < MAX_COUNT && // 判断是否发生了溢出 compareAndSetState(c, c + SHARED_UNIT)) { // 直接CAS 增加state的高16位的读锁持有数量 // 增加高16位之前的计数为0,此时表明当前线程就是第一个获取读锁的线程 if (r == 0) { // 注意:持有两个变量来优化threadlocal firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { // 当前获取读锁的线程就是一个线程,那么此时表明:锁重入,直接++计数位即可 firstReaderHoldCount++; } else { // 当前线程不是第一个读线程,此时将其获取读锁的次数保存在ThreadLocal中 HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } // 有很多同学走到这里,直接懵逼?不知道这是啥情况?经验:在看doug lea写的代码时,请注意:经常做优化,就是把一些常见的场景前置,保证性能 return fullTryAcquireShared(current); }
fullTryAcquireShared完全获取读锁流程
final int fullTryAcquireShared(Thread current) { HoldCounter rh = null; for (;;) { int c = getState(); // 当前已经有线程获取到写锁且当前获取写锁的线程不是,当前线程 if (exclusiveCount(c) != 0) { if (getExclusiveOwnerThread() != current) return -1; } else if (readerShouldBlock()) { // 子类判断当前线程应该阻塞 if (firstReader == current) { // 当前线程就是第一个获取到读锁的线程 } else { // 获取到当前线程记录读锁重入次数的HoldCounter对象 if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); } } // 当前读锁重入次数为0时,表明没有获取读锁,此时返回-1,阻塞当前线程 if (rh.count == 0) return -1; } } // 读锁获取次数溢出 if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); // CAS增加读锁次数 if (compareAndSetState(c, c + SHARED_UNIT)) { if (sharedCount(c) == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; } return 1; } } }
tryRelease释放写锁的流程
protected final boolean tryRelease(int releases) { // 没有获取写锁,为啥能释放写锁呢? if (!isHeldExclusively()) throw new IllegalMonitorStateException(); int nextc = getState() - releases; // 释放完毕后,写锁状态是否为0(锁重入),因为此时计算的不是当前state,是nextc boolean free = exclusiveCount(nextc) == 0; // 如果下一个状态值为0,此时表明当前线程完全释放了锁,也即锁重入为0,那么将当前线程对象从OwnerThread中移除 if (free) setExclusiveOwnerThread(null); // 此时设置全局state变量即可 setState(nextc); // 如果返回为true,那么由AQS完成后面线程的唤醒 return free; }
tryReleaseShared释放读锁的流程
释放时,需要考虑:重入多少次,就释放多少次。总结:先完成自己的释放,然后再完成共享的高16位的释放。
protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); // 当前线程是第一个获取到读锁的线程 if (firstReader == current) { // 当前重入次数为1,代表什么?代表可以直接释放,如果不是1,那么表明还持有多个读锁,也即重入多次,那么直接-- if (firstReaderHoldCount == 1) firstReader = null; else firstReaderHoldCount--; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { // 当前线程已经释放完读锁,那么不需要在ThreadLocal里持有HoldCounter对象 readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } for (;;) { // CAS释放高16位计数 int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) // 释放完毕后是否为0,为无锁状态,此时需要干啥?由AQS来唤醒阻塞的线程 return nextc == 0; } }
readerShouldBlock和writerShouldBlock模板方法公平锁实现
判断条件只有一个:hasQueuedPredecessors()方法,就是看看AQS的阻塞队列里是否有其他线程正在等待,如果有排队去。
总结:有人在排队,那么不插队。w->r->r->r 此时来了个r:w->r->r->r->r, 此时来了个w:w->r->r->r->w。
static final class FairSync extends Sync { final boolean writerShouldBlock() { return hasQueuedPredecessors(); } final boolean readerShouldBlock() { return hasQueuedPredecessors(); } // w->r->r r获取锁 w->r->r-r }
readerShouldBlock和writerShouldBlock模板方法非公平锁实现
写线程永远false,因为读写锁本身适用的是读多写少,此时不应该 让写线程饥饿,而且非公平,写锁永远不阻塞,让它抢,不管前面是否有人排队,先抢了再说。apparentlyFirstQueuedIsExclusive()第一个排队的是不是写线程。r(10),当前线程是第十一个,此时已经有一个写线程排队,r(10)->w,此时排队去。r(10)->w->r。
static final class NonfairSync extends Sync { final boolean writerShouldBlock() { return false; } final boolean readerShouldBlock() { return apparentlyFirstQueuedIsExclusive(); } // w->r->r r获取锁 r->r->r }
到此这篇关于Java中ReentrantLock和ReentrantReadWriteLock的原理的文章就介绍到这了,更多相关Java ReentrantLock内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!