Java多线程之ReentrantReadWriteLock源码解析

一、介绍

1.1 ReentrantReadWriteLock

ReentrantReadWriteLock 是一个读写锁,允许多个读或者一个写线程在执行。

内部的 Sync 继承自 AQS,这个 Sync 包含一个共享读锁 ReadLock 和一个独占写锁 WriteLock。

该锁可以设置公平和非公平,默认非公平。

一个持有写锁的线程可以获取读锁。如果该线程先持有写锁,再持有读锁并释放写锁,称为锁降级。

WriteLock支持Condition并且与ReentrantLock语义一致,而ReadLock则不能使用Condition,否则抛出UnsupportedOperationException异常。

public class ReentrantReadWriteLock implements ReadWriteLock {
    /** 读锁 */
    private final ReentrantReadWriteLock.ReadLock readerLock;
    /** 写锁 */
    private final ReentrantReadWriteLock.WriteLock writerLock;
    /** 持有的AQS子类对象 */
    final Sync sync;

    abstract static class Sync extends AbstractQueuedSynchronizer {}

    static final class NonfairSync extends Sync {}

    static final class FairSync extends Sync {}

    public static class ReadLock implements Lock {}

    public static class WriteLock implements Lock {}

    //默认非公平
    public ReentrantReadWriteLock() {
        this(false);
    }

    public ReentrantReadWriteLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
        readerLock = new ReadLock(this);
        writerLock = new WriteLock(this);
    }

    public static class ReadLock implements Lock {
    	private final Sync sync;
        protected ReadLock(ReentrantReadWriteLock lock) {
            sync = lock.sync;
        }
    }

    public static class WriteLock implements Lock {
    	private final Sync sync;
        protected WriteLock(ReentrantReadWriteLock lock) {
            sync = lock.sync;
        }
    }

}

1.2 state

Sync 继承了 AQS,其中有一个 int 的成员变量 state,int 共32位,这里将其视为两部分,高16位表示读的数量,低16位表示写的数量,这里的数量表示线程重入后的总数量。

abstract static class Sync extends AbstractQueuedSynchronizer {
  	//继承的一个int的成员变量,将其拆分为高16位和低16位
    //private volatile int state;
    static final int SHARED_SHIFT   = 16;
  	//读一次,锁增加的值
    static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
    static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
    static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

    //读的数量
    static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
    //写的数量
    static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
}

1.3 HoldCounter

读锁使用了一个 ThreadLocal<HoldCounter> 让每个线程有一个线程私有的 HoldCounterHoldCounter包含一个线程 id 以及读重入的次数。

查找对应线程的HoldCounter 其实只用一个 ThreadLocalHoldCounter 也足够了。这里为了加快查询,用了两个额外的缓存,即 cachedHoldCounterfirstReaderfirstReaderHoldCount(后两个组合起来相当于一个 HoldCounter)。

在读锁的相关操作中,先检查 firstReader 是否为当前线程,否则检查 cachedHoldCounter 内部的线程是否为当前线程,如果失败最后会通过 readHolds 来获取当前线程的 HoldCounter

static final class HoldCounter {
    int count = 0;
    // 使用线程id,而不是线程的引用。这样可以防止垃圾不被回收
    final long tid = getThreadId(Thread.currentThread());
}

static final class ThreadLocalHoldCounter
    extends ThreadLocal<HoldCounter> {
    public HoldCounter initialValue() {
        return new HoldCounter();
    }
}
//使用的ThreadLocal
private transient ThreadLocalHoldCounter readHolds;
//一个缓存
private transient HoldCounter cachedHoldCounter;
//组合起来相当于一个缓存
private transient Thread firstReader = null;
private transient int firstReaderHoldCount;

二、读锁

2.1 读锁的获取

下面讲解 tryAcquireSharedtryReadLocktryReadLock 是一种直接抢占的非公平获取,和 tryAcquireShared 中的非公平获取有所不同。

2.1.1 tryAcquireShared

根据注释

1.检查是否存在其他线程持有的写锁,是的话失败,返回 -1;

2.判断在当前公平状态下能否读,以及是否超过读的最大数量,满足条件则尝试 CAS 修改状态,让 state 加一个单位的读 SHARED_UNIT;修改成功后会根据三种情况,即首次读、firstReader 是当前线程,以及其他情况分别进行处理,成功,返回1;

3.前面未返回结果,会执行 fullTryAcquireShared

可以将该方法视为 fullTryAcquireShared 的一次快速尝试,如果尝试失败,会在 fullTryAcquireShared 的自旋中一直执行,直到返回成功或者失败。

//ReadLock
public void lock() {
    sync.acquireShared(1);
}
//AQS
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}
//Sync
protected final int tryAcquireShared(int unused) {
    /*
     * Walkthrough:
     * 1. If write lock held by another thread, fail.
     * 2. Otherwise, this thread is eligible for
     *    lock wrt state, so ask if it should block
     *    because of queue policy. If not, try
     *    to grant by CASing state and updating count.
     *    Note that step does not check for reentrant
     *    acquires, which is postponed to full version
     *    to avoid having to check hold count in
     *    the more typical non-reentrant case.
     * 3. If step 2 fails either because thread
     *    apparently not eligible or CAS fails or count
     *    saturated, chain to version with full retry loop.
     */
    Thread current = Thread.currentThread();
    int c = getState();
  	// 如果写的数量不是0,且写线程不是当前线程,失败
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
  	// 获取读的个数
    int r = sharedCount(c);
  	// 如果当前线程想要读,没有被堵塞
  	// 当前读的数量未超过最大允许的读的个数
  	// CAS执行成功
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
      	// 第一次读,修改firstReader和firstReaderHoldCount
        if (r == 0) {
            firstReader = current;
            firstReaderHoldCount = 1;
          // 如果当前线程正好是firstReader
        } else if (firstReader == current) {
            firstReaderHoldCount++;
          // 其他情况,经过一系列处理后,使得rh为当前线程的HoldCounter
          // 对rh的记数加一
        } else {
            HoldCounter rh = cachedHoldCounter;
          	// 如果cached为null或者不是当前线程
            if (rh == null || rh.tid != getThreadId(current))
              	// 从readHolds中get,并修改cached
                cachedHoldCounter = rh = readHolds.get();
          	// 如果cached不是null,但记数为null
          	// 这种情况表示当前线程的HoldCounter已经被删除,即为null,
          	// 但cached仍然保留着null之前的那个HoldCounter,
          	// 为了方便,直接将cached设置给ThreadLocal即可
            else if (rh.count == 0)
                readHolds.set(rh);
          	//执行到这里,rh表示当前线程的HoldCounter,记数加1
            rh.count++;
        }
        return 1;
    }
  	// 前面未返回结果,执行第三步
    return fullTryAcquireShared(current);
}

2.1.2 fullTryAcquireShared

在上述的简单尝试 tryAcquireShared 未能确定结果后,执行第三步 fullTryAcquireShared 自旋来不断尝试获取读锁,直到成功获取锁返回1,或者满足相应条件认定失败返回-1。

1.其他线程持有写锁,失败

2.当前线程读的尝试满足堵塞条件表示当前线程排在其他线程后面,且当前线程没有持有锁即非重入的情况,失败

3.其他情况则不断自旋CAS,达到最大读的数量会抛出异常,其他情况在成功后返回1。

final int fullTryAcquireShared(Thread current) {
    /*
     * This code is in part redundant with that in
     * tryAcquireShared but is simpler overall by not
     * complicating tryAcquireShared with interactions between
     * retries and lazily reading hold counts.
     */
    HoldCounter rh = null;
    for (;;) {
        int c = getState();
        if (exclusiveCount(c) != 0) {
          	// 存在其他线程持有写锁,返回-1
            if (getExclusiveOwnerThread() != current)
                return -1;
            // else we hold the exclusive lock; blocking here
            // would cause deadlock.
          //没有写锁,且该线程排在其他线程后面,应该被堵塞
          //如果已经持有读锁,此次获取是重入,可以执行else if 之后的操作;
          //否则,会被堵塞,返回-1。
        } else if (readerShouldBlock()) {
            // Make sure we're not acquiring read lock reentrantly
          	//检查firstReader
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
            } else {
                if (rh == null) {
                    rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current)) {
                      	//执行到下一步rh是cached或者readHolds.get(),检查rh
                        rh = readHolds.get();
                      	//在get时,如果不存在,会产生一个新的HoldCounter
                      	//记数为0表示不是重入锁,会删除让其重新为null
                        if (rh.count == 0)
                            readHolds.remove();
                    }
                }
              	//返回失败
                if (rh.count == 0)
                    return -1;
            }
        }
      	//达到最大值,不允许继续增加
        if (sharedCount(c) == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
      	//和2.1.1中相似
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            if (sharedCount(c) == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                if (rh == null)
                    rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
                cachedHoldCounter = rh; // cache for release
            }
            return 1;
        }
    }
}

2.1.3 readerShouldBlock

该方法返回当前线程请求获得读锁是否应该被堵塞,在公平锁和非公平锁中的实现不同

在公平锁中,返回在排队的队列中当前线程之前是否存在其他线程,是的话返回 true,当前线程在队列头部或者队列为空返回 false。

// FairSync
final boolean readerShouldBlock() {
    return hasQueuedPredecessors();
}
// AQS
public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

在非公平锁中,队列中存在两个节点,且第二个节点是独占的写节点,会返回 true,使得新来的读线程堵塞。

这种方式只能在第二个节点是请求写锁的情况下返回 true,避免写锁的无限等待;如果写锁的请求节点在队列的其他位置,返回 false,不影响新来的读线程获取读锁。

如果不按照这种方式处理,而按照队列中的顺序进行处理,则只要存在其他线程在读,每次来一个新的线程请求读锁,总是成功,写锁会一直等待下去。

// NonfairSync
final boolean readerShouldBlock() {
    /* As a heuristic to avoid indefinite writer starvation,
     * block if the thread that momentarily appears to be head
     * of queue, if one exists, is a waiting writer.  This is
     * only a probabilistic effect since a new reader will not
     * block if there is a waiting writer behind other enabled
     * readers that have not yet drained from the queue.
     */
    return apparentlyFirstQueuedIsExclusive();
}
// AQS
final boolean apparentlyFirstQueuedIsExclusive() {
    Node h, s;
    return (h = head) != null &&
        (s = h.next)  != null &&
        !s.isShared()         &&
        s.thread != null;
}

2.1.4 tryReadLock

fullTryAcquireShared 有相似之处,该方法总是直接去抢占锁,直到其他线程获取写锁返回失败,或者当前当前线程获取读锁返回成功。

//ReadLock
public boolean tryLock() {
    return sync.tryReadLock();
}
//Sync
final boolean tryReadLock() {
    Thread current = Thread.currentThread();
    for (;;) {
        int c = getState();
        if (exclusiveCount(c) != 0 &&
            getExclusiveOwnerThread() != current)
            return false;
        int r = sharedCount(c);
        if (r == MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        if (compareAndSetState(c, c + SHARED_UNIT)) {
            if (r == 0) {
                firstReader = current;
                firstReaderHoldCount = 1;
            } else if (firstReader == current) {
                firstReaderHoldCount++;
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    cachedHoldCounter = rh = readHolds.get();
                else if (rh.count == 0)
                    readHolds.set(rh);
                rh.count++;
            }
            return true;
        }
    }
}

2.2 读锁的释放

tryReleaseShared 在 if/else 中实现了通过 first/cached/readHolds 获取相应的 HoldCounter,并修改其中的记数,记数为0则删除;在 for 中,不断自旋实现 CAS 修改状态 c,如果修改后的状态为0,表示读写锁全部释放,返回 true,否则是 false。

// ReadLockpublic void unlock() {    sync.releaseShared(1);}// AQSpublic final boolean releaseShared(int arg) {    if (tryReleaseShared(arg)) {        doReleaseShared();        return true;    }    return false;}// Syncprotected final boolean tryReleaseShared(int unused) {    Thread current = Thread.currentThread();  	// 先检查 firstReader是否是当前线程    if (firstReader == current) {        // assert firstReaderHoldCount > 0;        if (firstReaderHoldCount == 1)            firstReader = null;        else            firstReaderHoldCount--;      //否则,处理 cached/readHolds中的HoldCounter    } else {        HoldCounter rh = cachedHoldCounter;        if (rh == null || rh.tid != getThreadId(current))            rh = readHolds.get();        int count = rh.count;        if (count <= 1) {            readHolds.remove();            if (count <= 0)                throw unmatchedUnlockException();        }        --rh.count;    }  	//自旋修改 state    for (;;) {        int c = getState();        int nextc = c - SHARED_UNIT;        if (compareAndSetState(c, nextc))            // Releasing the read lock has no effect on readers,            // but it may allow waiting writers to proceed if            // both read and write locks are now free.          	//只有读写锁均释放干净,才返回true            return nextc == 0;    }}

三、写锁

3.1 写锁的获取

下面讲解 tryAcquiretryWriteLocktryWriteLock 是一种非公平的获取。

3.1.1 tryAcquire

根据注释,tryAcquire 分为三步

1.如果读记数非0,或者写记数非0且写线程不是当前线程,失败

2.写锁的获取应该被堵塞或者CAS失败,失败

3.其他情况,写重入和新来的写线程,均成功

//WriteLockpublic void lock() {    sync.acquire(1);}//AQSpublic final void acquire(int arg) {    if (!tryAcquire(arg) &&        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))        selfInterrupt();}//Syncprotected final boolean tryAcquire(int acquires) {    /*     * Walkthrough:     * 1. If read count nonzero or write count nonzero     *    and owner is a different thread, fail.     * 2. If count would saturate, fail. (This can only     *    happen if count is already nonzero.)     * 3. Otherwise, this thread is eligible for lock if     *    it is either a reentrant acquire or     *    queue policy allows it. If so, update state     *    and set owner.     */    Thread current = Thread.currentThread();    int c = getState();    int w = exclusiveCount(c);  	//c分为两部分,写和读    if (c != 0) {        // (Note: if c != 0 and w == 0 then shared count != 0)      	// c非0,w是0,则读记数非0 || 独占的写线程不是当前线程      	// 返回 false        if (w == 0 || current != getExclusiveOwnerThread())            return false;        if (w + exclusiveCount(acquires) > MAX_COUNT)            throw new Error("Maximum lock count exceeded");        // Reentrant acquire      	// 重入的情况        setState(c + acquires);        return true;    }  	// 写应该被堵塞或者CAS失败,返回false    if (writerShouldBlock() ||        !compareAndSetState(c, c + acquires))        return false;  	// 非重入,在CAS成功后,设定独占写线程为当前线程,返回true    setExclusiveOwnerThread(current);    return true;}

3.1.2 writerShouldBlock

在公平锁中,检查队列前面是否有其他线程在排队,在非公平锁中,总是返回false,即总是不堵塞。

//FairSyncfinal boolean writerShouldBlock() {    return hasQueuedPredecessors();}//NonfairSyncfinal boolean writerShouldBlock() {    return false; // writers can always barge}

3.1.3 tryWriteLock

tryAcquire 在非公平锁的写法基本一样。

final boolean tryWriteLock() {    Thread current = Thread.currentThread();    int c = getState();    if (c != 0) {        int w = exclusiveCount(c);        if (w == 0 || current != getExclusiveOwnerThread())            return false;        if (w == MAX_COUNT)            throw new Error("Maximum lock count exceeded");    }    if (!compareAndSetState(c, c + 1))        return false;    setExclusiveOwnerThread(current);    return true;}

3.2 写锁的释放

tryRelease 中,修改相应的状态,如果修改后写锁记数为0,则返回 true。

//WriteLockpublic void unlock() {    sync.release(1);}//AQSpublic final boolean release(int arg) {    if (tryRelease(arg)) {        Node h = head;        if (h != null && h.waitStatus != 0)            unparkSuccessor(h);        return true;    }    return false;}//Syncprotected final boolean tryRelease(int releases) {  	// 首先检查当前线程是否持有写锁    if (!isHeldExclusively())        throw new IllegalMonitorStateException();    int nextc = getState() - releases;  	// 根据修改后的写记数来确定free    boolean free = exclusiveCount(nextc) == 0;  	// 此时,写锁完全释放,设定写独占线程为null    if (free)        setExclusiveOwnerThread(null);    setState(nextc);  	// 返回 free    return free;}

四、锁降级

如果一个线程已经持有写锁,再去获取读锁并释放写锁,这个过程称为锁降级。

持有写锁的时候去获取读锁,只有该持有写锁的线程能够成功获取读锁,然后再释放写锁,保证此时当前线程是有读锁的;如果有写锁,先释放写锁,再获取读锁,可能暂时不能获取读锁,会在队列中排队等待。

到此这篇关于Java基础之ReentrantReadWriteLock源码解析的文章就介绍到这了,更多相关Java ReentrantReadWriteLock源码解析内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Java多线程之synchronized关键字的使用

    一.使用在非静态方法上 public synchronized void syzDemo(){ System.out.println(System.currentTimeMillis()); System.out.println("进入synchronized锁:syzDemo"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } 二.使用在静态方法上 publi

  • Java多线程 ReentrantReadWriteLock原理及实例详解

    读写锁ReentrantReadWriteLock概述 读写锁ReentrantReadWriteLock,使用它比ReentrantLock效率更高. 读写锁表示两个锁,一个是读操作相关的锁,称为共享锁:另一个是写操作相关的锁,称为排他锁. 1.读和读之间不互斥,因为读操作不会有线程安全问题 2.写和写之间互斥,避免一个写操作影响另外一个写操作,引发线程安全问题 3.读和写之间互斥,避免读操作的时候写操作修改了内容,引发线程安全问题 多个Thread可以同时进行读取操作,但是同一时刻只允许一个

  • Java concurrency之共享锁和ReentrantReadWriteLock_动力节点Java学院整理

    ReadWriteLock 和 ReentrantReadWriteLock介绍 ReadWriteLock,顾名思义,是读写锁.它维护了一对相关的锁 - - "读取锁"和"写入锁",一个用于读取操作,另一个用于写入操作. "读取锁"用于只读操作,它是"共享锁",能同时被多个线程获取. "写入锁"用于写入操作,它是"独占锁",写入锁只能被一个线程锁获取. 注意:不能同时存在读取锁和写入锁

  • Java多线程之哲学家就餐问题详解

    一.题目 教材提供一个哲学家就餐问题的解决方案的框架.本问题要求通过pthreads 互斥锁来实现这个解决方案. 哲学家 首先创建 5 个哲学家,每个用数字 0~4 来标识.每个哲学家作为一个单独的 线程运行. 可使用 Pthreads 创建线程.哲学家在思考和吃饭之间交替.为了模拟这两种活动,请让线程休眠 1 到 3 秒钟.当哲学家想要吃饭时,他调用函数: pickup_forks(int philosopher _number) 其中,philosopher _number 为想吃饭哲学家的

  • Java多线程之Park和Unpark原理

    一.基本使用 它们是 LockSupport 类中的方法 // 暂停当前线程 LockSupport.park(); // 恢复某个线程的运行 LockSupport.unpark(暂停线程对象) 应用:先 park 再 unpark Thread t1 = new Thread(() -> { log.debug("start..."); sleep(1); log.debug("park..."); LockSupport.park(); log.debu

  • Java多线程之Disruptor入门

    一.Disruptor简介 Disruptor目前是世界上最快的单机消息队列,由英国外汇交易公司LMAX开发,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级).基于Disruptor开发的系统单线程能支撑每秒600万订单,2010年在QCon演讲后,获得了业界关注.2011年,企业应用软件专家Martin Fowler专门撰写长文介绍.同年它还获得了Oracle官方的Duke大奖.目前,包括Apache Storm.Camel.Log4j 2在内的很多知名项

  • Java基础之多线程的三种实现方式

    一.前言 Java多线程实现的三种方式有继承Thread类,实现Runnable接口,使用ExectorService.Callable.Future实现有返回结果的多线程.其中前两种方式线程执行完后都没有返回值,只有最后一种是带返回值的. 二.继承Thread类实现多线程 1.Thread本质上也是实现了Runnable接口的一个实例,它代表一个线程的实例,并且,启动线程的唯一方法就是通过Thread类的start()实例方法. 2.start()方法是一个native方法,它将启动一个新线程

  • Java多线程之线程的创建

    一.三种创建方式 基于什么创建 创建的方式 Thread类 继承Thread类 Runnable接口 实现Runnable接口 callable接口 实现callable接口 二.通过Thread类创建 2.1 步骤 自定义线程类继承Thread类 重写run()方法,编写线程执行体(当成main()方法用) 创建线程对象,调用start()方法启动线程 2.2 案例 创建两个线程,其中一个线程打印100以内的偶数,另一个线程打印100以内的奇数 //主方法 public class Demo0

  • Java多线程之简单模拟售票功能

    一.创建 二.完整代码 package com.ql; import lombok.SneakyThrows; import okhttp3.Call; import okhttp3.OkHttpClient; import okhttp3.Request; import okhttp3.Response; import java.io.IOException; public class Mythread extends Thread { public Mythread(String name)

  • Java多线程之深入理解ReentrantLock

    前言 保证线程安全的方式有很多,比如CAS操作.synchronized.原子类.volatile保证可见性和ReentrantLock等,这篇文章我们主要探讨ReentrantLock的相关内容.本文基于JDK1.8讲述ReentrantLock. 一.可重入锁 所谓可重入锁,即一个线程已经获得了某个锁,当这个线程要再次获取这个锁时,依然可以获取成功,不会发生死锁的情况.synchronized就是一个可重入锁,除此之外,JDK提供的ReentrantLock也是一种可重入锁. 二.Reent

随机推荐