Java 多线程并发AbstractQueuedSynchronizer详情

目录
  • AbstractQueuedSynchronizer
    • 核心思想
    • 为什么需要 AQS
  • 用法
    • 用法示例
  • AQS 底层原理
    • 父类 AbstractOwnableSynchronizer
    • CLH 队列
    • Condition
    • 用于等待的方法
    • 用于唤醒的方法
    • ConditionObject
    • Signalling methods
    • Waiting methods
    • enableWait
    • canReacquire
    • unlinkCancelledWaiters
    • 对外提供的等待方法
    • awaitUninterruptibly
    • await
    • awaitNanos
    • awaitUntil
    • await(long, TimeUnit)
    • acquire 方法

AbstractQueuedSynchronizer

AbstractQueuedSynchronizer 简称 AQS ,抽象队列同步器,用来实现依赖于先进先出(FIFO)等待队列的阻塞锁和相关同步器的框架。这个类旨在为大多数依赖单个原子 int 值来表示同步状态的同步器提供基础的能力封装。 例如 ReentrantLock、Semaphore 和 FutureTask 等等都是基于 AQS 实现的,我们也可以继承 AQS 实现自定义同步器。

核心思想

网络上常见的解释是:

如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是用CLH队列锁实现的,即将暂时获取不到锁的线程加入到队列中。

个人理解,可以把 AQS 当成一把锁,它内部通过一个队列记录了所有要使用锁的请求线程,并且管理锁自己当前的状态(锁定、空闲等状态)。相当于 AQS 就是共享资源本身,当有线程请求这个资源是,AQS 将请求资源的线程记录当前工作线程,并将自身设置为锁定状态。后续其他线程请求这个 AQS 时,将请求线程记录到等待队列中,其他线程此时未获取到锁,进入阻塞等待状态。

为什么需要 AQS

在深入 AQS 前,我们应该持有一个疑问是为什么需要 AQS ?synchronized 关键字和 CAS 原子类都提供了丰富的同步方案了。

但在实际的需求中,对同步的需求是各式各样的,比如,我们需要对一个锁加上超时时间,那么光凭 synchronized 关键字或是 CAS 就无法实现了,需要对其进行二次封装。而 JDK 中提供了丰富的同步方案,比如 ReentrantLock ,而 ReentrantLock 是就是基于 AQS 实现的。

用法

这部分内容来自 JDK 的注释

要将此类用作同步器的基础,请在适用时重新定义以下方法,方法是使用 getState、setState 和/或 compareAndSetState 检查和/或修改同步状态:

  • tryAcquire
  • tryRelease
  • tryAcquireShared
  • tryReleaseShared
  • isHeldExclusively

默认情况下,这些方法中的每一个都会引发 UnsupportedOperationException。 这些方法的实现必须是内部线程安全的,并且通常应该是短暂的而不是阻塞的。 定义这些方法是使用此类的唯一受支持的方法。 所有其他方法都被声明为最终方法,因为它们不能独立变化。

您可能还会发现从 AbstractOwnableSynchronizer 继承的方法对于跟踪拥有独占同步器的线程很有用。 鼓励您使用它们——这使监视和诊断工具能够帮助用户确定哪些线程持有锁。

即使此类基于内部 FIFO 队列,它也不会自动执行 FIFO 采集策略。

独占同步的核心形式为:

   Acquire:
       while (!tryAcquire(arg)) {
          enqueue thread if it is not already queued;
          possibly block current thread;
       }
  
   Release:
       if (tryRelease(arg))
          unblock the first queued thread;

(共享模式类似,但可能涉及级联信号。)

因为在入队之前调用了获取中的检查,所以新获取的线程可能会抢在其他被阻塞和排队的线程之前。 但是,如果需要,您可以定义 tryAcquire 和/或 tryAcquireShared 以通过内部调用一个或多个检查方法来禁用插入,从而提供公平的 FIFO 获取顺序。 特别是,如果 hasQueuedPredecessors(一种专门为公平同步器使用的方法)返回 true,大多数公平同步器可以定义 tryAcquire 返回 false。 其他变化是可能的。

默认插入(也称为贪婪、放弃和避免护送)策略的吞吐量和可扩展性通常最高。 虽然这不能保证公平或无饥饿,但允许较早排队的线程在较晚的排队线程之前重新竞争,并且每次重新竞争都有无偏见的机会成功对抗传入线程。 此外,虽然获取不是通常意义上的“旋转”,但它们可能会在阻塞之前执行多次调用 tryAcquire 并穿插其他计算。 当独占同步只是短暂地保持时,这提供了自旋的大部分好处,而没有大部分责任。 如果需要,您可以通过预先调用获取具有“快速路径”检查的方法来增加这一点,可能会预先检查 hasContended 和/或 hasQueuedThreads 以仅在同步器可能不会被争用时才这样做。

此类通过将其使用范围专门用于可以依赖 int 状态、获取和释放参数以及内部 FIFO 等待队列的同步器,部分地为同步提供了高效且可扩展的基础。 如果这还不够,您可以使用原子类、您自己的自定义 java.util.Queue 类和 LockSupport 阻塞支持从较低级别构建同步器。

用法示例

这是一个不可重入互斥锁类,它使用值 0 表示未锁定状态,使用值 1 表示锁定状态。 虽然不可重入锁并不严格要求记录当前所有者线程,但无论如何,此类都会这样做以使使用情况更易于监控。

它还支持条件并公开一些检测方法:

class Mutex implements Lock, java.io.Serializable {
​
   // Our internal helper class
   private static class Sync extends AbstractQueuedSynchronizer {
     // Acquires the lock if state is zero
     public boolean tryAcquire(int acquires) {
       assert acquires == 1; // Otherwise unused
       if (compareAndSetState(0, 1)) {
         setExclusiveOwnerThread(Thread.currentThread());
         return true;
       }
       return false;
     }
​
     // Releases the lock by setting state to zero
     protected boolean tryRelease(int releases) {
       assert releases == 1; // Otherwise unused
       if (!isHeldExclusively())
         throw new IllegalMonitorStateException();
       setExclusiveOwnerThread(null);
       setState(0);
       return true;
     }​
     // Reports whether in locked state
     public boolean isLocked() {
       return getState() != 0;
     }
     public boolean isHeldExclusively() {
       // a data race, but safe due to out-of-thin-air guarantees
       return getExclusiveOwnerThread() == Thread.currentThread();
     }​
     // Provides a Condition
     public Condition newCondition() {
       return new ConditionObject();
     }
     // Deserializes properly
     private void readObject(ObjectInputStream s)
         throws IOException, ClassNotFoundException {
       s.defaultReadObject();
       setState(0); // reset to unlocked state
     }
   }
   // The sync object does all the hard work. We just forward to it.
   private final Sync sync = new Sync();
   public void lock()              { sync.acquire(1); }
   public boolean tryLock()        { return sync.tryAcquire(1); }
   public void unlock()            { sync.release(1); }
   public Condition newCondition() { return sync.newCondition(); }
   public boolean isLocked()       { return sync.isLocked(); }
   public boolean isHeldByCurrentThread() {
     return sync.isHeldExclusively();
   }
   public boolean hasQueuedThreads() {
     return sync.hasQueuedThreads();
   }
   public void lockInterruptibly() throws InterruptedException {
     sync.acquireInterruptibly(1);
   }
   public boolean tryLock(long timeout, TimeUnit unit)
       throws InterruptedException {
     return sync.tryAcquireNanos(1, unit.toNanos(timeout));
   }
 }

这是一个类似于 CountDownLatch 的锁存器类,只是它只需要一个信号即可触发。 因为锁存器是非独占的,所以它使用共享的获取和释放方法。

 class BooleanLatch {​
   private static class Sync extends AbstractQueuedSynchronizer {
     boolean isSignalled() { return getState() != 0; }
     protected int tryAcquireShared(int ignore) {
       return isSignalled() ? 1 : -1;
     }
     protected boolean tryReleaseShared(int ignore) {
       setState(1);
       return true;
     }
   }
   private final Sync sync = new Sync();
   public boolean isSignalled() { return sync.isSignalled(); }
   public void signal()         { sync.releaseShared(1); }
   public void await() throws InterruptedException {
     sync.acquireSharedInterruptibly(1);
   }
 }

AQS 底层原理

父类 AbstractOwnableSynchronizer

AbstractQueuedSynchronizer 继承自 AbstractOwnableSynchronizer ,后者逻辑十分简单:

public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {​
    private static final long serialVersionUID = 3737899427754241961L;​
    protected AbstractOwnableSynchronizer() { }
    private transient Thread exclusiveOwnerThread;
    // 设置当前持有锁的线程
    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }
    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
}

AbstractOwnableSynchronizer 只是定义了设置持有锁的线程的能力。

CLH 队列

AQS 的等待队列是 CLH (Craig , Landin , and Hagersten) 锁定队列的变体,CLH 锁通常用于自旋锁。AQS 将每个请求共享资源的线程封装程一个 CLH 节点来实现的,这个节点的定义是:

    /** CLH Nodes */
    abstract static class Node {
        volatile Node prev;       // initially attached via casTail
        volatile Node next;       // visibly nonnull when signallable
        Thread waiter;            // visibly nonnull when enqueued
        volatile int status;      // written by owner, atomic bit ops by others
​
        // methods for atomic operations
        final boolean casPrev(Node c, Node v) {  // for cleanQueue
            return U.weakCompareAndSetReference(this, PREV, c, v); // 通过 CAS 确保同步设置 prev 的值
        }
        final boolean casNext(Node c, Node v) {  // for cleanQueue
            return U.weakCompareAndSetReference(this, NEXT, c, v);
        }
        final int getAndUnsetStatus(int v) {     // for signalling
            return U.getAndBitwiseAndInt(this, STATUS, ~v);
        }
        final void setPrevRelaxed(Node p) {      // for off-queue assignment
            U.putReference(this, PREV, p);
        }
        final void setStatusRelaxed(int s) {     // for off-queue assignment
            U.putInt(this, STATUS, s);
        }
        final void clearStatus() {               // for reducing unneeded signals
            U.putIntOpaque(this, STATUS, 0);
        }
        private static final long STATUS = U.objectFieldOffset(Node.class, "status");
        private static final long NEXT = U.objectFieldOffset(Node.class, "next");
        private static final long PREV = U.objectFieldOffset(Node.class, "prev");
    }

CLH 的节点的数据结构是一个双向链表的节点,只不过每个操作都是经过 CAS 确保线程安全的。要加入 CLH 锁队列,您可以将其自动拼接为新的尾部;要出队,需要设置 head 字段,以便下一个符合条件的等待节点成为新的头节点:

 +------+  prev +-------+  prev +------+
 |      | <---- |       | <---- |      |
 | head | next  | first | next  | tail |
 |      | ----> |       | ----> |      |
 +------+       +-------+       +------+

Node 中的 status 字段表示当前节点代表的线程的状态。

status 存在三种状态:

    static final int WAITING   = 1;          // must be 1
    static final int CANCELLED = 0x80000000; // must be negative
    static final int COND      = 2;          // in a condition wait
  • WAITING:表示等待状态,值为 1。
  • CANCELLED:表示当前线程被取消,为 0x80000000。
  • COND:表示当前节点在等待条件,也就是在条件等待队列中,值为 2。

在上面的 COND 中,提到了一个条件等待队列的概念。

首先,Node 是一个静态抽象类,它在 AQS 中存在三种实现类:

  • ExclusiveNode
  • SharedNode
  • ConditionNode

前两者都是空实现:

    static final class ExclusiveNode extends Node { }
    static final class SharedNode extends Node { }

而最后的 ConditionNode 多了些内容:

    static final class ConditionNode extends Node implements ForkJoinPool.ManagedBlocker {
        ConditionNode nextWaiter;
        // 检查线程是否中断或当前线程的状态已取消等待。
        public final boolean isReleasable() {
            return status <= 1 || Thread.currentThread().isInterrupted();
        }
​
        public final boolean block() {
            while (!isReleasable()) LockSupport.park();
            return true;
        }
    }

ConditionNode 拓展了两个方法:

  • 检查线程状态是否处于等待。
  • 阻塞当前线程:当前线程正在等待执行,通过 LockSupport.park() 阻塞当前线程。这里通过 while 循环持续重试,尝试阻塞线程。

而到这一步,所有的信息都指向了一个相关的类 Condition 。

Condition

AQS 中的 Condition 的实现是内部类 ConditionObject :

public class ConditionObject implements Condition, java.io.Serializable 

ConditionObject 实现了 Condition 接口和序列化接口,后者说明了该类型的对象可以进行序列化。而前者 Condition 接口,定义了一些行为能力:

public interface Condition {
    void await() throws InterruptedException;​
    void awaitUninterruptibly();​
    long awaitNanos(long nanosTimeout) throws InterruptedException;​
    boolean await(long time, TimeUnit unit) throws InterruptedException;​
    boolean awaitUntil(Date deadline) throws InterruptedException;​
    void signal();
    void signalAll();
}

Condition 中定义的能力与 Java 的 Object 类中提供的同步相关方法(wait、notify 和 notifyAll) 代表的能力极为相似。前者提供了更丰富的等待方法。类比的角度来看,如果 Object 是配合 synchronized 关键字使用的,那么 Condition 就是用来配合基于 AQS 实现的锁来使用的接口。

可以将 Condition 的方法分为两组:等待和唤醒。

用于等待的方法

// 等待,当前线程在接到信号或被中断之前一直处于等待状态    
void await() throws InterruptedException;
// 等待,当前线程在接到信号之前一直处于等待状态,不响应中断
void awaitUninterruptibly();
//等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态
long awaitNanos(long nanosTimeout) throws InterruptedException;
// 等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
// 此方法在行为上等效于: awaitNanos(unit.toNanos(time)) > 0
boolean await(long time, TimeUnit unit) throws InterruptedException;
// 等待,当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态    
boolean awaitUntil(Date deadline) throws InterruptedException;

用于唤醒的方法

// 唤醒一个等待线程。如果所有的线程都在等待此条件,则选择其中的一个唤醒。在从 await 返回之前,该线程必须重新获取锁。
void signal();
// 唤醒所有等待线程。如果所有的线程都在等待此条件,则唤醒所有线程。在从 await 返回之前,每个线程都必须重新获取锁。
void signalAll();

ConditionObject

分析完 Condition ,继续来理解 ConditionObject。 ConditionObject 是 Condition 在 AQS 中的实现:

public class ConditionObject implements Condition, java.io.Serializable {
    /** condition 队列头节点 */
    private transient ConditionNode firstWaiter;
    /** condition 队列尾节点 */
    private transient ConditionNode lastWaiter;
    // ---- Signalling methods ----
    // 移除一个或所有等待者并将其转移到同步队列。
    private void doSignal(ConditionNode first, boolean all)
    public final void signal()
    public final void signalAll()​
    // ---- Waiting methods ----
    // 将节点添加到条件列表并释放锁定。
    private int enableWait(ConditionNode node)
    // 如果最初放置在条件队列中的节点现在准备好重新获取同步队列,则返回 true。
    private boolean canReacquire(ConditionNode node) ​
    // 从条件队列中取消链接给定节点和其他非等待节点,除非已经取消链接。
    private void unlinkCancelledWaiters(ConditionNode node)
    // 实现不可中断的条件等待
    public final void awaitUninterruptibly()​
    public final void await()​
    public final long awaitNanos(long nanosTimeout)​
    public final boolean awaitUntil(Date deadline)​
    public final boolean await(long time, TimeUnit unit)​
    //  ---- support for instrumentation ----​
    // 如果此条件是由给定的同步对象创建的,则返回 true。
    final boolean isOwnedBy(AbstractQueuedSynchronizer sync)​
    // 查询是否有线程在此条件下等待。
    protected final boolean hasWaiters()​
    // 返回在此条件下等待的线程数的估计值。
    protected final int getWaitQueueLength()
    // 返回一个集合,其中包含可能正在等待此 Condition 的那些线程。
    protected final Collection<Thread> getWaitingThreads()
}

ConditionObject 实现了 Condition 能力的基础上,拓展了对 ConditionNode 相关的操作,方法通过其用途可以划分为三组:

  • Signalling
  • Waiting
  • 其他方法

Signalling methods

        public final void signal() {
            ConditionNode first = firstWaiter;
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            if (first != null)
                doSignal(first, false);
        }
        public final void signalAll() {
            ConditionNode first = firstWaiter;
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            if (first != null)
                doSignal(first, true);
        }

唤醒方法主要逻辑是通过 doSignal(ConditionNode first, boolean all) 实现的。doSignal 方法根据参数,进行一个 while 循环,

两个方法传递进来的都是头节点,也就是从 ConditionNode 双向链表的头节点开始遍历,如果第二个参数 all 设置为 false ,只执行一次遍历中逻辑。循环中的逻辑是:

// 最终都调用了这个方法
private void doSignal(ConditionNode first, boolean all) {
    while (first != null) {
        // 取出 first 的下一个节点,设置为 next
        ConditionNode next = first.nextWaiter;
        // 如果 first 是链表中唯一的一个节点,设置 lastWaiter 为 null
        if ((firstWaiter = next) == null) //
            lastWaiter = null;
        // 读取 first 的 status ,检查是否是 COND
        if ((first.getAndUnsetStatus(COND) & COND) != 0) {
            // first 处于 COND 状态,出队
            enqueue(first);
            // 通过 all 来判断是否将等待的线程都进行唤醒逻辑。
            if (!all)
                break;  
        }
        first = next; // 循环指向下一个
    }
}

关键方法 enqueue(ConditionNode) 是 AQS 中的方法:

    final void enqueue(Node node) {
        if (node != null) {
            for (;;) {
                // 获取尾节点
                Node t = tail;
                // 避免不必要的内存屏障
                node.setPrevRelaxed(t);
                if (t == null)      
                    // 空队列首先初始化一个头节点
                    tryInitializeHead();
                else if (casTail(t, node)) { // 更新 tail 指针为 node (这里不是将 t = node)
                    t.next = node; // 为节点 t 的 next 指针指向 node
                    if (t.status < 0)  // t 的状态 < 0 一般代表后续节点需要运行了
                        LockSupport.unpark(node.waiter);
                    break;
                }
            }
        }
    }

可以看出 enqueue(ConditionNode) 中本质上是通过调用 LockSupport.unpark(node.waiter); 来唤醒线程的。

Waiting methods

对外提供的等待能力的方法包括:

    // 实现不可中断的条件等待
    public final void awaitUninterruptibly()
    public final void await()​
    public final long awaitNanos(long nanosTimeout)​
    public final boolean awaitUntil(Date deadline)
    public final boolean await(long time, TimeUnit unit)

它们内部都用到了公共的逻辑:

    // 添加节点到 condition 列表并释放锁
    private int enableWait(ConditionNode node)
    private boolean canReacquire(ConditionNode node)
    private void unlinkCancelledWaiters(ConditionNode node) 

enableWait

        private int enableWait(ConditionNode node) {
            if (isHeldExclusively()) { // 如果是当前线程持有锁资源
                node.waiter = Thread.currentThread();  // 将节点的绑定的线程设置为当前线程
                node.setStatusRelaxed(COND | WAITING); // 设置节点状态
                ConditionNode last = lastWaiter;       // 获取 尾节点
                if (last == null)
                    firstWaiter = node;                // 如果列表为空, node 就是头节点
                else
                    last.nextWaiter = node;            // 否则,将尾节点的下一个节点设置为 node
                lastWaiter = node;                     // 更新 lastWaiter 指针
                int savedState = getState();           // 获取当前线程的同步状态
                if (release(savedState))               // 在当前持有锁资源的线程尝试释放锁
                    return savedState;
            }
            node.status = CANCELLED; // 当前线程未持有锁资源,更新 node 的状态为 CANCELLED
            throw new IllegalMonitorStateException(); // 并抛出 IllegalMonitorStateException
        }

这个方法对传入的节点插入到等待队列的队尾,并根据当前线程的状态进行了检查。关键方法的 release(int) :

    public final boolean release(int arg) {
        if (tryRelease(arg)) { // 尝试释放锁资源
            signalNext(head);  // 释放成功,唤醒下一个等待中的线程
            return true;
        }
        return false;
    }

唤醒给定节点的下一个节点(如果存在),通过调用 LockSupport.unpark(s.waiter) 唤醒节点对应的线程。

    private static void signalNext(Node h) {
        Node s;
        if (h != null && (s = h.next) != null && s.status != 0) {
            s.getAndUnsetStatus(WAITING);
            LockSupport.unpark(s.waiter);
        }
    }

canReacquire

检查传入的 node 是否在链表中,且不为头节点:

// 如果最初放置在条件队列中的节点现在准备好重新获取同步队列,则返回 true。
private boolean canReacquire(ConditionNode node) {
    // 检查传入的 node 是否在链表中,且不为头节点
    return node != null && node.prev != null && isEnqueued(node);
}
// in AQS
final boolean isEnqueued(Node node) {
    // 从 Node 双向链表尾部开始遍历,是否存在 node
    for (Node t = tail; t != null; t = t.prev)
        if (t == node)
            return true;
    return false;
}

unlinkCancelledWaiters

        private void unlinkCancelledWaiters(ConditionNode node) {
            // node 为空 / node 不是队尾 / node 是最后一个节点
            if (node == null || node.nextWaiter != null || node == lastWaiter) {
                ConditionNode w = firstWaiter, trail = null; // w = first , trail = null
                // /从链表头节点开始遍历
                while (w != null) {
                    ConditionNode next = w.nextWaiter;  // 取出下一个节点
                    if ((w.status & COND) == 0) {       // 当前节点的状态包含 COND
                        w.nextWaiter = null;            // 当前节点的 next 设置为 null
                        if (trail == null)              // 如果 trail 指针为空
                            firstWaiter = next;         // firstWaiter 指向 next
                        else
                            trail.nextWaiter = next;    // trail 指针不为空,尾指针的 next 指向当前节点的下一个节点
                        if (next == null)
                            lastWaiter = trail; // 最后将 lastWaiter 设置为 trail (过滤后的 trail 链表插入到队尾)
                    } else
                        trail = w; // 头节点状态不是 COND,当前节点设置为 trail 指针。
                    w = next; // 下一个循环
                }
            }
        }

这个方法遍历 ConditionNode 队列,过滤掉状态不包含 COND 的节点。

对外提供的等待方法

上面三个方法是内部处理逻辑。而对外暴露的是以下五个方法:

    public final void awaitUninterruptibly()​
    public final void await()​
    public final long awaitNanos(long nanosTimeout)​
    public final boolean awaitUntil(Date deadline)​
    public final boolean await(long time, TimeUnit unit)

除了awaitUninterruptibly() ,其他方法所代表的能力和 Condition 接口中定义的所代表的能力基本一致。

awaitUninterruptibly

awaitUninterruptibly() 是用于实现不可中断的条件等待:

        public final void awaitUninterruptibly() {
            ConditionNode node = new ConditionNode(); // 创建一个新的 node
            int savedState = enableWait(node);        // 将这个新 node 插入,并返回 node 的状态
            LockSupport.setCurrentBlocker(this);      // 设置 blocker
            boolean interrupted = false, rejected = false;  // flag:中断和拒绝
            while (!canReacquire(node)) {             // 当前线程关联的 node 不再等待队列
                if (Thread.interrupted())             // 尝试中断线程
                    interrupted = true;
                else if ((node.status & COND) != 0) {  // 中断线程不成功的情况下,如果 node 状态包含 COND
                    // 尝试阻塞线程
                    try {
                        if (rejected)  
                            node.block(); // 实际上也是 LockSupport.park
                        else
                            ForkJoinPool.managedBlock(node);
                    } catch (RejectedExecutionException ex) {
                        rejected = true;    // 拒绝执行
                    } catch (InterruptedException ie) {
                        interrupted = true;   // 中断
                    }
                } else
                    Thread.onSpinWait();        // 当前线程无法继续执行
            }
            // 不是队列中的唯一节点时执行下面逻辑
            LockSupport.setCurrentBlocker(null);
            node.clearStatus();   // 清除 node 的 status
            acquire(node, savedState, false, false, false, 0L); // 【*】重点方法
            if (interrupted)
                Thread.currentThread().interrupt();
        }

在这个方法中,首先讲解两个方法:

  • Thread.onSpinWait() 表示调用者暂时无法继续,直到其他活动发生一个或多个动作。 通过在自旋等待循环构造的每次迭代中调用此方法,调用线程向运行时指示它正忙于等待。 运行时可能会采取措施来提高调用自旋等待循环构造的性能。
  • ForkJoinPool.managedBlock(node) 则是通过 Blocker 来检查线程的运行状态,然后尝试阻塞线程。

最后是最关键的方法 acquire ,它的详细逻辑放到最后讲解, 这个方法的作用就是,当前线程进入等待后,需要将关联的线程开启一个自旋,挂起后能够持续去尝试获取锁资源。

await

        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            ConditionNode node = new ConditionNode();
            int savedState = enableWait(node);
            LockSupport.setCurrentBlocker(this); // for back-compatibility
            boolean interrupted = false, cancelled = false, rejected = false;
            while (!canReacquire(node)) {
                if (interrupted |= Thread.interrupted()) {
                    if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
                        break;              // else interrupted after signal
                } else if ((node.status & COND) != 0) {
                    try {
                        if (rejected)
                            node.block();
                        else
                            ForkJoinPool.managedBlock(node);
                    } catch (RejectedExecutionException ex) {
                        rejected = true;
                    } catch (InterruptedException ie) {
                        interrupted = true;
                    }
                } else
                    Thread.onSpinWait();    // awoke while enqueuing
            }
            LockSupport.setCurrentBlocker(null);
            node.clearStatus();
            acquire(node, savedState, false, false, false, 0L);
            if (interrupted) {
                if (cancelled) {
                    unlinkCancelledWaiters(node);
                    throw new InterruptedException();
                }
                Thread.currentThread().interrupt();
            }
        }

await() 方法相较于 awaitUninterruptibly(),while 逻辑基本一致,最后多了一步 cancelled 状态检查,如果 cancelled = true ,调用 unlinkCancelledWaiters(node),去清理等待队列。

awaitNanos

awaitNanos(long) 在 await() 之上多了对超时时间的计算和处理逻辑:

        public final long awaitNanos(long nanosTimeout)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            ConditionNode node = new ConditionNode();
            int savedState = enableWait(node);
            long nanos = (nanosTimeout < 0L) ? 0L : nanosTimeout;
            long deadline = System.nanoTime() + nanos;
            boolean cancelled = false, interrupted = false;
            while (!canReacquire(node)) {
                if ((interrupted |= Thread.interrupted()) ||
                    (nanos = deadline - System.nanoTime()) <= 0L) { // 多了一个超时条件
                    if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
                        break;
                } else
                    LockSupport.parkNanos(this, nanos);
            }
            node.clearStatus();
            acquire(node, savedState, false, false, false, 0L);
            if (cancelled) {
                unlinkCancelledWaiters(node);
                if (interrupted)
                    throw new InterruptedException();
            } else if (interrupted)
                Thread.currentThread().interrupt();
            long remaining = deadline - System.nanoTime(); // avoid overflow
            return (remaining <= nanosTimeout) ? remaining : Long.MIN_VALUE;
        }

awaitUntil

awaitUntil(Date) 和 awaitNanos(long) 同理,只是将超时计算改成了日期计算:

            long abstime = deadline.getTime();
            // ...
            boolean cancelled = false, interrupted = false;
            while (!canReacquire(node)) {
                if ((interrupted |= Thread.interrupted()) ||
                    System.currentTimeMillis() >= abstime) { // 时间检查
                    if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
                        break;
                } else
                    LockSupport.parkUntil(this, abstime);
            }

await(long, TimeUnit)

await(long, TimeUnit) 则是逻辑更加与 awaitNanos(long) 相似了, 只是多了一步计算 awaitNanos(long nanosTimeout) 中的参数 nanosTimeout 的操作:

long nanosTimeout = unit.toNanos(time);

acquire 方法

在 wait 方法组中,最终都会调用到这个逻辑:

    final int acquire(Node node, int arg, boolean shared, boolean interruptible, boolean timed, long time) {
        Thread current = Thread.currentThread();
        byte spins = 0, postSpins = 0;   // 在取消第一个线程时重试
        boolean interrupted = false, first = false;
        Node pred = null;                // 入队时节点的前一个指针
        /*
         * 反复执行:
         *  检查当前节点是否是 first
         *  若是, 确保 head 稳定,否则确保有效的 prev
         *  如果节点是第一个或尚未入队,尝试获取
         *  否则,如果节点尚未创建,则创建这个它
         *  否则,如果节点尚未入队,尝试入队一次
         *  否则,如果通过 park 唤醒,重试,最多 postSpins 次
         *  否则,如果 WAITING 状态未设置,设置并重试
         *  否则,park 并且清除 WAITING 状态, 检查取消逻辑
         */
        for (;;) {
            if (!first && (pred = (node == null) ? null : node.prev) != null && !(first = (head == pred))) {
                if (pred.status < 0) {
                    cleanQueue();           // predecessor cancelled
                    continue;
                } else if (pred.prev == null) {
                    Thread.onSpinWait();    // ensure serialization
                    continue;
                }
            }
            if (first || pred == null) {
                boolean acquired;
                try {
                    if (shared)
                        acquired = (tryAcquireShared(arg) >= 0);
                    else
                        acquired = tryAcquire(arg);
                } catch (Throwable ex) {
                    cancelAcquire(node, interrupted, false);
                    throw ex;
                }
                if (acquired) {
                    if (first) {
                        node.prev = null;
                        head = node;
                        pred.next = null;
                        node.waiter = null;
                        if (shared)
                            signalNextIfShared(node);
                        if (interrupted)
                            current.interrupt();
                    }
                    return 1;
                }
            }
            if (node == null) {                 // allocate; retry before enqueue
                if (shared)
                    node = new SharedNode();
                else
                    node = new ExclusiveNode();
            } else if (pred == null) {          // try to enqueue
                node.waiter = current;
                Node t = tail;
                node.setPrevRelaxed(t);         // avoid unnecessary fence
                if (t == null)
                    tryInitializeHead();
                else if (!casTail(t, node))
                    node.setPrevRelaxed(null);  // back out
                else
                    t.next = node;
            } else if (first && spins != 0) {
                --spins;                        // reduce unfairness on rewaits
                Thread.onSpinWait();
            } else if (node.status == 0) {
                node.status = WAITING;          // enable signal and recheck
            } else {
                long nanos;
                spins = postSpins = (byte)((postSpins << 1) | 1);
                if (!timed)
                    LockSupport.park(this);
                else if ((nanos = time - System.nanoTime()) > 0L)
                    LockSupport.parkNanos(this, nanos);
                else
                    break;
                node.clearStatus();
                if ((interrupted |= Thread.interrupted()) && interruptible)
                    break;
            }
        }
        return cancelAcquire(node, interrupted, interruptible);
    }

这个方法会在 Node 关联的线程让出锁资源后,开启一个死循环尝试通过 tryAcquire 尝试获取锁资源,最后如果超时或尝试次数超出限制,会通过 LockSupport.park 阻塞自身。

到此这篇关于Java 多线程并发AbstractQueuedSynchronizer详情的文章就介绍到这了,更多相关Java AbstractQueuedSynchronizer内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Java并发系列之AbstractQueuedSynchronizer源码分析(条件队列)

    通过前面三篇的分析,我们深入了解了AbstractQueuedSynchronizer的内部结构和一些设计理念,知道了AbstractQueuedSynchronizer内部维护了一个同步状态和两个排队区,这两个排队区分别是同步队列和条件队列.我们还是拿公共厕所做比喻,同步队列是主要的排队区,如果公共厕所没开放,所有想要进入厕所的人都得在这里排队.而条件队列主要是为条件等待设置的,我们想象一下如果一个人通过排队终于成功获取锁进入了厕所,但在方便之前发现自己没带手纸,碰到这种情况虽然很无奈,但是它

  • java同步器AQS架构AbstractQueuedSynchronizer原理解析

    目录 引导语 1.整体架构 1.1.类注释 1.2.类定义 1.3.基本属性 1.3.1.简单属性 1.3.2.同步队列属性 1.3.3.条件队列的属性 1.3.4.Node 1.3.5.共享锁和排它锁的区别 1.4.Condition 2.同步器的状态 3.获取锁 3.1.acquire排它锁 3.1.1.addWaiter 3.1.2.acquireQueued 3.2.acquireShared获取共享锁 4.总结 引导语 AbstractQueuedSynchronizer 中文翻译叫做

  • Java并发系列之AbstractQueuedSynchronizer源码分析(概要分析)

    学习Java并发编程不得不去了解一下java.util.concurrent这个包,这个包下面有许多我们经常用到的并发工具类,例如:ReentrantLock, CountDownLatch, CyclicBarrier, Semaphore等.而这些类的底层实现都依赖于AbstractQueuedSynchronizer这个类,由此可见这个类的重要性.所以在Java并发系列文章中我首先对AbstractQueuedSynchronizer这个类进行分析,由于这个类比较重要,而且代码比较长,为了

  • java同步器AQS架构AbstractQueuedSynchronizer原理解析下

    目录 引导语 1.释放锁 1.1.释放排它锁release 1.2.释放共享锁releaseShared 2.条件队列的重要方法 2.1.入队列等待await 2.1.1.addConditionWaiter 2.1.2.unlinkCancelledWaiters 2.2.单个唤醒signal 2.3.全部唤醒signalAll 3.总结 引导语 AQS 的内容太多,所以我们分成了两个章节,没有看过 AQS 上半章节的同学可以回首看一下哈,上半章节里面说了很多锁的基本概念,基本属性,如何获得锁

  • Java并发系列之AbstractQueuedSynchronizer源码分析(共享模式)

    通过上一篇的分析,我们知道了独占模式获取锁有三种方式,分别是不响应线程中断获取,响应线程中断获取,设置超时时间获取.在共享模式下获取锁的方式也是这三种,而且基本上都是大同小异,我们搞清楚了一种就能很快的理解其他的方式.虽然说AbstractQueuedSynchronizer源码有一千多行,但是重复的也比较多,所以读者不要刚开始的时候被吓到,只要耐着性子去看慢慢的自然能够渐渐领悟.就我个人经验来说,阅读AbstractQueuedSynchronizer源码有几个比较关键的地方需要弄明白,分别是

  • Java并发系列之AbstractQueuedSynchronizer源码分析(独占模式)

    在上一篇<Java并发系列[1]----AbstractQueuedSynchronizer源码分析之概要分析>中我们介绍了AbstractQueuedSynchronizer基本的一些概念,主要讲了AQS的排队区是怎样实现的,什么是独占模式和共享模式以及如何理解结点的等待状态.理解并掌握这些内容是后续阅读AQS源码的关键,所以建议读者先看完我的上一篇文章再回过头来看这篇就比较容易理解.在本篇中会介绍在独占模式下结点是怎样进入同步队列排队的,以及离开同步队列之前会进行哪些操作.AQS为在独占模

  • Java同步框架AbstractQueuedSynchronizer详解

    AbstractQueuedSynchronizer概述 AbstractQueuedSynchronizer是java中非常重要的一个框架类,它实现了最核心的多线程同步的语义,我们只要继承AbstractQueuedSynchronizer就可以非常方便的实现我们自己的线程同步器,java中的锁Lock就是基于AbstractQueuedSynchronizer来实现的.下面首先展示了AbstractQueuedSynchronizer类提供的一些方法: AbstractQueuedSynch

  • Java 多线程并发AbstractQueuedSynchronizer详情

    目录 AbstractQueuedSynchronizer 核心思想 为什么需要 AQS 用法 用法示例 AQS 底层原理 父类 AbstractOwnableSynchronizer CLH 队列 Condition 用于等待的方法 用于唤醒的方法 ConditionObject Signalling methods Waiting methods enableWait canReacquire unlinkCancelledWaiters 对外提供的等待方法 awaitUninterrupt

  • Java 多线程并发 ReentrantReadWriteLock详情

    目录 前言 ReadWriteLock ReentrantReadWriteLock 源码分析 类关系 Sync HoldCounter ThreadLocalHoldCounter 属性 构造方法 核心方法 锁的计数方法 读写锁阻塞检查方法 公平策略实现 FairSync 和非公平策略实现 NonfairSync NonfairSync 非公平策略 FairSync 公平策略 Release 和 Acquire 方法组 ReadLock WriteLock 读写锁降级 总结 前言 Reentr

  • Java 多线程并发编程_动力节点Java学院整理

    一.多线程 1.操作系统有两个容易混淆的概念,进程和线程. 进程:一个计算机程序的运行实例,包含了需要执行的指令:有自己的独立地址空间,包含程序内容和数据:不同进程的地址空间是互相隔离的:进程拥有各种资源和状态信息,包括打开的文件.子进程和信号处理. 线程:表示程序的执行流程,是CPU调度执行的基本单位:线程有自己的程序计数器.寄存器.堆栈和帧.同一进程中的线程共用相同的地址空间,同时共享进进程锁拥有的内存和其他资源. 2.Java标准库提供了进程和线程相关的API,进程主要包括表示进程的jav

  • Java 多线程并发ReentrantLock

    目录 背景 ReentrantLock 可重入特性 公平锁设置参数 源码分析 Lock 接口 加锁操作 内部类 Sync tryLock initialTryLock lock lockInterruptibly tryLockNanos tryRelease newCondition NonfairSync 非公平锁 FairSync 构造函数 核心属性和方法 总结 背景 在 Java 中实现线程安全的传统方式是 synchronized 关键字,虽然它提供了一定的同步能力,但它在使用上是严格

  • Java多线程并发开发之DelayQueue使用示例

    在学习Java 多线程并发开发过程中,了解到DelayQueue类的主要作用:是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走.这种队列是有序的,即队头对象的延迟到期时间最长.注意:不能将null元素放置到这种队列中. Delayed,一种混合风格的接口,用来标记那些应该在给定延迟时间之后执行的对象.此接口的实现必须定义一个 compareTo 方法,该方法提供与此接口的 getDelay 方法一致的排序. 在网上看到了一些

  • Java多线程并发编程 Volatile关键字

    volatile 关键字是一个神秘的关键字,也许在 J2EE 上的 JAVA 程序员会了解多一点,但在 Android 上的 JAVA 程序员大多不了解这个关键字.只要稍了解不当就好容易导致一些并发上的错误发生,例如好多人把 volatile 理解成变量的锁.(并不是) volatile 的特性: 具备可见性 保证不同线程对被 volatile 修饰的变量的可见性. 有一被 volatile 修饰的变量 i,在一个线程中修改了此变量 i,对于其他线程来说 i 的修改是立即可见的. 如: vola

  • Java多线程并发编程(互斥锁Reentrant Lock)

    Java 中的锁通常分为两种: 通过关键字 synchronized 获取的锁,我们称为同步锁,上一篇有介绍到:Java 多线程并发编程 Synchronized 关键字. java.util.concurrent(JUC)包里的锁,如通过继承接口 Lock 而实现的 ReentrantLock(互斥锁),继承 ReadWriteLock 实现的 ReentrantReadWriteLock(读写锁). 本篇主要介绍 ReentrantLock(互斥锁). ReentrantLock(互斥锁)

  • Java多线程并发编程和锁原理解析

    这篇文章主要介绍了Java多线程并发编程和锁原理解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 一.前言 最近项目遇到多线程并发的情景(并发抢单&恢复库存并行),代码在正常情况下运行没有什么问题,在高并发压测下会出现:库存超发/总库存与sku库存对不上等各种问题. 在运用了 限流/加锁等方案后,问题得到解决. 加锁方案见下文. 二.乐观锁 & 悲观锁 1.乐观锁 顾名思义,就是很乐观,每次去拿数据的时候都认为别人不会修改,所以不会上锁

  • Java多线程并发与并行和线程与进程案例

    目录 一.并发与并行 二.线程与进程 三.创建线程类 前言: 程序在没有跳转语句的前提下,都是由上至下依次执行,那现在想要设计一个程序,边打游戏边听歌,怎么设计? 要解决上述问题,咱们得使用多进程或者多线程来解决. 一.并发与并行 并发:指两个或多个事件在同一个时间段内发生. 并行:指两个或多个事件在同一时刻发生(同时发生). 在操作系统中,安装了多个程序,并发指的是在一段时间内宏观上有多个程序同时运行,这在单 CPU 系统中,每一时刻只能有一道程序执行,即微观上这些程序是分时的交替运行,只不过

  • Java多线程并发synchronized 关键字

    目录 基础 修饰普通方法 修饰静态方法 Synchronized 加锁原理 monitorenter monitorexit synchronized 修饰静态方法 优点.缺点及优化 其他说明 基础 Java 在虚拟机层面提供了 synchronized 关键字供开发者快速实现互斥同步的重量级锁来保障线程安全. synchronized 关键字可用于两种场景: 修饰方法. 持有一个对象,并执行一个代码块. 而根据加锁的对象不同,又分为两种情况: 对象锁 类对象锁 以下代码示例是 synchron

随机推荐