Java并发编程之Condition源码分析(推荐)

Condition介绍

上篇文章讲了ReentrantLock的加锁和释放锁的使用,这篇文章是对ReentrantLock的补充。ReentrantLock#newCondition()可以创建Condition,在ReentrantLock加锁过程中可以利用Condition阻塞当前线程并临时释放锁,待另外线程获取到锁并在逻辑后通知阻塞线程"激活"。Condition常用在基于异步通信的同步机制实现中,比如dubbo中的请求和获取应答结果的实现。

常用方法

Condition中主要的方法有2个

  1. (1)await()方法可以阻塞当前线程,并释放锁。
  2. (2)在获取锁后可以调用signal()通知被await()阻塞的线程"激活"。

这里的await(),signal()必须在ReentrantLock#lock()和ReentrantLock#unlock()之间调用。

Condition实现分析

Condition的实现也是利用AbstractQueuedSynchronizer队列来实现,await()在被调用后先将当前线程加入到等待队列中,然后释放锁,最后阻塞当前线程。signal()在被调用后会先获取等待队列中第一个节点,并将这个节点转化成ReentrantLock中的节点并加入到同步阻塞队列的结尾,这样此节点的上个节点线程释放锁后会激活此节点线程取来获取锁。

await()方法源码分析

await()源码如下

public final void await() throws InterruptedException {
		//判断是否当前线程是否被中断中断则抛出中断异常
      if (Thread.interrupted())
        throw new InterruptedException();
		//加入等待队列
      Node node = addConditionWaiter();
		//释放当前线程锁
      int savedState = fullyRelease(node);
      int interruptMode = 0;
		//判断是否在同步阻塞队列,如果不在一直循环到被加入
      while (!isOnSyncQueue(node)) {
		//阻塞当前线程
        LockSupport.park(this);
		//判断是否被中断
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
          break;
      }
		//获取锁,如果获取中被中断则设置中断状态
      if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
		//清除等待队列中被"激活"的节点
      if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
		//如果当前线程被中断,处理中断逻辑
      if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
    }

主要分以下几步

  1. (1)先判断是否当前线程是否被中断中断则抛出中断异常如果未中断调用addConditionWaiter()加入等待队列
  2. (2)调用fullyRelease(node)释放锁使同步阻塞队列的下个节点线程能获取锁。
  3. (3)调用isOnSyncQueue(node)判断是否在同步阻塞队列,这里的加入同步阻塞队列操作是在另一个线程调用signal()后加入,如果不在同步阻塞队列会进行阻塞直到被激活。
  4. (4)如果被激活然后调用checkInterruptWhileWaiting(node)判断是否被中断并获取中断模式。
  5. (5)继续调用isOnSyncQueue(node)判断是否在同步阻塞队列。
  6. (6)是则调用acquireQueued(node, savedState) 获取锁,这里如果获取不到也会被阻塞,获取不到原因是在第一次调用isOnSyncQueue(node)前,可能另一个线程已经调用signal()后加入到同步阻塞队列,然后调用acquireQueued(node, savedState) 获取不到锁并阻塞。acquireQueued(node, savedState)也会返回当前线程是否被中断,如果被中断设置中断模式。
  7. (7)在激活后调用unlinkCancelledWaiters()清理等待队列的已经被激活的节点。
  8. (8)最后判断当前线程是否被中断,如果被中断则对中断线程做处理。

下面来看下addConditionWaiter()实现

private Node addConditionWaiter() {
 		//获取等待队列尾部节点
      Node t = lastWaiter;
      //如果尾部状态不为CONDITION,如果已经被"激活",清理之,然后重新获取尾部节点
      if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
      }
		//创建以当前线程为基础的节点,并将节点模式设置成CONDITION
      Node node = new Node(Thread.currentThread(), Node.CONDITION);
		//如果尾节点不存在,说明队列为空,将头节点设置成当前节点
      if (t == null)
        firstWaiter = node;
		//如果尾节点存在,将此节点设置成尾节点的下个节点
      else
        t.nextWaiter = node;
		//将尾节点设置成当前节点
      lastWaiter = node;
      return node;
    }

addConditionWaiter()的逻辑很简单,就是创建以当前线程为基础的节点并把节点加入等待队列的尾部待其他线程处理。

下面来看下fullyRelease(Node node)实现

final int fullyRelease(Node node) {
    boolean failed = true;
    try {
		//获取阻塞队列中当前线程节点的锁状态值
      int savedState = getState();
		//释放当前线程节点锁
      if (release(savedState)) {
        failed = false;
        return savedState;
      } else {
        throw new IllegalMonitorStateException();
      }
    } finally {
		//释放失败讲节点等待状态设置成关闭
      if (failed)
        node.waitStatus = Node.CANCELLED;
    }
  }

调用getState()先获取阻塞队列中当前线程节点的锁状态值,这个值可能大于1表示多次重入,然后调用release(savedState)释放所有锁,如果释放成功返回锁状态值。

下面来看下isOnSyncQueue(Node node)实现

final boolean isOnSyncQueue(Node node) {
		//判断当前节点是否是CONDITION或者前置节点是否为空如果为空直接返回false
    if (node.waitStatus == Node.CONDITION || node.prev == null)
      return false;
		//如果下个节点存在,则在同步阻塞队列中返回true
    if (node.next != null) // If has successor, it must be on queue
      return true;
		//遍历查找当前节点是否在同步阻塞队列中
    return findNodeFromTail(node);
  }
  private boolean findNodeFromTail(Node node) {
    Node t = tail;
    for (;;) {
      if (t == node)
        return true;
      if (t == null)
        return false;
      t = t.prev;
    }
  }

此方法的功能是查找当前节点是否在同步阻塞队列中,方法先是快速判断,判断不了再进行遍历查找。

  1. (1)第一步先判断次节点是否CONDITION状态或者前置节点是否存在,如果是表明不在队列中返回false,阻塞队列中的状态一般是0或者SIGNAL状态而且如果当前如果当前节点在队列阻塞中且未被激活前置节点一定不为空。
  2. (2)第二步判断节点的下个节点是否存在,如果存在则表明当前当前节点已加入到阻塞队列中。
  3. (3)如果以上2点都没法判断,也有可能刚刚加入到同步阻塞队列中,所以调用findNodeFromTail(Node node)做最后的遍历查找。查找从队列尾部开始查,从尾部开始查的原因是可能刚刚加入到同步阻塞队列中,从尾部能快速定位。

下面看下checkInterruptWhileWaiting(Node node)实现

private int checkInterruptWhileWaiting(Node node) {
      return Thread.interrupted() ?
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
        0;
    }

final boolean transferAfterCancelledWait(Node node) {
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
      enq(node);
      return true;
    }
    while (!isOnSyncQueue(node))
      Thread.yield();
    return false;
  }

此方法在线程被激活后被调用,主要功能就是判断被激活的线程是否被中断。此方法会返回2种中断状态THROW_IE和REINTERRUPT,THROW_IE是调用signal()前被中断返回,REINTERRUPT在调用signal()后被中断返回。 此方法先判断是否被标记中断,是的话再调用transferAfterCancelledWait(node)取判断是那种中断状态,transferAfterCancelledWait(node)方法分2步

  1. (1)用CAS方式将节点状态改错等待状态改成CONDITION,并加入到同步阻塞队列中返回true
  2. (2)如果不能加入到同步阻塞队列就自旋一直等待加入

如果使用await()方法上面2步其实是没什么作用其最后一定会返回false,因为await()被激活只能调用 signal()方法,而signal()方法肯定已经将节点加入到同步阻塞队列中。所以以上逻辑是给await(long time, TimeUnit unit)等带超时激活方法用的。

acquireQueued(node, savedState)方法再上一章节已经讲过这边就不重复了,下面分析下unlinkCancelledWaiters()方法

private void unlinkCancelledWaiters() {
		//获取等待队列头节点
      Node t = firstWaiter;
      Node trail = null;
      while (t != null) {
		//获取下个节点
        Node next = t.nextWaiter;
		//如果状态不为CONDITION说明已经加入阻塞队列需要清理掉
        if (t.waitStatus != Node.CONDITION) {
          t.nextWaiter = null;
          if (trail == null)
            firstWaiter = next;
          else
			//获取下个节点
            trail.nextWaiter = next;
          if (next == null)
            lastWaiter = trail;
        }
        else
          trail = t;
        t = next;
      }
    }

此方法就是从头开始查找状态不为CONDITION的节点并清理,状态不为CONDITION节点说明此节点已经加入到阻塞队列,已经不需要维护。

下面来看下reportInterruptAfterWait(interruptMode)方法

private void reportInterruptAfterWait(int interruptMode)
      throws InterruptedException {
		//如果是THROW_IE模式直接抛出异常
      if (interruptMode == THROW_IE)
        throw new InterruptedException();
		//如果是REINTERRUPT模式标记线程中断由上层处理中断
      else if (interruptMode == REINTERRUPT)
        selfInterrupt();
    }

此方法处理中断逻辑。如果是THROW_IE模式直接抛出异常,如果是REINTERRUPT模式标记线程中断由上层处理中断。

signal()方法源码分析

signal()源码如下

public final void signal() {
		//是否当前线程持有锁
      if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
      Node first = firstWaiter;
		//通知"激活"头节点线程
      if (first != null)
        doSignal(first);
    }

先调用isHeldExclusively()判断锁是否被当前线程持有,然后检查等待队列是否为空,不为空就是可以取第一个节点调用doSignal(first)去"激活",这里激活不是真正的激活而只是将节点加入到同步阻塞队列尾部,所以上下文中带""的激活都是这种解释。

下面看下isHeldExclusively()实现

 protected final boolean isHeldExclusively() {
      return getExclusiveOwnerThread() == Thread.currentThread();
    }

实现就是比较下当前线程和持有锁的线程是否同一个

下面看下doSignal(first)的实现

private void doSignal(Node first) {
      do {
		//头指头后移一位,如果后面的节点为空,则将尾指头也指向空,说明队列为空了
        if ( (firstWaiter = first.nextWaiter) == null)
          lastWaiter = null;
		//清空头节点的下个节点
        first.nextWaiter = null;
		//如果"激活"失败者取下个继续,直到成功或者遍历完
      } while (!transferForSignal(first) &&
           (first = firstWaiter) != null);
    }

此方法就是取当前头节点一直去尝试"激活",直到成功或者遍历完。

下面来看下transferForSignal(first)方法

final boolean transferForSignal(Node node) {
		//将CONDITION状态设置成0
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
      return false;
		//加入到同步阻塞队列
    Node p = enq(node);
    int ws = p.waitStatus;
		//状态异常直接激活
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
      LockSupport.unpark(node.thread);
    return true;
  }

(1)此方法先先将CONDITION状态设置成0,因为如果是CONDITION状态加入到同步阻塞队列,激活的时候是不识别的。
(2)加入到同步阻塞队列的尾部。所以同步阻塞队列中前面如果有多个在排队,调用unlock()不会马上激活此节点。
(3)状态异常直接调用unpark激活,这边按理说如果状态异常情况下激活,await()在调用unlock()被激活后会进行相应的异常处理,但看await()代码没有处理则是正常执行。

这个方法主要就是把节点加入到同步阻塞队列的,真正的激活则是调用unlock()去处理。

以上所述是小编给大家介绍的Java并发编程之Condition源码分析详解整合,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对我们网站的支持!

(0)

相关推荐

  • 举例讲解Java中Piped管道输入输出流的线程通信控制

    PipedOutputStream和PipedInputStream 在java中,PipedOutputStream和PipedInputStream分别是管道输出流和管道输入流. 它们的作用是让多线程可以通过管道进行线程间的通讯.在使用管道通信时,必须将PipedOutputStream和PipedInputStream配套使用. 使用管道通信时,大致的流程是:我们在线程A中向PipedOutputStream中写入数据,这些数据会自动的发送到与PipedOutputStream对应的Pip

  • Java多线程编程中使用Condition类操作锁的方法详解

    Condition的作用是对锁进行更精确的控制.Condition中的await()方法相当于Object的wait()方法,Condition中的signal()方法相当于Object的notify()方法,Condition中的signalAll()相当于Object的notifyAll()方法.不同的是,Object中的wait(),notify(),notifyAll()方法是和"同步锁"(synchronized关键字)捆绑使用的:而Condition是需要与"互斥

  • Java多线程中ReentrantLock与Condition详解

    一.ReentrantLock类 1.1什么是reentrantlock java.util.concurrent.lock中的Lock框架是锁定的一个抽象,它允许把锁定的实现作为Java类,而不是作为语言的特性来实现.这就为Lock的多种实现留下了空间,各种实现可能有不同的调度算法.性能特性或者锁定语义.ReentrantLock类实现了Lock,它拥有与synchronized相同的并发性和内存语义,但是添加了类似锁投票.定时锁等候和可中断锁等候的一些特性.此外,它还提供了在激烈争用情况下更

  • Java线程通信详解

    线程通信用来保证线程协调运行,一般在做线程同步的时候才需要考虑线程通信的问题. 1.传统的线程通信 通常利用Objeclt类提供的三个方法: wait() 导致当前线程等待,并释放该同步监视器的锁定,直到其它线程调用该同步监视器的notify()或者notifyAll()方法唤醒线程. notify(),唤醒在此同步监视器上等待的线程,如果有多个会任意选择一个唤醒 notifyAll() 唤醒在此同步监视器上等待的所有线程,这些线程通过调度竞争资源后,某个线程获取此同步监视器的锁,然后得以运行.

  • Java编程中实现Condition控制线程通信

    java中控制线程通信的方法 1.传统的方式:利用synchronized关键字来保证同步,结合wait(),notify(),notifyAll()控制线程通信.不灵活. 2.利用Condition控制线程通信,灵活. 3.利用管道pipe进行线程通信,不推荐 4.利用BlockingQueue控制线程通信 本文就讲解利用Condition控制线程通信,非常灵活的方式. Condition类是用来保持Lock对象的协调调用. 对Lock不了解的可以参考:Java线程同步Lock同步锁代码示例

  • Java使用Condition控制线程通信的方法实例详解

    本文实例讲述了Java使用Condition控制线程通信的方法.分享给大家供大家参考,具体如下: 一 点睛 当使用Lock对象来保证同步时,Java提供了一个Condition类来保持协调,使用Condition可以让那些已经得到Lock对象.却无法继续执行的线程释放Lock对象,Condtion对象也可以唤醒其他处于等待的线程. Condition 将同步监视锁方法(wait.notify 和 notifyAll)分解成截然不同的对象,以便通过将这些对象与Lock对象组合使用,为每个对象提供多

  • Java并发之条件阻塞Condition的应用代码示例

    本文研究的主要是Java并发之条件阻塞Condition的应用示例代码,具体如下. Condition将Object监视器方法(wait.notify 和 notifyAll)分解成截然不同的对象,以便通过将这些对象与任意Lock实现组合使用,为每个对象提供多个等待 set(wait-set).其中,Lock 替代了synchronized方法和语句的使用,Condition替代了Object监视器方法的使用. 1. Condition的基本使用 由于Condition可以用来替代wait.no

  • java 多线程-线程通信实例讲解

    线程通信的目标是使线程间能够互相发送信号.另一方面,线程通信使线程能够等待其他线程的信号. 通过共享对象通信 忙等待 wait(),notify()和 notifyAll() 丢失的信号 假唤醒 多线程等待相同信号 不要对常量字符串或全局对象调用 wait() 通过共享对象通信 线程间发送信号的一个简单方式是在共享对象的变量里设置信号值.线程 A 在一个同步块里设置 boolean 型成员变量 hasDataToProcess 为 true,线程 B 也在同步块里读取 hasDataToProc

  • Java使用阻塞队列控制线程通信的方法实例详解

    本文实例讲述了Java使用阻塞队列控制线程通信的方法.分享给大家供大家参考,具体如下: 一 点睛 阻塞队列主要用在生产者/消费者的场景,下面这幅图展示了一个线程生产.一个线程消费的场景: 负责生产的线程不断的制造新对象并插入到阻塞队列中,直到达到这个队列的上限值.队列达到上限值之后生产线程将会被阻塞,直到消费的线程对这个队列进行消费.同理,负责消费的线程不断的从队列中消费对象,直到这个队列为空,当队列为空时,消费线程将会被阻塞,除非队列中有新的对象被插入. BlockingQueue的核心方法:

  • 深入理解java线程通信

    前言 开发中不免会遇到需要所有子线程执行完毕通知主线程处理某些逻辑的场景. 或者是线程 A 在执行到某个条件通知线程 B 执行某个操作. 可以通过以下几种方式实现: 等待通知机制 等待通知模式是 Java 中比较经典的线程通信方式. 两个线程通过对同一对象调用等待 wait() 和通知 notify() 方法来进行通讯. 如两个线程交替打印奇偶数: public class TwoThreadWaitNotify { private int start = 1; private boolean

随机推荐