深入理解java线程通信

前言

开发中不免会遇到需要所有子线程执行完毕通知主线程处理某些逻辑的场景。

或者是线程 A 在执行到某个条件通知线程 B 执行某个操作。

可以通过以下几种方式实现:

等待通知机制

等待通知模式是 Java 中比较经典的线程通信方式。
两个线程通过对同一对象调用等待 wait() 和通知 notify() 方法来进行通讯。

如两个线程交替打印奇偶数:

public class TwoThreadWaitNotify {
private int start = 1;
private boolean flag = false;
public static void main(String[] args) {
TwoThreadWaitNotify twoThread = new TwoThreadWaitNotify();
Thread t1 = new Thread(new OuNum(twoThread));
t1.setName("A");
Thread t2 = new Thread(new JiNum(twoThread));
t2.setName("B");
t1.start();
t2.start();
}
/**
* 偶数线程
*/
public static class OuNum implements Runnable {
private TwoThreadWaitNotify number;
public OuNum(TwoThreadWaitNotify number) {
this.number = number;
}
@Override
public void run() {
while (number.start <= 100) {
synchronized (TwoThreadWaitNotify.class) {
System.out.println("偶数线程抢到锁了");
if (number.flag) {
System.out.println(Thread.currentThread().getName() + "+-+偶数" + number.start);
number.start++;
number.flag = false;
TwoThreadWaitNotify.class.notify();
}else {
try {
TwoThreadWaitNotify.class.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}
/**
* 奇数线程
*/
public static class JiNum implements Runnable {
private TwoThreadWaitNotify number;
public JiNum(TwoThreadWaitNotify number) {
this.number = number;
}
@Override
public void run() {
while (number.start <= 100) {
synchronized (TwoThreadWaitNotify.class) {
System.out.println("奇数线程抢到锁了");
if (!number.flag) {
System.out.println(Thread.currentThread().getName() + "+-+奇数" + number.start);
number.start++;
number.flag = true;
TwoThreadWaitNotify.class.notify();
}else {
try {
TwoThreadWaitNotify.class.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}
}

输出结果:

t2+-+奇数93
t1+-+偶数94
t2+-+奇数95
t1+-+偶数96
t2+-+奇数97
t1+-+偶数98
t2+-+奇数99
t1+-+偶数100

这里的线程 A 和线程 B 都对同一个对象 TwoThreadWaitNotify.class 获取锁,A 线程调用了同步对象的 wait() 方法释放了锁并进入 WAITING 状态。

B 线程调用了 notify() 方法,这样 A 线程收到通知之后就可以从 wait() 方法中返回。

这里利用了 TwoThreadWaitNotify.class 对象完成了通信。

有一些需要注意:

  • wait() 、nofify() 、nofityAll() 调用的前提都是获得了对象的锁(也可称为对象监视器)。
  • 调用 wait() 方法后线程会释放锁,进入 WAITING 状态,该线程也会被移动到等待队列中。
  • 调用 notify() 方法会将等待队列中的线程移动到同步队列中,线程状态也会更新为 BLOCKED
  • 从 wait() 方法返回的前提是调用 notify() 方法的线程释放锁,wait() 方法的线程获得锁。

等待通知有着一个经典范式:

线程 A 作为消费者:

1.获取对象的锁。

2.进入 while(判断条件),并调用 wait() 方法。

3.当条件满足跳出循环执行具体处理逻辑。

线程 B 作为生产者:

1.获取对象锁。

2.更改与线程 A 共用的判断条件。

3.调用 notify() 方法。

伪代码如下:

//Thread A
synchronized(Object){
while(条件){
Object.wait();
}
//do something
}
//Thread B
synchronized(Object){
条件=false;//改变条件
Object.notify();
}

join() 方法

private static void join() throws InterruptedException {
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
LOGGER.info("running");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}) ;
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
LOGGER.info("running2");
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}) ;
t1.start();
t2.start();
//等待线程1终止
t1.join();
//等待线程2终止
t2.join();
LOGGER.info("main over");
}

输出结果:

2018-03-16 20:21:30.967 [Thread-1] INFO c.c.actual.ThreadCommunication - running2
2018-03-16 20:21:30.967 [Thread-0] INFO c.c.actual.ThreadCommunication - running
2018-03-16 20:21:34.972 [main] INFO c.c.actual.ThreadCommunication - main over

t1.join() 时会一直阻塞到 t1 执行完毕,所以最终主线程会等待 t1 和 t2 线程执行完毕。

其实从源码可以看出,join() 也是利用的等待通知机制:

核心逻辑:

while (isAlive()) {
wait(0);
}

在 join 线程完成后会调用 notifyAll() 方法,是在 JVM 实现中调用,所以这里看不出来。

volatile 共享内存

因为 Java 是采用共享内存的方式进行线程通信的,所以可以采用以下方式用主线程关闭 A 线程:

public class Volatile implements Runnable{
private static volatile boolean flag = true ;
@Override
public void run() {
while (flag){
System.out.println(Thread.currentThread().getName() + "正在运行。。。");
}
System.out.println(Thread.currentThread().getName() +"执行完毕");
}
public static void main(String[] args) throws InterruptedException {
Volatile aVolatile = new Volatile();
new Thread(aVolatile,"thread A").start();
System.out.println("main 线程正在运行") ;
TimeUnit.MILLISECONDS.sleep(100) ;
aVolatile.stopThread();
}
private void stopThread(){
flag = false ;
}
}

输出结果:

thread A正在运行。。。
thread A正在运行。。。
thread A正在运行。。。
thread A正在运行。。。
thread A执行完毕

这里的 flag 存放于主内存中,所以主线程和线程 A 都可以看到。

flag 采用 volatile 修饰主要是为了内存可见性,更多内容可以查看这里。

CountDownLatch 并发工具

CountDownLatch 可以实现 join 相同的功能,但是更加的灵活。

private static void countDownLatch() throws Exception{
int thread = 3 ;
long start = System.currentTimeMillis();
final CountDownLatch countDown = new CountDownLatch(thread);
for (int i= 0 ;i<thread ; i++){
new Thread(new Runnable() {
@Override
public void run() {
LOGGER.info("thread run");
try {
Thread.sleep(2000);
countDown.countDown();
LOGGER.info("thread end");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
countDown.await();
long stop = System.currentTimeMillis();
LOGGER.info("main over total time={}",stop-start);
}

输出结果:

2018-03-16 20:19:44.126 [Thread-0] INFO c.c.actual.ThreadCommunication - thread run
2018-03-16 20:19:44.126 [Thread-2] INFO c.c.actual.ThreadCommunication - thread run
2018-03-16 20:19:44.126 [Thread-1] INFO c.c.actual.ThreadCommunication - thread run
2018-03-16 20:19:46.136 [Thread-2] INFO c.c.actual.ThreadCommunication - thread end
2018-03-16 20:19:46.136 [Thread-1] INFO c.c.actual.ThreadCommunication - thread end
2018-03-16 20:19:46.136 [Thread-0] INFO c.c.actual.ThreadCommunication - thread end
2018-03-16 20:19:46.136 [main] INFO c.c.actual.ThreadCommunication - main over total time=2012

CountDownLatch 也是基于 AQS(AbstractQueuedSynchronizer) 实现的,更多实现参考 ReentrantLock 实现原理

  • 初始化一个 CountDownLatch 时告诉并发的线程,然后在每个线程处理完毕之后调用 countDown() 方法。
  • 该方法会将 AQS 内置的一个 state 状态 -1 。
  • 最终在主线程调用 await() 方法,它会阻塞直到state == 0的时候返回。

CyclicBarrier 并发工具

private static void cyclicBarrier() throws Exception {
CyclicBarrier cyclicBarrier = new CyclicBarrier(3) ;
new Thread(new Runnable() {
@Override
public void run() {
LOGGER.info("thread run");
try {
cyclicBarrier.await() ;
} catch (Exception e) {
e.printStackTrace();
}
LOGGER.info("thread end do something");
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
LOGGER.info("thread run");
try {
cyclicBarrier.await() ;
} catch (Exception e) {
e.printStackTrace();
}
LOGGER.info("thread end do something");
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
LOGGER.info("thread run");
try {
Thread.sleep(5000);
cyclicBarrier.await() ;
} catch (Exception e) {
e.printStackTrace();
}
LOGGER.info("thread end do something");
}
}).start();
LOGGER.info("main thread");
}

CyclicBarrier 中文名叫做屏障或者是栅栏,也可以用于线程间通信。

它可以等待 N 个线程都达到某个状态后继续运行的效果。

1.首先初始化线程参与者。

2.调用await()将会在所有参与者线程都调用之前等待。

3.直到所有参与者都调用了await()后,所有线程从await()返回继续后续逻辑。

运行结果:

2018-03-18 22:40:00.731 [Thread-0] INFO c.c.actual.ThreadCommunication - thread run
2018-03-18 22:40:00.731 [Thread-1] INFO c.c.actual.ThreadCommunication - thread run
2018-03-18 22:40:00.731 [Thread-2] INFO c.c.actual.ThreadCommunication - thread run
2018-03-18 22:40:00.731 [main] INFO c.c.actual.ThreadCommunication - main thread
2018-03-18 22:40:05.741 [Thread-0] INFO c.c.actual.ThreadCommunication - thread end do something
2018-03-18 22:40:05.741 [Thread-1] INFO c.c.actual.ThreadCommunication - thread end do something
2018-03-18 22:40:05.741 [Thread-2] INFO c.c.actual.ThreadCommunication - thread end do something

可以看出由于其中一个线程休眠了五秒,所有其余所有的线程都得等待这个线程调用 await() 。

该工具可以实现 CountDownLatch 同样的功能,但是要更加灵活。甚至可以调用 reset() 方法重置 CyclicBarrier (需要自行捕获 BrokenBarrierException 处理) 然后重新执行。

线程响应中断

public class StopThread implements Runnable {
@Override
public void run() {
while ( !Thread.currentThread().isInterrupted()) {
// 线程执行具体逻辑
System.out.println(Thread.currentThread().getName() + "运行中。。");
}
System.out.println(Thread.currentThread().getName() + "退出。。");
}
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(new StopThread(), "thread A");
thread.start();
System.out.println("main 线程正在运行") ;
TimeUnit.MILLISECONDS.sleep(10) ;
thread.interrupt();
}
}

输出结果:

thread A运行中。。
thread A运行中。。
thread A退出。。

可以采用中断线程的方式来通信,调用了 thread.interrupt() 方法其实就是将 thread 中的一个标志属性置为了 true。

并不是说调用了该方法就可以中断线程,如果不对这个标志进行响应其实是没有什么作用(这里对这个标志进行了判断)。

但是如果抛出了 InterruptedException 异常,该标志就会被 JVM 重置为 false。

线程池 awaitTermination() 方法

如果是用线程池来管理线程,可以使用以下方式来让主线程等待线程池中所有任务执行完毕:

private static void executorService() throws Exception{
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(10) ;
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(5,5,1, TimeUnit.MILLISECONDS,queue) ;
poolExecutor.execute(new Runnable() {
@Override
public void run() {
LOGGER.info("running");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
poolExecutor.execute(new Runnable() {
@Override
public void run() {
LOGGER.info("running2");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
poolExecutor.shutdown();
while (!poolExecutor.awaitTermination(1,TimeUnit.SECONDS)){
LOGGER.info("线程还在执行。。。");
}
LOGGER.info("main over");
}

输出结果:

2018-03-16 20:18:01.273 [pool-1-thread-2] INFO c.c.actual.ThreadCommunication - running2
2018-03-16 20:18:01.273 [pool-1-thread-1] INFO c.c.actual.ThreadCommunication - running
2018-03-16 20:18:02.273 [main] INFO c.c.actual.ThreadCommunication - 线程还在执行。。。
2018-03-16 20:18:03.278 [main] INFO c.c.actual.ThreadCommunication - 线程还在执行。。。
2018-03-16 20:18:04.278 [main] INFO c.c.actual.ThreadCommunication - main over

使用这个 awaitTermination() 方法的前提需要关闭线程池,如调用了 shutdown() 方法。

调用了 shutdown() 之后线程池会停止接受新任务,并且会平滑的关闭线程池中现有的任务。

管道通信

public static void piped() throws IOException {
//面向于字符 PipedInputStream 面向于字节
PipedWriter writer = new PipedWriter();
PipedReader reader = new PipedReader();
//输入输出流建立连接
writer.connect(reader);
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
LOGGER.info("running");
try {
for (int i = 0; i < 10; i++) {
writer.write(i+"");
Thread.sleep(10);
}
} catch (Exception e) {
} finally {
try {
writer.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
});
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
LOGGER.info("running2");
int msg = 0;
try {
while ((msg = reader.read()) != -1) {
LOGGER.info("msg={}", (char) msg);
}
} catch (Exception e) {
}
}
});
t1.start();
t2.start();
}

输出结果:

2018-03-16 19:56:43.014 [Thread-0] INFO c.c.actual.ThreadCommunication - running
2018-03-16 19:56:43.014 [Thread-1] INFO c.c.actual.ThreadCommunication - running2
2018-03-16 19:56:43.130 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=0
2018-03-16 19:56:43.132 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=1
2018-03-16 19:56:43.132 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=2
2018-03-16 19:56:43.133 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=3
2018-03-16 19:56:43.133 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=4
2018-03-16 19:56:43.133 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=5
2018-03-16 19:56:43.133 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=6
2018-03-16 19:56:43.134 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=7
2018-03-16 19:56:43.134 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=8
2018-03-16 19:56:43.134 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=9

Java 虽说是基于内存通信的,但也可以使用管道通信。

需要注意的是,输入流和输出流需要首先建立连接。这样线程 B 就可以收到线程 A 发出的消息了。

实际开发中可以灵活根据需求选择最适合的线程通信方式。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • Java Socket实现单线程通信的方法示例

    本文实例讲述了Java Socket实现单线程通信的方法.分享给大家供大家参考,具体如下: 现在做Java直接使用Socket的情况是越来越少,因为有很多的选择可选,比如说可以用spring,其中就可以支持很多种远程连接的操作,另外jboss的remoting也是不错的选择,还有Apache的Mina等等,但是在有些时候一些特殊情况仍然逃脱不了直接写Socket的情况,比如公司内部一些莫名其妙的游戏规则. 废话不说了,下面就看看如果自己写Socket应该怎么做吧. 首先是写一个Server类,这

  • Java编程中实现Condition控制线程通信

    java中控制线程通信的方法 1.传统的方式:利用synchronized关键字来保证同步,结合wait(),notify(),notifyAll()控制线程通信.不灵活. 2.利用Condition控制线程通信,灵活. 3.利用管道pipe进行线程通信,不推荐 4.利用BlockingQueue控制线程通信 本文就讲解利用Condition控制线程通信,非常灵活的方式. Condition类是用来保持Lock对象的协调调用. 对Lock不了解的可以参考:Java线程同步Lock同步锁代码示例

  • Java多线程之线程通信生产者消费者模式及等待唤醒机制代码详解

    前言 前面的例子都是多个线程在做相同的操作,比如4个线程都对共享数据做tickets–操作.大多情况下,程序中需要不同的线程做不同的事,比如一个线程对共享变量做tickets++操作,另一个线程对共享变量做tickets–操作,这就是大名鼎鼎的生产者和消费者模式. 正文 一,生产者-消费者模式也是多线程 生产者和消费者模式也是多线程的范例.所以其编程需要遵循多线程的规矩. 首先,既然是多线程,就必然要使用同步.上回说到,synchronized关键字在修饰函数的时候,使用的是"this"

  • Java Socket实现多线程通信功能示例

    本文实例讲述了Java Socket实现多线程通信功能的方法.分享给大家供大家参考,具体如下: 前面的文章<Java Socket实现单线程通信的方法示例>说到怎样写一个最简单的Java Socket通信,但是文章中的例子有一个问题就是Server只能接受一个Client请求,当第一个Client连接后就占据了这个位置,后续Client不能再继续连接,所以需要做些改动,当Server没接受到一个Client连接请求之后,都把处理流程放到一个独立的线程里去运行,然后等待下一个Client连接请求

  • Java线程通信详解

    线程通信用来保证线程协调运行,一般在做线程同步的时候才需要考虑线程通信的问题. 1.传统的线程通信 通常利用Objeclt类提供的三个方法: wait() 导致当前线程等待,并释放该同步监视器的锁定,直到其它线程调用该同步监视器的notify()或者notifyAll()方法唤醒线程. notify(),唤醒在此同步监视器上等待的线程,如果有多个会任意选择一个唤醒 notifyAll() 唤醒在此同步监视器上等待的所有线程,这些线程通过调度竞争资源后,某个线程获取此同步监视器的锁,然后得以运行.

  • 深入理解Java 线程通信

    当线程在系统内运行时,线程的调度具有一定的透明性,程序通常无法准确控制线程的轮换执行,但 Java 也提供了一些机制来保证线程协调运行. 传统的线程通信 假设现在系统中有两个线程,这两个线程分别代表存款者和取钱者--现在假设系统有一种特殊的要求,系统要求存款者和取钱者不断地重复存款.取钱的动作,而且要求每当存款者将钱存入指定账户后,取钱者就立即取出该笔钱.不允许存款者连续两次存钱,也不允许取钱者连续两次取钱. 为了实现这种功能,可以借助于 Object 类提供的 wait(). notify()

  • 深入理解java线程通信

    前言 开发中不免会遇到需要所有子线程执行完毕通知主线程处理某些逻辑的场景. 或者是线程 A 在执行到某个条件通知线程 B 执行某个操作. 可以通过以下几种方式实现: 等待通知机制 等待通知模式是 Java 中比较经典的线程通信方式. 两个线程通过对同一对象调用等待 wait() 和通知 notify() 方法来进行通讯. 如两个线程交替打印奇偶数: public class TwoThreadWaitNotify { private int start = 1; private boolean

  • JAVA 线程通信相关知识汇总

    两个线程之间的通信 多线程环境下CPU会随机的在线程之间进行切换,如果想让两个线程有规律的去执行,那就需要两个线程之间进行通信,在Object类中的两个方法wait和notify可以实现通信. wait方法可以使当前线程进入到等待状态,在没有被唤醒的情况下,线程会一直保持等待状态. notify方法可以随机唤醒单个在等待状态下的线程. 来实现这样的一个功能: 让两个线程交替在控制台输出一行文字 定义一个Print类,有两个方法print1和print2,分别打印一行不同的内容 package c

  • 深入理解Java 线程池

    线程的使用在java中占有极其重要的地位,在jdk1.4极其之前的jdk版本中,关于线程池的使用是极其简陋的.在jdk1.5之后这一情况有了很大的改观.Jdk1.5之后加入了java.util.concurrent包,这个包中主要介绍java中线程以及线程池的使用.为我们在开发中处理线程的问题提供了非常大的帮助. 线程池的作用: 线程池作用就是限制系统中执行线程的数量.      根据系统的环境情况,可以自动或手动设置线程数量,达到运行的最佳效果:少了浪费了系统资源,多了造成系统拥挤效率不高.用

  • 深入理解Java线程池从设计思想到源码解读

    线程池:从设计思想到源码解析 前言初识线程池线程池优势线程池设计思路 深入线程池构造方法任务队列拒绝策略线程池状态初始化&容量调整&关闭 使用线程池ThreadPoolExecutorExecutors封装线程池 解读线程池execute()addWorker()Worker类runWorker()processWorkerExit() 前言 各位小伙伴儿,春节已经结束了,在此献上一篇肝了一个春节假期的迟来的拜年之作,希望读者朋友们都能有收获. 根据穆氏哲学,投入越多,收获越大.我作此文时

  • Java线程通信及线程虚假唤醒知识总结

    线程通信 线程在内部运行时,线程调度具有一定的透明性,程序通常无法控制线程的轮换执行.但Java本身提供了一些机制来保证线程协调运行. 假设目前系统中有两个线程,分别代表存款和取钱.当钱存进去,立马就取出来挪入指定账户.这涉及到线程间的协作,使用到Object类提供的wait().notify().notifyAll()三个方法,其不属于Thread类,而属于Object,而这三个方法必须由监视器对象来调用: synchronized修饰的方法,因为该类的默认实例(this)就是同步监视器,因此

  • 如何理解Java线程池及其使用方法

    目录 一.前言 二.总体的架构 三.研读ThreadPoolExecutor 3.1.任务缓存队列 3.2.拒绝策略 3.3.线程池的任务处理策略 3.4.线程池的关闭 3.5.源码分析 四.常见的四种线程池 4.1.newFixedThreadPool 4.2.newSingleThreadExecutor 4.3.newCachedThreadPool 4.4.newScheduledThreadPool 五.使用实例 5.1.newFixedThreadPool实例 5.2.newCach

  • 手把手带你理解java线程池之工作队列workQueue

    目录 线程池之工作队列 ArrayBlockingQueue SynchronousQueue LinkedBlockingDeque LinkedBlockingQueue LinkedTransferQueue PriorityBlockingQueue 线程池之工作队列 ArrayBlockingQueue 采用数组来实现,并采用可重入锁ReentrantLock来做并发控制,无论是添加还是读取,都先要获得锁才能进行操作 可看出进行读写操作都使用了ReentrantLock,ArrayBl

  • Java线程通信中关于生产者与消费者案例分析

    相关方法: wait():一旦执行此方法,当前线程就进入阻塞状态,并释放同步监视器. notify():一旦执行此方法,就会唤醒被wait的一个线程,如果有多个线程被wait,就唤醒优先级高的那个. notifyAll():一旦执行此方法,就会唤醒所有被wait的线程. 说明: 1.wait(),notify(),notifyAll()三个方法必须使用在同步代码块或同步方法中. 2.wait(),notify(),notifyAll()三个方法的调用者必须是同步代码块或同步方法中的同步监视器.

随机推荐