ReentrantLock从源码解析Java多线程同步学习

目录
  • 前言
  • 管程
    • 管程模型
  • MESA模型
    • 主要特点
  • AQS
    • 共享变量
    • 资源访问方式
    • 主要方法
    • 队列
    • node节点等待状态
  • ReentrantLock源码分析
    • 实例化ReentrantLock
    • 加锁
    • A线程加锁成功
    • B线程尝试加锁
  • 释放锁
  • 总结

前言

如今多线程编程已成为了现代软件开发中的重要部分,而并发编程中的线程同步问题更是一道难以逾越的坎。在Java语言中,synchronized是最基本的同步机制,但它也存在着许多问题,比如可重入性不足、死锁等等。为了解决这些问题,Java提供了更加高级的同步机制——ReentrantLock。

管程

管程(Monitor)是一种用于实现多线程同步的抽象数据类型,它可以用来协调不同线程之间的互斥和同步访问共享资源。通俗地说,管程就像一个门卫,控制着进入某个共享资源区域的线程数量和时间,以避免多个线程同时访问导致的数据竞争和混乱。

管程模型

  • Mesa管程模型:由美国计算机科学家Dijkstra提出,是最流行的管程模型之一。在Mesa管程模型中,每个管程也有一个条件变量和等待队列,但与Hoare管程不同的是,当一个线程请求进入管程时,如果条件不满足,该线程并不会立即被阻塞,而是继续执行后续操作,直到该线程主动放弃锁资源或者其他线程唤醒它。
  • Hoare管程模型:由英国计算机科学家C.A.R. Hoare提出,是最早的管程模型之一。在Hoare管程模型中,每个管程都有一个条件变量和一个等待队列,当一个线程请求进入管程时,如果条件不满足,该线程就会被阻塞并加入等待队列,直到条件满足后才被唤醒。
  • Brinch Hansen管程模型:由丹麦计算机科学家Per Brinch Hansen提出,是一种改进的管程模型。在Brinch Hansen管程模型中,每个管程也有一个条件变量和等待队列,但与其他管程模型不同的是,它允许多个线程同时在管程中等待,并且不需要像Hoare管程那样每次只唤醒一个等待线程。

在Java中,采用的是基于Mesa管程模型实现的管程机制。具体地,Java中的synchronized关键字就是基于Mesa管程模型实现的,包括Java中的AbstractQueuedSynchronizer(AQS)可以被看作是一种基于管程模型实现的同步框架。

MESA模型

主要特点

  • 互斥访问

MESA模型采用了互斥访问的机制,即同一时刻只能有一个线程进入管程执行代码。

  • 条件变量

MESA模型还引入了条件变量的概念,用于实现线程间的等待和唤醒操作。条件变量提供了一种机制,使得线程可以在等待某个条件成立时挂起,并在条件成立时被唤醒。

  • 等待队列

MESA模型使用等待队列来维护处于等待状态的线程,这些线程都在等待条件变量成立。等待队列由一个或多个条件变量组成,每个条件变量都有自己的等待队列。

  • 原子操作

MESA模型要求管程中的所有操作都是原子操作,即一旦进入管程,就不能被中断,直到操作执行完毕。

AQS

在讲ReentrantLock之前先说一下AQS,AQS(AbstractQueuedSynchronizer)是Java中的一个同步器,它是许多同步类(如ReentrantLock、Semaphore、CountDownLatch等)的基础。AQS提供了一种实现同步操作的框架,其中包括独占模式和共享模式,以及一个等待队列来管理线程的等待和唤醒。AQS也借鉴了Mesa模型的思想。

共享变量

AQS内部维护了属性volatile int state表示资源的可用状态
state三种访问方式:

  • getState()
  • setState()
  • compareAndSetState()

资源访问方式

Exclusive-独占,只有一个线程能执行,如ReentrantLock

Share-共享,多个线程可以同时执行,如Semaphore/CountDownLatch

主要方法

  • isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
  • tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
  • tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
  • tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
  • tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false

队列

  • 同步等待队列: 主要用于维护获取锁失败时入队的线程。
  • 条件等待队列: 调用await()的时候会释放锁,然后线程会加入到条件队列,调用signal()唤醒的时候会把条件队列中的线程节点移动到同步队列中,等待再次获得锁。

node节点等待状态

  • 值为0,初始化状态,表示当前节点在sync队列中,等待着获取锁。
  • CANCELLED,值为1,表示当前的线程被取消;
  • SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark;
  • CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中;
  • PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行;

ReentrantLock源码分析

在ReentrantLock中有一个内部类Sync会继承 AQS然后将同步器所有调用都映射到Sync对应的方法。

实例化ReentrantLock

/**
 * Creates an instance of {@code ReentrantLock}.
 * This is equivalent to using {@code ReentrantLock(false)}.
 */
public ReentrantLock() {
    sync = new NonfairSync();
}
/**
 * Creates an instance of {@code ReentrantLock} with the
 * given fairness policy.
 *
 * @param fair {@code true} if this lock should use a fair ordering policy
 */
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

ReentrantLock还提供了一个传布尔值的实例化方式,这个传true用来创建一个公平锁的,默认是创建非公平锁。非公平锁的话 sync是用NonfairSync来进行实例化,公平锁sync是用FairSync来进行实例化。

加锁

现在假设有AB两个线程来竞争锁

A线程加锁成功

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    /**
     * Performs lock.  Try immediate barge, backing up to normal
     * acquire on failure.
     */
    final void lock() {
        //CAS修改state状态
        if (compareAndSetState(0, 1))
           //修改成功设置exclusiveOwnerThread
            setExclusiveOwnerThread(Thread.currentThread());
        else
           //尝试获取资源
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

假定A线程先CAS修改成功,他会设置exclusiveOwnerThread为A线程

B线程尝试加锁

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

我们先看tryAcquire()方法,这里体现出了他的可重入性。

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    //获取当前资源标识
    int c = getState();
    if (c == 0) {
        //如果资源没被占有CAS尝试加锁
        if (compareAndSetState(0, acquires)) {
            //修改成功设置exclusiveOwnerThread
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    //资源被占有要判断占有资源的线程是不是当前线程,加锁成功设置的exclusiveOwnerThread值在这里就派上了用处
    else if (current == getExclusiveOwnerThread()) {
        //这下面就是将重入次数设置到资源标识里
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

根据上面源码我们可以看出B线程尝试加锁是失败的,接下来看尝试加锁失败后的方法acquireQueued(addWaiter(Node.EXCLUSIVE), arg),该方法实现分为两个部分:

  • addWaiter(Node.EXCLUSIVE):该方法会将当前线程加入到等待队列(即 Sync 中的 queue)的尾部,并返回该节点。这个节点的模式是独占模式(Node.EXCLUSIVE),表示当前线程想要获取独占锁。
  • acquireQueued(Node node, int arg):该方法是一个循环方法,用于等待和获取锁。

我们先解析addWaiter

private Node addWaiter(Node mode) {
    //构建一个当前线程的node节点 这里prev 和 next 都为null
    Node node = new Node(Thread.currentThread(), mode);
    // 指向双向链表的尾节点的引用
    Node pred = tail;
    //B线程进来目前还未构建任何队列这里肯定是空的
    if (pred != null) {
        //如果已经构建过队列会把当前线程的node节点的上一个node节点指向tail尾节点
        node.prev = pred;
        //CAS操作把当前线程的node节点设置为新的tail尾节点
        if (compareAndSetTail(pred, node)) {
            //把旧的tail尾节点的下一个node节点指向当前线程的node节点
            pred.next = node;
            return node;
        }
    }
    //尾节点为空执行
    enq(node);
    return node;
}
private Node enq(final Node node) {
    //死循环当tail 尾节点不为空才会跳出
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            //用CAS构建出一个head空的node节点
            if (compareAndSetHead(new Node()))
            //将当前空的node节点交给尾节点(下一次循环就会走else分支)
                tail = head;
        } else {
            //把我们addWaiter中创建的node节点的prev指向了当前线程node节点
            node.prev = t;
            //将tail尾节点更改为当前线程的node节点
            if (compareAndSetTail(t, node)) {
                //将t的下一个节点指向当前线程创建的node节点
                t.next = node;
                return t;
            }
        }
    }
}

执行完这里初步的一个等待队列就构建好了

解析完addWaiter我们再来解析acquireQueued,addWaiter执行完后的结果会返回一个双向列表的node节点

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        //中断标志位
        boolean interrupted = false;
        for (;;) {
            //获取当前线程node节点的上一个节点
            final Node p = node.predecessor();
            //如果上一个节点就是head节点说明当前线程其实是处在队列第一位然后就会再次尝试加锁
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            //这里是重点 这个方法来判断当前线程是否应该进入等待状态
            if (shouldParkAfterFailedAcquire(p, node) &&
                //调用LockSupport.park(this)阻塞了当前线程等待被唤醒
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    //获取上一个节点的等待状态
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)//表示当前节点的后继节点包含的线程需要运行
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {//当前线程被取消
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);//设置等待状态为-1
    }
    return false;
}

运行到parkAndCheckInterrupt()B线程就会被阻塞了,后续的逻辑我们在解锁操作unlock之后再继续说

释放锁

public void unlock() {
    sync.release(1);
}
public final boolean release(int arg) {
   //尝试释放锁
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            //head节点不为空并且等待状态不是0就去进行unpark操作
            unparkSuccessor(h);
        return true;
    }
    return false;
}
private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    //head节点等待状态
    int ws = node.waitStatus;
    //
    if (ws < 0)
       //将头节点的等待状态修改为0
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    //获取头节点的下一个节点(也就是B节点)
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {//节点为空或者线程处于取消状态
        s = null;
        //从尾节点往上找符合条件的节点
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        //对该线程进行unpark唤醒(B节点)
        LockSupport.unpark(s.thread);
}

唤醒之后我们的B线程就能继续往下走了,我们继续看刚刚的acquireQueued()方法

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            //这里尝试获取锁由于A线程释放了锁这里是肯定获取成功的
            if (p == head && tryAcquire(arg)) {
                //把head设置为当前节点(也就是往前移一位,并且把上一个节点指向指为null)
                setHead(node);
                //把刚刚的上一个节点也指向为null (这里他就没引用了会被GC回收掉)
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                //刚刚在这里阻塞现在被唤醒
                parkAndCheckInterrupt())
                //设置标志中断位为true 然后开始下一次循环
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

总结

  • 锁的实现方式:ReentrantLock内部通过维护一个state变量来表示锁的状态,其中高16位表示持有锁的线程ID,低16位表示重入次数。使用CAS操作来获取锁,如果当前锁未被持有,则将state的高16位设置为当前线程ID,并将低16位设置为1,表示重入次数为1;如果当前锁已经被持有,则判断持有锁的线程是否为当前线程,如果是,则将state的低16位加1表示重入,如果不是,则进入等待队列。
  • 等待队列:ReentrantLock中的等待队列采用了CLH队列的实现方式,每个等待线程会被封装成一个Node节点,节点中维护了前继节点、后继节点和等待状态等信息。当一个线程需要等待锁时,会将自己封装成一个Node节点插入到等待队列的尾部,并在自旋等待时自动阻塞。
  • 公平锁与非公平锁:ReentrantLock可以通过构造函数中的fair参数来指定锁的公平性,当fair为true时表示该锁是公平锁,即等待队列中的线程会按照先进先出的顺序获取锁;当fair为false时表示该锁是非公平锁,即等待队列中的线程会按照随机顺序获取锁。
  • 锁的释放:ReentrantLock中的锁释放采用了state变量的递减来实现,当一个线程释放锁时,会将state的低16位减1,如果减1后低16位变为0,则表示当前线程已经完全释放了锁,此时会将高16位清零,表示锁变为了可获取状态,等待队列中的线程可以继续竞争锁。

以上就是ReentrantLock从源码解析Java多线程同步学习的详细内容,更多关于Java多线程ReentrantLock的资料请关注我们其它相关文章!

(0)

相关推荐

  • java锁机制ReentrantLock源码实例分析

    目录 一:简述 二:ReentrantLock类图 三:流程简图 四:源码分析 lock()源码分析: 非公平实现: 公平锁实现: tryAcquire()方法 公平锁实现: 非公平锁实现: addWaiter() acquireQueued() shouldParkAfterFailedAcquire() parkAndCheckInterrupt() unlock()方法源码分析: tryRelease() unparkSuccessor() 五:总结 一:简述 ReentrantLock是

  • 详解Java中的ReentrantLock锁

    ReentrantLock锁 ReentrantLock是Java中常用的锁,属于乐观锁类型,多线程并发情况下.能保证共享数据安全性,线程间有序性 ReentrantLock通过原子操作和阻塞实现锁原理,一般使用lock获取锁,unlock释放锁, 下面说一下锁的基本使用和底层基本实现原理,lock和unlock底层 lock的时候可能被其他线程获得所,那么此线程会阻塞自己,关键原理底层用到Unsafe类的API: CAS和park 使用 java.util.concurrent.locks.R

  • java ReentrantLock并发锁使用详解

    目录 一.ReentrantLock是什么 1-1.ReentrantLock和synchronized区别 1-2.ReentrantLock的使用 1-2-1.ReentrantLock同步执行,类似synchronized 1-2-2.可重入锁 1-2-3.锁中断 1-2-4.获得锁超时失败 1-2-5.公平锁 一.ReentrantLock是什么 ReentrantLock是一种基于AQS框架的应用实现,是JDK中的一种线程并发访问的同步手段,它的功能类似于synchronized是一种

  • java ReentrantLock条件锁实现原理示例详解

    目录 引言 条件锁的使用 ReentrantLock.newCondition() Condition.await Condition.signal 引言 在前两篇文章中,我们了解了ReentrantLock内部公平锁和非公平锁的实现原理,可以知道其底层基于AQS,使用双向链表实现,同时在线程间通信方式(2)中我们了解到ReentrantLock也是支持条件锁的,接下来我们来看下,其内部条件锁的实现原理. 条件锁的使用 public static void main(String[] args)

  • java底层组合AQS实现类kReentrantLock源码解析

    目录 不啰嗦,我们直接开始! 引导语 上两小节我们学习了 AQS,本章我们就要来学习一下第一个 AQS 的实现类:ReentrantLock,看看其底层是如何组合 AQS ,实现了自己的那些功能. 本章的描述思路是先描述清楚 ReentrantLock 的构成组件,然后使用加锁和释放锁的方法把这些组件串起来. 1.类注释 ReentrantLock 中文我们习惯叫做可重入互斥锁,可重入的意思是同一个线程可以对同一个共享资源重复的加锁或释放锁,互斥就是 AQS 中的排它锁的意思,只允许一个线程获得

  • Java如何使用ReentrantLock实现长轮询

    Java代码 1. ReentrantLock 加锁阻塞,一个condition对应一个线程,以便于唤醒时使用该condition一定会唤醒该线程 /** * 获取探测点数据,长轮询实现 * @param messageId * @return */ public JSONObject getToutData(String messageId) { Message message = toutMessageCache.get(messageId); if (message == null) {

  • ReentrantLock从源码解析Java多线程同步学习

    目录 前言 管程 管程模型 MESA模型 主要特点 AQS 共享变量 资源访问方式 主要方法 队列 node节点等待状态 ReentrantLock源码分析 实例化ReentrantLock 加锁 A线程加锁成功 B线程尝试加锁 释放锁 总结 前言 如今多线程编程已成为了现代软件开发中的重要部分,而并发编程中的线程同步问题更是一道难以逾越的坎.在Java语言中,synchronized是最基本的同步机制,但它也存在着许多问题,比如可重入性不足.死锁等等.为了解决这些问题,Java提供了更加高级的

  • 深度源码解析Java 线程池的实现原理

    java 系统的运行归根到底是程序的运行,程序的运行归根到底是代码的执行,代码的执行归根到底是虚拟机的执行,虚拟机的执行其实就是操作系统的线程在执行,并且会占用一定的系统资源,如CPU.内存.磁盘.网络等等.所以,如何高效的使用这些资源就是程序员在平时写代码时候的一个努力的方向.本文要说的线程池就是一种对 CPU 利用的优化手段. 线程池,百度百科是这么解释的: 线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务.线程池线程都是后台线程.每个线程都使用默认的

  • Java并发之ReentrantLock类源码解析

    ReentrantLock内部由Sync类实例实现. Sync类定义于ReentrantLock内部. Sync继承于AbstractQueuedSynchronizer. AbstractQueuedSynchronizer继承于AbstractOwnableSynchronizer. AbstractOwnableSynchronizer类中只定义了一个exclusiveOwnerThread变量,表示当前拥有的线程. 除了Sync类,ReentrantLock内部还定义了两个实现类. No

  • 源码解析Java类加载器

    参考内容: 深入理解Java虚拟机(JVM高级特性与最佳实践) --周志明老师 尚硅谷深入理解JVM教学视频--宋红康老师 我们都知道Java的类加载器结构为下图所示(JDK8及之前,JDK9进行了模块化): 关于三层类加载器.双亲委派机制,本文不再板书,读者可自行百度. 那么在JDK的源码中,三层结构的具体实现是怎么样的呢? Bootstrap ClassLoader(引导类加载器) 引导类加载器是由C++实现的,并非Java代码实现,所以在Java代码中是无法获取到该类加载器的. 一般大家都

  • 深入剖析Java ReentrantLock的源码

    目录 1. ReentrantLock的使用 2. ReentrantLock类结构 3. ReentrantLock源码解析 3.1 ReentrantLock构造方法 3.2 非公平锁源码 3.3 公平锁源码 4. 总结 ReentrantLock和Synchronized都是Java开发中最常用的锁,与Synchronized这种JVM内置锁不同的是,ReentrantLock提供了更丰富的语义.可以创建公平锁或非公平锁.响应中断.超时等待.按条件唤醒等.在某些场景下,使用Reentran

  • Java源码解析之详解ReentrantLock

    ReentrantLock ReentrantLock是一种可重入的互斥锁,它的行为和作用与关键字synchronized有些类似,在并发场景下可以让多个线程按照一定的顺序访问同一资源.相比synchronized,ReentrantLock多了可扩展的能力,比如我们可以创建一个名为MyReentrantLock的类继承ReentrantLock,并重写部分方法使其更加高效. 当一个线程调用ReentrantLock.lock()方法时,如果ReentrantLock没有被其他线程持有,且不存在

  • Java源码解析之object类

    在源码的阅读过程中,可以了解别人实现某个功能的涉及思路,看看他们是怎么想,怎么做的.接下来,我们看看这篇Java源码解析之object的详细内容. Java基类Object java.lang.Object,Java所有类的父类,在你编写一个类的时候,若无指定父类(没有显式extends一个父类)编译器(一般编译器完成该步骤)会默认的添加Object为该类的父类(可以将该类反编译看其字节码,不过貌似Java7自带的反编译javap现在看不到了). 再说的详细点:假如类A,没有显式继承其他类,编译

  • Java源码解析之ConcurrentHashMap

    早期 ConcurrentHashMap,其实现是基于: 分离锁,也就是将内部进行分段(Segment),里面则是 HashEntry 的数组,和 HashMap 类似,哈希相同的条目也是以链表形式存放. HashEntry 内部使用 volatile 的 value 字段来保证可见性,也利用了不可变对象的机制以改进利用 Unsafe 提供的底层能力,比如 volatile access,去直接完成部分操作,以最优化性能,毕竟 Unsafe 中的很多操作都是 JVM intrinsic 优化过的

  • Java 线程池ThreadPoolExecutor源码解析

    目录 引导语 1.整体架构图 1.1.类结构 1.2.类注释 1.3.ThreadPoolExecutor重要属性 2.线程池的任务提交 3.线程执行完任务之后都在干啥 4.总结 引导语 线程池我们在工作中经常会用到.在请求量大时,使用线程池,可以充分利用机器资源,增加请求的处理速度,本章节我们就和大家一起来学习线程池. 本章的顺序,先说源码,弄懂原理,接着看一看面试题,最后看看实际工作中是如何运用线程池的. 1.整体架构图 我们画了线程池的整体图,如下: 本小节主要就按照这个图来进行 Thre

  • java底层AQS实现类kReentrantLock锁的构成及源码解析

    目录 引导语 1.类注释 2.类结构 3.构造器 4.Sync同步器 4.1.nonfairTryAcquire 4.2.tryRelease 5.FairSync公平锁 6.NonfairSync非公平锁 7.如何串起来 7.1lock加锁 7.2tryLock尝试加锁 7.3unlock释放锁 7.4Condition 8.总结 引导语 本章的描述思路是先描述清楚 ReentrantLock 的构成组件,然后使用加锁和释放锁的方法把这些组件串起来. 1.类注释 ReentrantLock 中

随机推荐