Java concurrency之Condition条件_动力节点Java学院整理

Condition介绍

Condition的作用是对锁进行更精确的控制。Condition中的await()方法相当于Object的wait()方法,Condition中的signal()方法相当于Object的notify()方法,Condition中的signalAll()相当于Object的notifyAll()方法。不同的是,Object中的wait(),notify(),notifyAll()方法是和"同步锁"(synchronized关键字)捆绑使用的;而Condition是需要与"互斥锁"/"共享锁"捆绑使用的。

Condition函数列表

// 造成当前线程在接到信号或被中断之前一直处于等待状态。
void await()
// 造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
boolean await(long time, TimeUnit unit)
// 造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
long awaitNanos(long nanosTimeout)
// 造成当前线程在接到信号之前一直处于等待状态。
void awaitUninterruptibly()
// 造成当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态。
boolean awaitUntil(Date deadline)
// 唤醒一个等待线程。
void signal()
// 唤醒所有等待线程。
void signalAll()

Condition示例

示例1是通过Object的wait(), notify()来演示线程的休眠/唤醒功能。

示例2是通过Condition的await(), signal()来演示线程的休眠/唤醒功能。

示例3是通过Condition的高级功能。

示例1

public class WaitTest1 {
  public static void main(String[] args) {
    ThreadA ta = new ThreadA("ta");
    synchronized(ta) { // 通过synchronized(ta)获取“对象ta的同步锁”
      try {
        System.out.println(Thread.currentThread().getName()+" start ta");
        ta.start();
        System.out.println(Thread.currentThread().getName()+" block");
        ta.wait();  // 等待
        System.out.println(Thread.currentThread().getName()+" continue");
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }
  static class ThreadA extends Thread{
    public ThreadA(String name) {
      super(name);
    }
    public void run() {
      synchronized (this) { // 通过synchronized(this)获取“当前对象的同步锁”
        System.out.println(Thread.currentThread().getName()+" wakup others");
        notify();  // 唤醒“当前对象上的等待线程”
      }
    }
  }
}

示例2

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class ConditionTest1 {
  private static Lock lock = new ReentrantLock();
  private static Condition condition = lock.newCondition();
  public static void main(String[] args) {
    ThreadA ta = new ThreadA("ta");
    lock.lock(); // 获取锁
    try {
      System.out.println(Thread.currentThread().getName()+" start ta");
      ta.start();
      System.out.println(Thread.currentThread().getName()+" block");
      condition.await();  // 等待
      System.out.println(Thread.currentThread().getName()+" continue");
    } catch (InterruptedException e) {
      e.printStackTrace();
    } finally {
      lock.unlock();  // 释放锁
    }
  }
  static class ThreadA extends Thread{
    public ThreadA(String name) {
      super(name);
    }
    public void run() {
      lock.lock();  // 获取锁
      try {
        System.out.println(Thread.currentThread().getName()+" wakup others");
        condition.signal();  // 唤醒“condition所在锁上的其它线程”
      } finally {
        lock.unlock();  // 释放锁
      }
    }
  }
}

运行结果:

main start ta
main block
ta wakup others
main continue

通过“示例1”和“示例2”,我们知道Condition和Object的方法有一下对应关系:

Object      Condition 
休眠          wait        await
唤醒个线程     notify      signal
唤醒所有线程   notifyAll   signalAll

Condition除了支持上面的功能之外,它更强大的地方在于:能够更加精细的控制多线程的休眠与唤醒。对于同一个锁,我们可以创建多个Condition,在不同的情况下使用不同的Condition。

例如,假如多线程读/写同一个缓冲区:当向缓冲区中写入数据之后,唤醒"读线程";当从缓冲区读出数据之后,唤醒"写线程";并且当缓冲区满的时候,"写线程"需要等待;当缓冲区为空时,"读线程"需要等待。         如果采用Object类中的wait(), notify(), notifyAll()实现该缓冲区,当向缓冲区写入数据之后需要唤醒"读线程"时,不可能通过notify()或notifyAll()明确的指定唤醒"读线程",而只能通过notifyAll唤醒所有线程(但是notifyAll无法区分唤醒的线程是读线程,还是写线程)。  但是,通过Condition,就能明确的指定唤醒读线程。

看看下面的示例3,可能对这个概念有更深刻的理解。

示例3

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
class BoundedBuffer {
  final Lock lock = new ReentrantLock();
  final Condition notFull = lock.newCondition();
  final Condition notEmpty = lock.newCondition();
  final Object[] items = new Object[5];
  int putptr, takeptr, count;
  public void put(Object x) throws InterruptedException {
    lock.lock();  //获取锁
    try {
      // 如果“缓冲已满”,则等待;直到“缓冲”不是满的,才将x添加到缓冲中。
      while (count == items.length)
        notFull.await();
      // 将x添加到缓冲中
      items[putptr] = x;
      // 将“put统计数putptr+1”;如果“缓冲已满”,则设putptr为0。
      if (++putptr == items.length) putptr = 0;
      // 将“缓冲”数量+1
      ++count;
      // 唤醒take线程,因为take线程通过notEmpty.await()等待
      notEmpty.signal();
      // 打印写入的数据
      System.out.println(Thread.currentThread().getName() + " put "+ (Integer)x);
    } finally {
      lock.unlock();  // 释放锁
    }
  }
  public Object take() throws InterruptedException {
    lock.lock();  //获取锁
    try {
      // 如果“缓冲为空”,则等待;直到“缓冲”不为空,才将x从缓冲中取出。
      while (count == 0)
        notEmpty.await();
      // 将x从缓冲中取出
      Object x = items[takeptr];
      // 将“take统计数takeptr+1”;如果“缓冲为空”,则设takeptr为0。
      if (++takeptr == items.length) takeptr = 0;
      // 将“缓冲”数量-1
      --count;
      // 唤醒put线程,因为put线程通过notFull.await()等待
      notFull.signal();
      // 打印取出的数据
      System.out.println(Thread.currentThread().getName() + " take "+ (Integer)x);
      return x;
    } finally {
      lock.unlock();  // 释放锁
    }
  }
}
public class ConditionTest2 {
  private static BoundedBuffer bb = new BoundedBuffer();
  public static void main(String[] args) {
    // 启动10个“写线程”,向BoundedBuffer中不断的写数据(写入0-9);
    // 启动10个“读线程”,从BoundedBuffer中不断的读数据。
    for (int i=0; i<10; i++) {
      new PutThread("p"+i, i).start();
      new TakeThread("t"+i).start();
    }
  }
  static class PutThread extends Thread {
    private int num;
    public PutThread(String name, int num) {
      super(name);
      this.num = num;
    }
    public void run() {
      try {
        Thread.sleep(1);  // 线程休眠1ms
        bb.put(num);    // 向BoundedBuffer中写入数据
      } catch (InterruptedException e) {
      }
    }
  }
  static class TakeThread extends Thread {
    public TakeThread(String name) {
      super(name);
    }
    public void run() {
      try {
        Thread.sleep(10);          // 线程休眠1ms
        Integer num = (Integer)bb.take();  // 从BoundedBuffer中取出数据
      } catch (InterruptedException e) {
      }
    }
  }
}

(某一次)运行结果:

p1 put  1
p4 put  4
p5 put  5
p0 put  0
p2 put  2
t0 take 1
p3 put  3
t1 take 4
p6 put  6
t2 take 5
p7 put  7
t3 take 0
p8 put  8
t4 take 2
p9 put  9
t5 take 3
t6 take 6
t7 take 7
t8 take 8
t9 take 9

结果说明:

(01) BoundedBuffer 是容量为5的缓冲,缓冲中存储的是Object对象,支持多线程的读/写缓冲。多个线程操作“一个BoundedBuffer对象”时,它们通过互斥锁lock对缓冲区items进行互斥访问;而且同一个BoundedBuffer对象下的全部线程共用“notFull”和“notEmpty”这两个Condition。

notFull用于控制写缓冲,notEmpty用于控制读缓冲。当缓冲已满的时候,调用put的线程会执行notFull.await()进行等待;当缓冲区不是满的状态时,就将对象添加到缓冲区并将缓冲区的容量count+1,最后,调用notEmpty.signal()缓冲notEmpty上的等待线程(调用notEmpty.await的线程)。 简言之,notFull控制“缓冲区的写入”,当往缓冲区写入数据之后会唤醒notEmpty上的等待线程。

同理,notEmpty控制“缓冲区的读取”,当读取了缓冲区数据之后会唤醒notFull上的等待线程。

(02) 在ConditionTest2的main函数中,启动10个“写线程”,向BoundedBuffer中不断的写数据(写入0-9);同时,也启动10个“读线程”,从BoundedBuffer中不断的读数据。

(03) 简单分析一下运行结果。

1, p1线程向缓冲中写入1。    此时,缓冲区数据:   | 1 |   |   |   |   |

2, p4线程向缓冲中写入4。    此时,缓冲区数据:   | 1 | 4 |   |   |   |

3, p5线程向缓冲中写入5。    此时,缓冲区数据:   | 1 | 4 | 5 |   |   |

4, p0线程向缓冲中写入0。    此时,缓冲区数据:   | 1 | 4 | 5 | 0 |   |

5, p2线程向缓冲中写入2。    此时,缓冲区数据:   | 1 | 4 | 5 | 0 | 2 |

此时,缓冲区容量为5;缓冲区已满!如果此时,还有“写线程”想往缓冲中写入数据,会调用put中的notFull.await()等待,直接缓冲区非满状态,才能继续运行。

6, t0线程从缓冲中取出数据1。此时,缓冲区数据:   |   | 4 | 5 | 0 | 2 |

7, p3线程向缓冲中写入3。    此时,缓冲区数据:   | 3 | 4 | 5 | 0 | 2 |

8, t1线程从缓冲中取出数据4。此时,缓冲区数据:   | 3 |   | 5 | 0 | 2 |

9, p6线程向缓冲中写入6。    此时,缓冲区数据:   | 3 | 6 | 5 | 0 | 2 |
     ...

(0)

相关推荐

  • Java concurrency之CountDownLatch原理和示例_动力节点Java学院整理

    CountDownLatch简介 CountDownLatch是一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待. CountDownLatch和CyclicBarrier的区别 (01) CountDownLatch的作用是允许1或N个线程等待其他线程完成执行:而CyclicBarrier则是允许N个线程相互等待. (02) CountDownLatch的计数器无法被重置:CyclicBarrier的计数器可以被重置后使用,因此它被称为是循环的barrier

  • Java concurrency集合之CopyOnWriteArraySet_动力节点Java学院整理

    CopyOnWriteArraySet介绍 它是线程安全的无序的集合,可以将它理解成线程安全的HashSet.有意思的是,CopyOnWriteArraySet和HashSet虽然都继承于共同的父类AbstractSet:但是,HashSet是通过"散列表(HashMap)"实现的,而CopyOnWriteArraySet则是通过"动态数组(CopyOnWriteArrayList)"实现的,并不是散列表. 和CopyOnWriteArrayList类似,CopyO

  • java线程并发countdownlatch类使用示例

    复制代码 代码如下: package com.yao; import java.util.concurrent.CountDownLatch;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors; /** * CountDownLatch是个计数器,它有一个初始数, * 等待这个计数器的线程必须等到计数器倒数到零时才可继续. */public class CountDownLatchTe

  • Java concurrency之共享锁和ReentrantReadWriteLock_动力节点Java学院整理

    ReadWriteLock 和 ReentrantReadWriteLock介绍 ReadWriteLock,顾名思义,是读写锁.它维护了一对相关的锁 - - "读取锁"和"写入锁",一个用于读取操作,另一个用于写入操作. "读取锁"用于只读操作,它是"共享锁",能同时被多个线程获取. "写入锁"用于写入操作,它是"独占锁",写入锁只能被一个线程锁获取. 注意:不能同时存在读取锁和写入锁

  • Java concurrency之公平锁(二)_动力节点Java学院整理

    释放公平锁(基于JDK1.7.0_40) 1. unlock() unlock()在ReentrantLock.java中实现的,源码如下: public void unlock() { sync.release(1); } 说明: unlock()是解锁函数,它是通过AQS的release()函数来实现的. 在这里,"1"的含义和"获取锁的函数acquire(1)的含义"一样,它是设置"释放锁的状态"的参数.由于"公平锁"是

  • Java concurrency之AtomicLong原子类_动力节点Java学院整理

    AtomicLong介绍和函数列表 AtomicLong是作用是对长整形进行原子操作. 在32位操作系统中,64位的long 和 double 变量由于会被JVM当作两个分离的32位来进行操作,所以不具有原子性.而使用AtomicLong能让long的操作保持原子型. AtomicLong函数列表 // 构造函数 AtomicLong() // 创建值为initialValue的AtomicLong对象 AtomicLong(long initialValue) // 以原子方式设置当前值为ne

  • Java多线程编程之CountDownLatch同步工具使用实例

    好像倒计时计数器,调用CountDownLatch对象的countDown方法就将计数器减1,当到达0时,所有等待者就开始执行. java.util.concurrent.CountDownLatch 一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待.用给定的计数初始化CountDownLatch.由于调用了countDown()方法,所以在当前计数到达零之前,await方法会一直受阻塞.之后,会释放所有等待的线程,await的所有后续调用都将立即返回.这种现

  • Java concurrency之AtomicReference原子类_动力节点Java学院整理

    AtomicReference介绍和函数列表 AtomicReference是作用是对"对象"进行原子操作. AtomicReference函数列表 // 使用 null 初始值创建新的 AtomicReference. AtomicReference() // 使用给定的初始值创建新的 AtomicReference. AtomicReference(V initialValue) // 如果当前值 == 预期值,则以原子方式将该值设置为给定的更新值. boolean compare

  • Java concurrency之非公平锁_动力节点Java学院整理

    获取非公平锁(基于JDK1.7.0_40) 非公平锁和公平锁在获取锁的方法上,流程是一样的:它们的区别主要表现在"尝试获取锁的机制不同".简单点说,"公平锁"在每次尝试获取锁时,都是采用公平策略(根据等待队列依次排序等待):而"非公平锁"在每次尝试获取锁时,都是采用的非公平策略(无视等待队列,直接尝试获取锁,如果锁是空闲的,即可获取状态,则获取锁). 1. lock() lock()在ReentrantLock.java的NonfairSync类

  • 详解Java多线程编程中CountDownLatch阻塞线程的方法

    直译过来就是倒计数(CountDown)门闩(Latch).倒计数不用说,门闩的意思顾名思义就是阻止前进.在这里就是指 CountDownLatch.await() 方法在倒计数为0之前会阻塞当前线程. CountDownLatch是一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待. CountDownLatch 的作用和 Thread.join() 方法类似,可用于一组线程和另外一组线程的协作.例如,主线程在做一项工作之前需要一系列的准备工作,只有这些准备工

随机推荐