Java同步框架AbstractQueuedSynchronizer详解

AbstractQueuedSynchronizer概述

AbstractQueuedSynchronizer是java中非常重要的一个框架类,它实现了最核心的多线程同步的语义,我们只要继承AbstractQueuedSynchronizer就可以非常方便的实现我们自己的线程同步器,java中的锁Lock就是基于AbstractQueuedSynchronizer来实现的。下面首先展示了AbstractQueuedSynchronizer类提供的一些方法:

AbstractQueuedSynchronizer类方法

在类结构上,AbstractQueuedSynchronizer继承了AbstractOwnableSynchronizer,AbstractOwnableSynchronizer仅有的两个方法是提供当前独占模式的线程设置:

  /**
   * The current owner of exclusive mode synchronization.
   */
  private transient Thread exclusiveOwnerThread;

  /**
   * Sets the thread that currently owns exclusive access.
   * A {@code null} argument indicates that no thread owns access.
   * This method does not otherwise impose any synchronization or
   * {@code volatile} field accesses.
   * @param thread the owner thread
   */
  protected final void setExclusiveOwnerThread(Thread thread) {
    exclusiveOwnerThread = thread;
  }

  /**
   * Returns the thread last set by {@code setExclusiveOwnerThread},
   * or {@code null} if never set. This method does not otherwise
   * impose any synchronization or {@code volatile} field accesses.
   * @return the owner thread
   */
  protected final Thread getExclusiveOwnerThread() {
    return exclusiveOwnerThread;
  }

exclusiveOwnerThread代表的是当前获得同步的线程,因为是独占模式,在exclusiveOwnerThread持有同步的过程中其他的线程的任何同步获取请求将不能得到满足。

至此,需要说明的是,AbstractQueuedSynchronizer不仅支持独占模式下的同步实现,还支持共享模式下的同步实现。在java的锁的实现上就有共享锁和独占锁的区别,而这些实现都是基于AbstractQueuedSynchronizer对于共享同步和独占同步的支持。从上面展示的AbstractQueuedSynchronizer提供的方法中,我们可以发现AbstractQueuedSynchronizer的API大概分为三类:

  • 类似acquire(int)的一类是最基本的一类,不可中断
  • 类似acquireInterruptibly(int)的一类可以被中断
  • 类似tryAcquireNanos(int, long)的一类不仅可以被中断,而且可以设置阻塞时间

上面的三种类型的API分为独占和共享两套,我们可以根据我们的需求来使用合适的API来做多线程同步。

下面是一个继承AbstractQueuedSynchronizer来实现自己的同步器的一个示例:

class Mutex implements Lock, java.io.Serializable {

  // Our internal helper class
  private static class Sync extends AbstractQueuedSynchronizer {
   // Reports whether in locked state
   protected boolean isHeldExclusively() {
    return getState() == 1;
   }

   // Acquires the lock if state is zero
   public boolean tryAcquire(int acquires) {
    assert acquires == 1; // Otherwise unused
    if (compareAndSetState(0, 1)) {
     setExclusiveOwnerThread(Thread.currentThread());
     return true;
    }
    return false;
   }

   // Releases the lock by setting state to zero
   protected boolean tryRelease(int releases) {
    assert releases == 1; // Otherwise unused
    if (getState() == 0) throw new IllegalMonitorStateException();
    setExclusiveOwnerThread(null);
    setState(0);
    return true;
   }

   // Provides a Condition
   Condition newCondition() { return new ConditionObject(); }

   // Deserializes properly
   private void readObject(ObjectInputStream s)
     throws IOException, ClassNotFoundException {
    s.defaultReadObject();
    setState(0); // reset to unlocked state
   }
  }

  // The sync object does all the hard work. We just forward to it.
  private final Sync sync = new Sync();

  public void lock()        { sync.acquire(1); }
  public boolean tryLock()     { return sync.tryAcquire(1); }
  public void unlock()       { sync.release(1); }
  public Condition newCondition()  { return sync.newCondition(); }
  public boolean isLocked()     { return sync.isHeldExclusively(); }
  public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
  public void lockInterruptibly() throws InterruptedException {
   sync.acquireInterruptibly(1);
  }
  public boolean tryLock(long timeout, TimeUnit unit)
    throws InterruptedException {
   return sync.tryAcquireNanos(1, unit.toNanos(timeout));
  }
 }}

Mutex实现的功能是:使用0来代表可以获得同步变量,使用1来代表需要等待同步变量被释放再获取,这是一个简单的独占锁实现,任何时刻只会有一个线程获得锁,其他请求获取锁的线程都会阻塞等待直到锁被释放,等待的线程将再次竞争来获得锁。Mutex给了我们很好的范例,我们要实现自己的线程同步器,那么就继承AbstractQueuedSynchronizer实现其三个抽象方法,然后使用该实现类来做lock和unlock的操作,可以发现,AbstractQueuedSynchronizer框架为我们铺平了道路,我们只需要做一点点改变就可以实现高效安全的线程同步去,下文中将分析AbstractQueuedSynchronizer是如何为我么提供如此强大得同步能力的。

AbstractQueuedSynchronizer实现细节

独占模式

AbstractQueuedSynchronizer使用一个volatile类型的int来作为同步变量,任何想要获得锁的线程都需要来竞争该变量,获得锁的线程可以继续业务流程的执行,而没有获得锁的线程会被放到一个FIFO的队列中去,等待再次竞争同步变量来获得锁。AbstractQueuedSynchronizer为每个没有获得锁的线程封装成一个Node再放到队列中去,下面先来分析一下Node这个数据结构:

    /** waitStatus value to indicate thread has cancelled */
    static final int CANCELLED = 1;
    /** waitStatus value to indicate successor's thread needs unparking */
    static final int SIGNAL  = -1;
    /** waitStatus value to indicate thread is waiting on condition */
    static final int CONDITION = -2;
    /**
     * waitStatus value to indicate the next acquireShared should
     * unconditionally propagate
     */
    static final int PROPAGATE = -3;

上面展示的是Node定义的四个状态,需要注意的是只有一个状态是大于0的,也就是CANCELLED,也就是被取消了,不需要为此线程协调同步变量的竞争了。其他几个的意义见注释。上一小节说到,AbstractQueuedSynchronizer提供独占式和共享式两种模式,AbstractQueuedSynchronizer使用下面的两个变量来标志是共享还是独占模式:

    /** Marker to indicate a node is waiting in shared mode */
    static final Node SHARED = new Node();
    /** Marker to indicate a node is waiting in exclusive mode */
    static final Node EXCLUSIVE = null;

有趣的是,Node使用了一个变量nextWaiter来代表两种含义,当在独占模式下,nextWaiter表示下一个等在ConditionObject上的Node,在共享模式下就是SHARED,因为对于任何一个同步器来说,都不可能同时实现共享和独占两种模式的,更为专业的解释为:

    /**
     * Link to next node waiting on condition, or the special
     * value SHARED. Because condition queues are accessed only
     * when holding in exclusive mode, we just need a simple
     * linked queue to hold nodes while they are waiting on
     * conditions. They are then transferred to the queue to
     * re-acquire. And because conditions can only be exclusive,
     * we save a field by using special value to indicate shared
     * mode.
     */
    Node nextWaiter;

AbstractQueuedSynchronizer使用双向链表来管理请求同步的Node,保存了链表的head和tail,新的Node将会被插到链表的尾端,而链表的head总是代表着获得锁的线程,链表头的线程释放了锁之后会通知后面的线程来竞争共享变量。下面分析一下AbstractQueuedSynchronizer是如何实现独占模式下的acquire和release的。

首先,使用方法acquire(int)可以竞争同步变量,下面是调用链路:

  public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
      acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
      selfInterrupt();
  }

  private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
      node.prev = pred;
      if (compareAndSetTail(pred, node)) {
        pred.next = node;
        return node;
      }
    }
    enq(node);
    return node;
  }

  final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
      boolean interrupted = false;
      for (;;) {
        final Node p = node.predecessor();
        if (p == head && tryAcquire(arg)) {
          setHead(node);
          p.next = null; // help GC
          failed = false;
          return interrupted;
        }
        if (shouldParkAfterFailedAcquire(p, node) &&
          parkAndCheckInterrupt())
          interrupted = true;
      }
    } finally {
      if (failed)
        cancelAcquire(node);
    }
  } 

首先会调用方法tryAcquire来尝试获的锁,而tryAcquire这个方法是需要子类来实现的,子类的实现无非就是通过compareAndSetState、getState、setState三个方法来操作同步变量state,子类的方法实现需要根据各自的需求场景来实现。继续分析上面的acquire流程,如果tryAcquire返回true了,也就是成功改变了state的值了,也就是获得了同步锁了,那么就可以退出了。如果返回false,说明有其他的线程获得锁了,这个时候AbstractQueuedSynchronizer会使用addWaiter将当前线程添加到等待队列的尾部等待再次竞争。需要注意的是将当前线程标记为了独占模式。然后重头戏来了,方法acquireQueued使得新添加的Node在一个for死循环中不断的轮询,也就是自旋,acquireQueued方法退出的条件是:

  1. 该节点的前驱节点是头结点,头结点代表的是获得锁的节点,只有它释放了state其他线程才能获得这个变量的所有权
  2. 在条件1的前提下,方法tryAcquire返回true,也就是可以获得同步资源state

满足上面两个条件之后,这个Node就会获得锁,根据AbstractQueuedSynchronizer的规定,获得锁的Node必须是链表的头结点,所以,需要将当前节点设定为头结点。那如果不符合上面两个条件的Node会怎么样呢?看for循环里面的第二个分支,首先是shouldParkAfterFailedAcquire方法,看名字应该是说判断是否应该park当前该线程,然后是方法parkAndCheckInterrupt,这个方法是在shouldParkAfterFailedAcquire返回true的前提之下才会之下,意思就是首先判断一下是否需要park该Node,如果需要,那么就park它。关于线程的park和unpark,AbstractQueuedSynchronizer使用了偏向底层的技术来实现,在此先不做分析。现在来分析一下再什么情况下Node会被park(block):

  private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
      /*
       * This node has already set status asking a release
       * to signal it, so it can safely park.
       */
      return true;
    if (ws > 0) {
      /*
       * Predecessor was cancelled. Skip over predecessors and
       * indicate retry.
       */
      do {
        node.prev = pred = pred.prev;
      } while (pred.waitStatus > 0);
      pred.next = node;
    } else {
      /*
       * waitStatus must be 0 or PROPAGATE. Indicate that we
       * need a signal, but don't park yet. Caller will need to
       * retry to make sure it cannot acquire before parking.
       */
      compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
  }

可以发现,只有当Node的前驱节点的状态为Node.SIGNAL的时候才会返回true,也就是说,只有当前驱节点的状态变为了Node.SIGNAL,才会去通知当前节点,所以如果前驱节点是Node.SIGNAL的,那么当前节点就可以放心的park就好了,前驱节点在完成工作之后在释放资源的时候会unpark它的后继节点。下面看一下release的过程:

  public final boolean release(int arg) {
    if (tryRelease(arg)) {
      Node h = head;
      if (h != null && h.waitStatus != 0)
        unparkSuccessor(h);
      return true;
    }
    return false;
  }

  private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling. It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
      compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node. But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
      s = null;
      for (Node t = tail; t != null && t != node; t = t.prev)
        if (t.waitStatus <= 0)
          s = t;
    }
    if (s != null)
      LockSupport.unpark(s.thread);
  }

首先通过tryRelease方法来保证资源安全完整的释放了之后,如果发现节点的状态小于0,会变为0。0代表的是初始化的状态,当前的线程已经完成了工作,释放了锁,就要恢复原来的样子。然后会获取该节点的后继节点,如果没有后续节点了,或者后继节点已经被取消了,那么从尾部开始向前找第一个符合要求的节点,然后unpark它。

上面介绍了一对acquire-release,如果希望线程可以在竞争的时候被中断,可以使用acquireInterruptibly。如果希望加上获取锁的时间限制,可以使用tryAcquireNanos(int, long)方法来获取。

共享模式

和独占模式一样,分析一下acquireShared的过程:

  public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
      doAcquireShared(arg);
  }

  private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
      boolean interrupted = false;
      for (;;) {
        final Node p = node.predecessor();
        if (p == head) {
          int r = tryAcquireShared(arg);
          if (r >= 0) {
            setHeadAndPropagate(node, r);
            p.next = null; // help GC
            if (interrupted)
              selfInterrupt();
            failed = false;
            return;
          }
        }
        if (shouldParkAfterFailedAcquire(p, node) &&
          parkAndCheckInterrupt())
          interrupted = true;
      }
    } finally {
      if (failed)
        cancelAcquire(node);
    }
  }

获取锁的流程如下:

  1. 尝试使用tryAcquireShared方法,如果返回值大于等于0则表示成功,否则运行doAcquireShared方法
  2. 将当前竞争同步的线程添加到链表尾部,然后自旋
  3. 获取前驱节点,如果前驱节点是头节点,也就是说前驱节点现在持有锁,那么继续运行4,否则park该节点等待被unpark
  4. 使用tryAcquireShared方法来竞争,如果返回值大于等于0,那么就算是获取成功了,否则继续自旋尝试

共享模式下的release流程:

  public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
      doReleaseShared();
      return true;
    }
    return false;
  }

  private void doReleaseShared() {
    for (;;) {
      Node h = head;
      if (h != null && h != tail) {
        int ws = h.waitStatus;
        if (ws == Node.SIGNAL) {
          if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
            continue;      // loop to recheck cases
          unparkSuccessor(h);
        }
        else if (ws == 0 &&
             !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
          continue;        // loop on failed CAS
      }
      if (h == head)          // loop if head changed
        break;
    }
  }  

  private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
      compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
      s = null;
      for (Node t = tail; t != null && t != node; t = t.prev)
        if (t.waitStatus <= 0)
          s = t;
    }
    if (s != null)
      LockSupport.unpark(s.thread);
  }

首先尝试使用tryReleaseShared方法来释放资源,如果释放失败,则返回false,如果释放成功了,那么继续执行doReleaseShared方法唤醒后续节点来竞争资源。需要注意的是,共享模式和独占模式的区别在于,独占模式只允许一个线程获得资源,而共享模式允许多个线程获得资源。所以在独占模式下只有当tryAcquire返回true的时候我们才能确定获得资源了,而在共享模式下,只要tryAcquireShared返回值大于等于0就可以说明获得资源了,所以你要确保你需要实现的需求和AbstractQueuedSynchronizer希望的是一致的。

桶独占模式一样,共享模式也有其他的两种API:

  • acquireSharedInterruptibly:支持相应中断的资源竞争
  • tryAcquireSharedNanos:可以设定时间的资源竞争

本文大概描述了AbstractQueuedSynchronizer框架的一些基本情况,具体的细节没有深究,但是AbstractQueuedSynchronizer作为Java中锁实现的底层支撑,需要好好研究一下,后续会基于AbstractQueuedSynchronizer来分析java中各种锁的实现。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • 基于Java回顾之多线程同步的使用详解

    首先阐述什么是同步,不同步有什么问题,然后讨论可以采取哪些措施控制同步,接下来我们会仿照回顾网络通信时那样,构建一个服务器端的"线程池",JDK为我们提供了一个很大的concurrent工具包,最后我们会对里面的内容进行探索. 为什么要线程同步? 说到线程同步,大部分情况下, 我们是在针对"单对象多线程"的情况进行讨论,一般会将其分成两部分,一部分是关于"共享变量",一部分关于"执行步骤". 共享变量 当我们在线程对象(Run

  • Java中synchronized关键字修饰方法同步的用法详解

    Java的最基本的同步方式,即使用synchronized关键字来控制一个方法的并发访问. 每一个用synchronized关键字声明的方法都是临界区.在Java中,同一个对象的临界区,在同一时间只有一个允许被访问. 静态方法则有不同的行为.用synchronized关键字声明的静态方法,同时只能够被一个执行线程访问,但是其他线程可以访问这个对象的非静态的synchronized方法.必须非常谨慎这一点,因为两个线程可以同时访问一个对象的两个不同的synchronized方法,即其中一个是静态s

  • java 中同步方法和同步代码块的区别详解

    java 中同步方法和同步代码块的区别详解 在Java语言中,每一个对象有一把锁.线程可以使用synchronized关键字来获取对象上的锁.synchronized关键字可应用在方法级别(粗粒度锁)或者是代码块级别(细粒度锁). 问题的由来: 看到这样一个面试题: //下列两个方法有什么区别 public synchronized void method1(){} public void method2(){ synchronized (obj){} } synchronized用于解决同步问

  • java多线程编程之使用Synchronized块同步变量

    下面的代码演示了如何同步特定的类方法: 复制代码 代码如下: package mythread; public class SyncThread extends Thread{ private static String sync = ""; private String methodType = ""; private static void method(String s) {  synchronized (sync)  {sync = s;System.out

  • java(jsp)整合discuz同步登录功能详解

    最近做了一个资源库系统的项目,老师说可以搭建开源论坛替代自己开发社交模块,正好在开源中国上看到了一个利用discuz的UCenter功能实现同步登录的开源项目(https://code.google.com/p/discuz-ucenter-api-for-java/),不禁大喜,于是花了几个小时照着教程操作了一遍,居然很轻松的成功了,特写此文以做纪念.. Uenter是Comsenz旗下各个产品之间信息直接传递的一个桥梁,通过UCenter站长可以无缝整合Comsenz系列产品,实现用户的一站

  • JAVA生产者消费者(线程同步)代码学习示例

    一.问题描述 生产者消费者问题是一个典型的线程同步问题.生产者生产商品放到容器中,容器有一定的容量(只能顺序放,先放后拿),消费者消费商品,当容器满了后,生产者等待,当容器为空时,消费者等待.当生产者将商品放入容器后,通知消费者:当消费者拿走商品后,通知生产者. 二.解决方案 对容器资源加锁,当取得锁后,才能对互斥资源进行操作. 复制代码 代码如下: public class ProducerConsumerTest { public static void main(String []args

  • java 实现线程同步的方式有哪些

    什么是线程同步? 当使用多个线程来访问同一个数据时,非常容易出现线程安全问题(比如多个线程都在操作同一数据导致数据不一致),所以我们用同步机制来解决这些问题. 实现同步机制有两个方法: 1.同步代码块: synchronized(同一个数据){} 同一个数据:就是N条线程同时访问一个数据. 2. 同步方法: public synchronized 数据返回类型 方法名(){} 就是使用 synchronized 来修饰某个方法,则该方法称为同步方法.对于同步方法而言,无需显示指定同步监视器,同步

  • Java 多线程同步 锁机制与synchronized深入解析

    打个比方:一个object就像一个大房子,大门永远打开.房子里有很多房间(也就是方法).这些房间有上锁的(synchronized方法), 和不上锁之分(普通方法).房门口放着一把钥匙(key),这把钥匙可以打开所有上锁的房间.另外我把所有想调用该对象方法的线程比喻成想进入这房子某个 房间的人.所有的东西就这么多了,下面我们看看这些东西之间如何作用的. 在此我们先来明确一下我们的前提条件.该对象至少有一个synchronized方法,否则这个key还有啥意义.当然也就不会有我们的这个主题了. 一

  • java多线程编程之使用Synchronized关键字同步类方法

    复制代码 代码如下: public synchronized void run(){     } 从上面的代码可以看出,只要在void和public之间加上synchronized关键字,就可以使run方法同步,也就是说,对于同一个Java类的对象实例,run方法同时只能被一个线程调用,并当前的run执行完后,才能被其他的线程调用.即使当前线程执行到了run方法中的yield方法,也只是暂停了一下.由于其他线程无法执行run方法,因此,最终还是会由当前的线程来继续执行.先看看下面的代码:sych

  • 解析Java线程同步锁的选择方法

    在需要线程同步的时候如何选择合适的线程锁?例:选择可以存入到常量池当中的对象,String对象等 复制代码 代码如下: public class SyncTest{    private String name = "name";public void method(String flag)    {        synchronized (name)        {            System.out.println(flag + ", invoke metho

随机推荐