Java多线程之条件对象Condition

1 简介

Condition中的await()方法相当于Object的wait()方法,Condition中的signal()方法相当于Object的notify()方法,Condition中的signalAll()相当于Object的notifyAll()方法。

不同的是,Object中的wait(),notify(),notifyAll()方法是和"同步锁"(synchronized关键字)捆绑使用的;而Condition是需要与"互斥锁"/"共享锁"捆绑使用的。

2 Condition的实现分析

Condition是同步器AbstractQueuedSynchronized的内部类,因为Condition的操作需要获取相关的锁,所以作为同步器的内部类比较合理。每个Condition对象都包含着一个队列(等待队列),该队列是Condition对象实现等待/通知功能的关键。

等待队列:
等待队列是一个FIFO的队列,队列的每一个节点都包含了一个线程引用,该线程就是在Condition对象上等待的线程,如果一个线程调用了await()方法,该线程就会释放锁、构造成节点进入等待队列并进入等待状态。

这里的节点定义也就是AbstractQueuedSynchronizer.Node的定义。

一个Condition包含一个等待队列,Condition拥有首节点(firstWaiter)和尾节点(lastWaiter)。当前线程调用Condition.await()方法时,将会以当前线程构造节点,并将节点从尾部加入等待队列。

在Object的监视器模型上,一个对象拥有一个同步队列和等待队列,而Lock(同步器)拥有一个同步队列和多个等待队列。

等待(await):AbstractQueuedLongSynchronizer中实现
调用Condition的await()方法,会使当前线程进入等待队列并释放锁,同时线程状态变为等待状态。

从队列的角度来看,相当于同步队列的首节点(获取了锁的节点)移动到Condition的等待队列中。

当等待队列中的节点被唤醒,则唤醒节点的线程开始尝试获取同步状态。如果不是通过Condition.signal()方法唤醒,而是对等待线程进行中断,则抛出InterruptedException。

public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            long savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

Condition等待通知的本质
总的来说,Condition的本质就是等待队列和同步队列的交互:

当一个持有锁的线程调用Condition.await()时,它会执行以下步骤:

构造一个新的等待队列节点加入到等待队列队尾
释放锁,也就是将它的同步队列节点从同步队列队首移除
自旋,直到它在等待队列上的节点移动到了同步队列(通过其他线程调用signal())或被中断
阻塞当前节点,直到它获取到了锁,也就是它在同步队列上的节点排队排到了队首。
当一个持有锁的线程调用Condition.signal()时,它会执行以下操作:
从等待队列的队首开始,尝试对队首节点执行唤醒操作;如果节点CANCELLED,就尝试唤醒下一个节点;如果再CANCELLED则继续迭代。

对每个节点执行唤醒操作时,首先将节点加入同步队列,此时await()操作的步骤3的解锁条件就已经开启了。然后分两种情况讨论:

如果先驱节点的状态为CANCELLED(>0) 或设置先驱节点的状态为SIGNAL失败,那么就立即唤醒当前节点对应的线程,此时await()方法就会完成步骤3,进入步骤4.
如果成功把先驱节点的状态设置为了SIGNAL,那么就不立即唤醒了。等到先驱节点成为同步队列首节点并释放了同步状态后,会自动唤醒当前节点对应线程的,这时候await()的步骤3才执行完成,而且有很大概率快速完成步骤4.
通知(signal):AbstractQueuedLongSynchronizer中实现
调用Condition的signal()方法,将会唤醒在等待队列中等待时间最长的节点(首节点),在唤醒节点之前,会将节点移到同步队列中。

Condition的signalAll()方法,相当于对等待队列中的每个节点均执行一次signal()方法,将等待队列中的节点全部移动到同步队列中,并唤醒每个节点的线程。

public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

public final void signalAll() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignalAll(first);
}

最后还要注意,Java 中有 signal 和 signalAll 两种方法,signal 是随机解除一个等待集中的线程的阻塞状态,signalAll 是解除所有等待集中的线程的阻塞状态。signal 方法的效率会比 signalAll 高,但是它存在危险,因为它一次只解除一个线程的阻塞状态,因此,如果等待集中有多个线程都满足了条件,也只能唤醒一个,其他的线程可能会导致死锁

3 Condition 实例

消费生产者模式
public class ConditionTest {
    public static void main(String[] args) {
        // 仓库
        Depot depot = new Depot(100);
        // 消费者
        Consumer consumer = new Consumer(depot);
        // 生产者
        Produce produce = new Produce(depot);
        produce.produceThing(5);
        consumer.consumerThing(5);
        produce.produceThing(2);
        consumer.consumerThing(5);
        produce.produceThing(3);
    }
}

class Depot {
    private int capacity;
    private int size;
    private Lock lock;
    private Condition consumerCond;
    private Condition produceCond;

public Depot(int capacity) {
        this.capacity = capacity;
        this.size = 0;
        this.lock = new ReentrantLock();
        this.consumerCond = lock.newCondition();
        this.produceCond = lock.newCondition();
    }

public void produce(int val) {
        lock.lock();
        try {
            int left = val;
            while (left > 0) {
                while (size >= capacity) {
                    produceCond.await();
                }
                int produce = (left+size) > capacity ? (capacity-size) : left;
                size += produce;
                left -= produce;
                System.out.println(Thread.currentThread().getName() + ", ProduceVal=" + val + ", produce=" + produce + ", size=" + size);
                consumerCond.signalAll();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

public void consumer(int val) {
        lock.lock();
        try {
            int left = val;
            while (left > 0) {
                while (size <= 0) {
                    consumerCond.await();
                }
                int consumer = (size <= left) ? size : left;
                size -= consumer;
                left -= consumer;
                System.out.println(Thread.currentThread().getName() + ", ConsumerVal=" + val + ", consumer=" + consumer + ", size=" + size);
                produceCond.signalAll();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}
class Consumer {
    private Depot depot;
    public Consumer(Depot depot) {
        this.depot = depot;
    }

public void consumerThing(final int amount) {
        new Thread(new Runnable() {
            public void run() {
                depot.consumer(amount);
            }
        }).start();
    }
}
class Produce {
    private Depot depot;
    public Produce(Depot depot) {
        this.depot = depot;
    }

public void produceThing(final int amount) {
        new Thread(new Runnable() {
            public void run() {
                depot.produce(amount);
            }
        }).start();
    }
}

Thread-0, ProduceVal=5, produce=5, size=5
Thread-1, ConsumerVal=5, consumer=5, size=0
Thread-2, ProduceVal=2, produce=2, size=2
Thread-3, ConsumerVal=5, consumer=2, size=0
Thread-4, ProduceVal=3, produce=3, size=3
Thread-3, ConsumerVal=5, consumer=3, size=0

输出结果中,Thread-3出现两次,就是因为要消费5个产品,但仓库中只有2个产品,所以先将库存的2个产品全部消费,然后这个线程进入等待队列,等待生产,随后生产出了3个产品,生产者生产后又执行signalAll方法将等待队列中所有的线程都唤醒,Thread-3继续消费还需要的3个产品。

三个线程依次打印ABC

class Business {
    private Lock lock = new ReentrantLock();
    private Condition conditionA = lock.newCondition();
    private Condition conditionB = lock.newCondition();
    private Condition conditionC = lock.newCondition();
    private String type = "A"; //内部状态

/*
     * 方法的基本要求为:
     * 1、该方法必须为原子的。
     * 2、当前状态必须满足条件。若不满足,则等待;满足,则执行业务代码。
     * 3、业务执行完毕后,修改状态,并唤醒指定条件下的线程。
     */
    public void printA() {
        lock.lock(); //锁,保证了线程安全。
        try {
            while (type != "A") { //type不为A,
                try {
                    conditionA.await(); //将当前线程阻塞于conditionA对象上,将被阻塞。
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

//type为A,则执行。
            System.out.println(Thread.currentThread().getName() + " 正在打印A");
            type = "B"; //将type设置为B。
            conditionB.signal(); //唤醒在等待conditionB对象上的一个线程。将信号传递出去。
        } finally {
            lock.unlock(); //解锁
        }
    }

public void printB() {
        lock.lock(); //锁
        try {
            while (type != "B") { //type不为B,
                try {
                    conditionB.await(); //将当前线程阻塞于conditionB对象上,将被阻塞。
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

//type为B,则执行。
            System.out.println(Thread.currentThread().getName() + " 正在打印B");
            type = "C"; //将type设置为C。
            conditionC.signal(); //唤醒在等待conditionC对象上的一个线程。将信号传递出去。
        } finally {
            lock.unlock(); //解锁
        }
    }

public void printC() {
        lock.lock(); //锁
        try {
            while (type != "C") {
                try {
                    conditionC.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

System.out.println(Thread.currentThread().getName() + " 正在打印C");
            type = "A";
            conditionA.signal();
        } finally {
            lock.unlock(); //解锁
        }
    }
}

public class ConditionTest{

public static void main(String[] args) {
        final Business business = new Business();//业务对象。

//线程1号,打印10次A。
        Thread ta = new Thread(new Runnable() {

@Override
            public void run() {
                for(int i=0;i<10;i++){
                    business.printA();
                }
            }
        });

//线程2号,打印10次B。
        Thread tb = new Thread(new Runnable() {

@Override
            public void run() {
                for(int i=0;i<10;i++){
                    business.printB();
                }
            }
        });

//线程3号,打印10次C。
        Thread tc = new Thread(new Runnable() {

@Override
            public void run() {
                for(int i=0;i<10;i++){
                    business.printC();
                }
            }
        });

//执行3条线程。
        ta.start();
        tb.start();
        tc.start();
    }

}

Thread-0 正在打印A
Thread-1 正在打印B
Thread-2 正在打印C
Thread-0 正在打印A
Thread-1 正在打印B
Thread-2 正在打印C
Thread-0 正在打印A
Thread-1 正在打印B
Thread-2 正在打印C
Thread-0 正在打印A
Thread-1 正在打印B
Thread-2 正在打印C
Thread-0 正在打印A
Thread-1 正在打印B
Thread-2 正在打印C
Thread-0 正在打印A
Thread-1 正在打印B
Thread-2 正在打印C
Thread-0 正在打印A
Thread-1 正在打印B
Thread-2 正在打印C
Thread-0 正在打印A
Thread-1 正在打印B
Thread-2 正在打印C
Thread-0 正在打印A
Thread-1 正在打印B
Thread-2 正在打印C
Thread-0 正在打印A
Thread-1 正在打印B
Thread-2 正在打印C

虚假唤醒

所谓"虚假唤醒",即其他地方的代码触发了condition.signal(),唤醒condition上等待的线程。但被唤醒的线程仍然不满足执行条件。

condition通常与条件语句一起使用:

if(!条件){
    condition.await(); //不满足条件,当前线程等待;
}

更好的方法是使用while:

while(!条件){
    condition.await(); //不满足条件,当前线程等待;
}

在等待Condition时,允许发生"虚假唤醒",这通常作为对基础平台语义的让步。若使用"if(!条件)"则被"虚假唤醒"的线程可能继续执行。所以"while(!条件)"可以防止"虚假唤醒"。建议总是假定这些"虚假唤醒"可能发生,因此总是在一个循环中等待。

总结

如果知道Object的等待通知机制,Condition的使用是比较容易掌握的,因为和Object等待通知的使用基本一致。

对Condition的源码理解,主要就是理解等待队列,等待队列可以类比同步队列,而且等待队列比同步队列要简单,因为等待队列是单向队列,同步队列是双向队列。

以下是笔者对等待队列是单向队列、同步队列是双向队列的一些思考,欢迎提出不同意见:

之所以同步队列要设计成双向的,是因为在同步队列中,节点唤醒是接力式的,由每一个节点唤醒它的下一个节点,如果是由next指针获取下一个节点,是有可能获取失败的,因为虚拟队列每添加一个节点,是先用CAS把tail设置为新节点,然后才修改原tail的next指针到新节点的。因此用next向后遍历是不安全的,但是如果在设置新节点为tail前,为新节点设置prev,则可以保证从tail往前遍历是安全的。因此要安全的获取一个节点Node的下一个节点,先要看next是不是null,如果是null,还要从tail往前遍历看看能不能遍历到Node。

而等待队列就简单多了,等待的线程就是等待者,只负责等待,唤醒的线程就是唤醒者,只负责唤醒,因此每次要执行唤醒操作的时候,直接唤醒等待队列的首节点就行了。等待队列的实现中不需要遍历队列,因此也不需要prev指针。

(0)

相关推荐

  • Java多线程Condition接口原理介绍

    Condition接口提供了类似Object的监视器方法,与Lock配合可以实现等待/通知模式,但是这两者在使用方式以及功能特性上还是有差别的 Condition接口详解 Condition定义了等待/通知两种类型的方法,当前线程调用这些方法时,需要提前获取到Condition对象关联的锁.Condition对象是由Lock对象(调用Lock对象的newCondition()方法)创建出来的,换句话说,Condition是依赖Lock对象的. Lock lock = new ReentrantL

  • Java多线程中ReentrantLock与Condition详解

    一.ReentrantLock类 1.1什么是reentrantlock java.util.concurrent.lock中的Lock框架是锁定的一个抽象,它允许把锁定的实现作为Java类,而不是作为语言的特性来实现.这就为Lock的多种实现留下了空间,各种实现可能有不同的调度算法.性能特性或者锁定语义.ReentrantLock类实现了Lock,它拥有与synchronized相同的并发性和内存语义,但是添加了类似锁投票.定时锁等候和可中断锁等候的一些特性.此外,它还提供了在激烈争用情况下更

  • java多线程加锁以及Condition类的使用实例解析

    这篇文章主要介绍了java多线程加锁以及Condition类的使用实例解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 看了网上非常多的运行代码,很多都是重复的再说一件事,可能对于java老鸟来说,理解java的多线程是非常容易的事情,但是对于我这样的菜鸟来说,这个实在有点难,可能是我太菜了,网上重复的陈述对于我理解这个问题一点帮助都没有.所以这里我写下我对于这个问题的理解,目的是为了防止我忘记. 还是从代码实例开始讲起: 代码 import

  • Java多线程编程中使用Condition类操作锁的方法详解

    Condition的作用是对锁进行更精确的控制.Condition中的await()方法相当于Object的wait()方法,Condition中的signal()方法相当于Object的notify()方法,Condition中的signalAll()相当于Object的notifyAll()方法.不同的是,Object中的wait(),notify(),notifyAll()方法是和"同步锁"(synchronized关键字)捆绑使用的:而Condition是需要与"互斥

  • Java多线程之条件对象Condition

    1 简介 Condition中的await()方法相当于Object的wait()方法,Condition中的signal()方法相当于Object的notify()方法,Condition中的signalAll()相当于Object的notifyAll()方法. 不同的是,Object中的wait(),notify(),notifyAll()方法是和"同步锁"(synchronized关键字)捆绑使用的:而Condition是需要与"互斥锁"/"共享锁&

  • 图解Java ReentrantLock的条件变量Condition机制

    目录 概述 ReentrantLock条件变量使用 图解实现原理 await过程 signal过程 概述 想必大家都使用过wait()和notify()这两个方法把,这两个方法主要用于多线程间的协同处理,即控制线程之间的等待.通知.切换及唤醒.而RenentrantLock也支持这样条件变量的能力,而且相对于synchronized 更加强大,能够支持多个条件变量. 最好可以先阅读ReentrantLock系列文章: 图解Java ReentrantLock公平锁和非公平锁的实现 详解Java 

  • Java并发之条件阻塞Condition的应用代码示例

    本文研究的主要是Java并发之条件阻塞Condition的应用示例代码,具体如下. Condition将Object监视器方法(wait.notify 和 notifyAll)分解成截然不同的对象,以便通过将这些对象与任意Lock实现组合使用,为每个对象提供多个等待 set(wait-set).其中,Lock 替代了synchronized方法和语句的使用,Condition替代了Object监视器方法的使用. 1. Condition的基本使用 由于Condition可以用来替代wait.no

  • 学习Java多线程之同步

    如果你的java基础较弱,或者不大了解java多线程请先看这篇文章<学习Java多线程之线程定义.状态和属性> 同步一直是java多线程的难点,在我们做android开发时也很少应用,但这并不是我们不熟悉同步的理由.希望这篇文章能使更多的人能够了解并且应用java的同步. 在多线程的应用中,两个或者两个以上的线程需要共享对同一个数据的存取.如果两个线程存取相同的对象,并且每一个线程都调用了修改该对象的方法,这种情况通常成为竞争条件. 竞争条件最容易理解的例子就是:比如火车卖票,火车票是一定的,

  • Java多线程中不同条件下编写生产消费者模型方法介绍

    简介: 生产者.消费者模型是多线程编程的常见问题,最简单的一个生产者.一个消费者线程模型大多数人都能够写出来,但是一旦条件发生变化,我们就很容易掉进多线程的bug中.这篇文章主要讲解了生产者和消费者的数量,商品缓存位置数量,商品数量等多个条件的不同组合下,写出正确的生产者消费者模型的方法. 欢迎探讨,如有错误敬请指正 生产消费者模型 生产者消费者模型具体来讲,就是在一个系统中,存在生产者和消费者两种角色,他们通过内存缓冲区进行通信,生产者生产消费者需要的资料,消费者把资料做成产品.生产消费者模式

  • 详解Java同步—线程锁和条件对象

    线程锁和条件对象 在大多数多线程应用中,都是两个及以上线程需要共享对同一数据的存取,所以有可能出现两个线程同时访问同一个资源的情况,这种情况叫做:竞争条件. 在Java中为了解决并发的数据访问问题,一般使用锁这个概念来解决. 有几种机制防止代码收到并发访问的干扰: 1.synchronized关键字(自动创建一个锁及相关的条件) 2.ReentrantLock类+Java.util.concurrent包中的lock接口(在Java5.0的时候引入) ReentrantLock的使用 publi

随机推荐