Java并发编程之线程间的通信

一、概念简介

1、线程通信

在操作系统中,线程是个独立的个体,但是在线程执行过程中,如果处理同一个业务逻辑,可能会产生资源争抢,导致并发问题,通常使用互斥锁来控制该逻辑。但是在还有这样一类场景,任务执行是有顺序控制的,例如常见的报表数据生成:

  • 启动数据分析任务,生成报表数据;
  • 报表数据存入指定位置数据容器;
  • 通知数据搬运任务,把数据写入报表库;

该场景在相对复杂的系统中非常常见,如果基于多线程来描述该过程,则需要线程之间通信协作,才能有条不紊的处理该场景业务。

2、等待通知机制

如上的业务场景,如果线程A生成数据过程中,线程B一直在访问数据容器,判断该过程的数据是否已经生成,则会造成资源浪费。正常的流程应该如图,线程A和线程B同时启动,线程A开始处理数据生成任务,线程B尝试获取容器数据,数据还没过来,线程B则进入等待状态,当线程A的任务处理完成,则通知线程B去容器中获取数据,这样基于线程等待和通知的机制来协作完成任务。

3、基础方法

等待/通知机制的相关方法是Java中Object层级的基础方法,任何对象都有该方法:

  • notify:随机通知一个在该对象上等待的线程,使其结束wait状态返回;
  • notifyAll:唤醒在该对象上所有等待的线程,进入对象锁争抢队列中;
  • wait:线程进入waiting等待状态,不会争抢锁对象,也可以设置等待时间;

线程的等待通知机制,就是基于这几个基础方法。

二、等待通知原理

1、基本原理

等待/通知机制,该模式下指线程A在不满足任务执行的情况下调用对象wait()方法进入等待状态,线程B修改了线程A的执行条件,并调用对象notify()或者notifyAll()方法,线程A收到通知后从wait状态返回,进而执行后续操作。两个线程通过基于对象提供的wait()/notify()/notifyAll()等方法完成等待和通知间交互,提高程序的可伸缩性。

2、实现案例

通过线程通信解决上述数据生成和存储任务的解耦流程。

public class NotifyThread01 {

    static Object lock = new Object() ;
    static volatile List<String> dataList = new ArrayList<>();

    public static void main(String[] args) throws Exception {
        Thread saveThread = new Thread(new SaveData(),"SaveData");
        saveThread.start();
        TimeUnit.SECONDS.sleep(3);
        Thread dataThread = new Thread(new AnalyData(),"AnalyData");
        dataThread.start();
    }
    // 等待数据生成,保存
    static class SaveData implements Runnable {
        @Override
        public void run() {
            synchronized (lock){
                while (dataList.size()==0){
                    try {
                        System.out.println(Thread.currentThread().getName()+"等待...");
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println("SaveData .."+ dataList.get(0)+dataList.get(1));
            }
        }
    }
    // 生成数据,通知保存
    static class AnalyData implements Runnable {
        @Override
        public void run() {
            synchronized (lock){
                dataList.add("hello,");
                dataList.add("java");
                lock.notify();
                System.out.println("AnalyData End...");
            }
        }
    }
}

注意:除了dataList满足写条件,还要在AnalyData线程执行通知操作。

三、管道流通信

1、管道流简介

基本概念

管道流主要用于在不同线程间直接传送数据,一个线程发送数据到输出管道,另一个线程从输入管道中读取数据,进而实现不同线程间的通信。

实现分类

管道字节流:PipedInputStream和PipedOutputStream;

管道字符流:PipedWriter和PipedReader;

新IO管道流:Pipe.SinkChannel和Pipe.SourceChannel;

2、使用案例

public class NotifyThread02 {
    public static void main(String[] args) throws Exception {
        PipedInputStream pis = new PipedInputStream();
        PipedOutputStream pos = new PipedOutputStream();
        // 链接输入流和输出流
        pos.connect(pis);
        // 写数据线程
        new Thread(new Runnable() {
            public void run() {
                BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
                // 将从键盘读取的数据写入管道流
                PrintStream ps = new PrintStream(pos);
                while (true) {
                    try {
                        System.out.print(Thread.currentThread().getName());
                        ps.println(br.readLine());
                        Thread.sleep(1000);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        }, "输入数据线程:").start();
        // 读数据线程
        new Thread(new Runnable() {
            public void run() {
                BufferedReader br = new BufferedReader(new InputStreamReader(pis));
                while (true) {
                    try {
                        System.out.println(Thread.currentThread().getName() + br.readLine());
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }, "输出数据线程:").start();
    }
}

写线程向管道流写入数据,读线程读取数据,完成基本通信流程。

四、生产消费模式

1、业务场景

基于线程等待通知机制:实现工厂生产一件商品,通知商店卖出一件商品的业务流程。

2、代码实现

public class NotifyThread03 {
    public static void main(String[] args) {
        Product product = new Product();
        ProductFactory productFactory = new ProductFactory(product);
        ProductShop productShop = new ProductShop(product);
        productFactory.start();
        productShop.start();
    }
}
// 产品
class Product {
    public String name ;
    public double price ;
    // 产品是否生产完毕,默认没有
    boolean flag ;
}
// 产品工厂:生产
class ProductFactory extends Thread {
    Product product ;
    public ProductFactory (Product product){
        this.product = product;
    }
    @Override
    public void run() {
        int i = 0 ;
        while (i < 20) {
            synchronized (product) {
                if (!product.flag){
                    if (i%2 == 0){
                        product.name = "鼠标";
                        product.price = 79.99;
                    } else {
                        product.name = "键盘";
                        product.price = 89.99;
                    }
                    System.out.println("产品:"+product.name+"【价格:"+product.price+"】出厂...");
                    product.flag = true ;
                    i++;
                    // 通知消费者
                    product.notifyAll();
                } else {
                    try {
                        // 进入等待状态
                        product.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}
// 产品商店:销售
class ProductShop extends Thread {
    Product product ;
    public ProductShop (Product product){
        this.product = product ;
    }
    @Override
    public void run() {
        while (true) {
            synchronized (product) {
                if (product.flag == true ){
                    System.out.println("产品:"+product.name+"【价格"+(product.price*2)+"】卖出...");
                    product.flag = false ;
                    product.notifyAll(); //唤醒生产者
                } else {
                    try {
                        product.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
}

流程描述:ProductFactory生成一件商品,通知商店售卖,通过flag标识判断控制是否进入等待状态,商店卖出商品后,再次通知工厂生产商品。

五、源代码地址

GitHub·地址
https://github.com/cicadasmile/java-base-parent
GitEE·地址
https://gitee.com/cicadasmile/java-base-parent

以上就是Java并发编程之线程间的通信的详细内容,更多关于Java 线程间通信的资料请关注我们其它相关文章!

(0)

相关推荐

  • Java实现多线程聊天室

    本文实例为大家分享了Java实现多线程聊天室的具体代码,供大家参考,具体内容如下 用多线程来实现,功能会比单线程聊天室更加齐全,也更人性化一点. 多线程版本的聊天室 1. 功能分析: 实现用户注册,上线,下线 实现群聊和私聊 统计当前在线人数 2. 服务端实现 1.维护所有的在线用户 2.注册功能:客户端名称,添加到服务器的客户端集合里 3.群聊功能:客户端发送消息,所有的客户端都能接收到 4.私聊功能:客户端与指定客户端进发送和接收消息 5.退出功能: 从服务器客户端集合中移除客户端 3. 客

  • 详解Java线程的创建及休眠

    一.进程vs线程 1.进程是系统分配资源的最小单位:线程是系统调度的最小单位 2.一个进程中至少要包含一个线程 3.线程必须要依附于继承,线程是进程实质工作的一个最小单位 二.线程的创建方式 继承Thread类 实现线程的创建(2种写法) 1种写法 public class ThreadDemo03 { static class MyThread extends Thread{ @Override public void run(){ System.out.println("线程名称:"

  • Java线程通信及线程虚假唤醒知识总结

    线程通信 线程在内部运行时,线程调度具有一定的透明性,程序通常无法控制线程的轮换执行.但Java本身提供了一些机制来保证线程协调运行. 假设目前系统中有两个线程,分别代表存款和取钱.当钱存进去,立马就取出来挪入指定账户.这涉及到线程间的协作,使用到Object类提供的wait().notify().notifyAll()三个方法,其不属于Thread类,而属于Object,而这三个方法必须由监视器对象来调用: synchronized修饰的方法,因为该类的默认实例(this)就是同步监视器,因此

  • 详解在Java中如何创建多线程程序

    创建多线程程序的第一种方式:创建Thread类的子类 java.lang.Thread类:是描述线程的类,我们想要实现多线程程序,就必须继承Thread类 实现步骤: 1.创建一个Thread类的子类 2.在Thread类的子类中重写Thread类中的run方法,设置线程任务(开启线程要做什么?) 3.创建Thread类的子类对象 4.调用Thread类中的方法start方法,开启新的线程,执行run方法 void start()使该线程开始执行;Java虚拟机调用该线程的run方法. 结果是两

  • Java经典面试题汇总--多线程

    目录 1. 并行和并发有什么区别? 2. 线程和进程的区别? 3. 守护线程是什么? 4. 实现多线程的方式有哪些? 5. 说一下 runnable 和 callable 有什么区别? 6. sleep() 和 wait() 有什么区别? 7. 线程有哪些状态? 8. notify()和 notifyAll()有什么区别? 9. 线程的 run() 和 start() 有什么区别? 10. 创建线程池有哪几种方式? 11. 线程池中 submit() 和 execute() 方法有什么区别? 1

  • Java并发编程之线程间的通信

    一.概念简介 1.线程通信 在操作系统中,线程是个独立的个体,但是在线程执行过程中,如果处理同一个业务逻辑,可能会产生资源争抢,导致并发问题,通常使用互斥锁来控制该逻辑.但是在还有这样一类场景,任务执行是有顺序控制的,例如常见的报表数据生成: 启动数据分析任务,生成报表数据: 报表数据存入指定位置数据容器: 通知数据搬运任务,把数据写入报表库: 该场景在相对复杂的系统中非常常见,如果基于多线程来描述该过程,则需要线程之间通信协作,才能有条不紊的处理该场景业务. 2.等待通知机制 如上的业务场景,

  • java多线程编程学习(线程间通信)

    一.概要 线程是操作系统中独立的个体,但这些个体如果不经过特殊的处理就不能成为一个整体,线程间的通信就是成为整体的必用方案之一.可以说,使线程进行通信后,系统之间的交互性会更强大,在大大提高cpu利用率的同时还会使程序员对各线程任务在处理过程中进行有效的把控和监督. 二.等待/通知机制 1."wait/notify"机制:等待/通知机制,wait使线程暂停运行,而notify 使暂停的线程继续运行.用一个厨师和服务员的交互来说明: (1) 服务员取到菜的时间取决于厨师,所以服务员就有&

  • Java并发编程之线程之间的共享和协作

    一.线程间的共享 1.1 ynchronized内置锁 用处 Java支持多个线程同时访问一个对象或者对象的成员变量 关键字synchronized可以修饰方法或者以同步块的形式来进行使用 它主要确保多个线程在同一个时刻,只能有一个线程处于方法或者同步块中 它保证了线程对变量访问的可见性和排他性(原子性.可见性.有序性),又称为内置锁机制. 对象锁和类锁 对象锁是用于对象实例方法,或者一个对象实例上的 类锁是用于类的静态方法或者一个类的class对象上的 类的对象实例可以有很多个,但是每个类只有

  • java并发编程_线程池的使用方法(详解)

    一.任务和执行策略之间的隐性耦合 Executor可以将任务的提交和任务的执行策略解耦 只有任务是同类型的且执行时间差别不大,才能发挥最大性能,否则,如将一些耗时长的任务和耗时短的任务放在一个线程池,除非线程池很大,否则会造成死锁等问题 1.线程饥饿死锁 类似于:将两个任务提交给一个单线程池,且两个任务之间相互依赖,一个任务等待另一个任务,则会发生死锁:表现为池不够 定义:某个任务必须等待池中其他任务的运行结果,有可能发生饥饿死锁 2.线程池大小 注意:线程池的大小还受其他的限制,如其他资源池:

  • Java 并发编程之线程挂起、恢复与终止

    挂起和恢复线程 Thread 的API中包含两个被淘汰的方法,它们用于临时挂起和重启某个线程,这些方法已经被淘汰,因为它们是不安全的,不稳定的.如果在不合适的时候挂起线程(比如,锁定共享资源时),此时便可能会发生死锁条件--其他线程在等待该线程释放锁,但该线程却被挂起了,便会发生死锁.另外,在长时间计算期间挂起线程也可能导致问题. 下面的代码演示了通过休眠来延缓运行,模拟长时间运行的情况,使线程更可能在不适当的时候被挂起: public class DeprecatedSuspendResume

  • Java并发编程之线程中断

    目录 线程中断: void interrupted()方法:中断线程,例如,当线程A运行时,线程B可以调用线程A的interrupted()方法来设置线程的中断标志为true并立即返回.设置标志仅仅是为了设置标志,线程A实际并没有被中断,它会继续往下执行,如果线程A因为调用了wait()方法,join()方法或者sleep()方法而引起的阻塞挂起,这时候若线程B调用线程A的interrupted()方法,线程A回调用这些方法的地方会抛出InterruptedException异常而返回. boo

  • Java并发编程之线程安全性

    目录 1.什么是线程安全性 2.原子性 2.1 竞争条件 2.2 复合操作 3.加锁机制 3.1 内置锁 3.2 重入 4.用锁保护状态 5.活跃性与性能 1.什么是线程安全性 当多个线程访问某个类时,不管运行时环境采用何种调用方式或者这些线程将如何交替执行,并且在主调代码中不需要任何额外的同步或协同,这个类都能表现出正确的行为,那么就称这个类是线程安全的. 无状态的对象一定是线程安全的,比如:Servlet. 2.原子性 2.1 竞争条件 由于不恰当的执行时序而出现不正确的结果的情况,就是竞争

  • Java并发编程之线程状态介绍

    目录 线程状态概述 睡眠sleep方法 等待和唤醒 等待唤醒的一个小例子 线程状态概述 线程由生到死的完整过程: 当线程被创建并启动以后,它既不是一启动就进入了执行状态,也不是一直处于执行状态.在线程的生命周期中,有几种状态呢?在API中java.lang.Thread.State这个枚举中给出了六种线程状态: 线程状态 导致状态发生条件 NEW(新建) 线程刚被创建,但是并未启动.还没调用start方法.MyThread t = new MyThread只有线程对象,没有线程特征. Runna

  • Java并发编程之线程创建介绍

    目录 1.线程与进程 2.线程的创建与运行 1.线程与进程 进程是代码在数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,线程则是一个实体,一个进程中至少有一个线程,是CPU调度和分配的基本单位,进程中的多个线程共享进程的资源. 进程的三个特征: 动态性 : 进程是运行中的程序,要动态的占用内存,CPU和网络等资源. 独立性 : 进程与进程之间是相互独立的,彼此有自己的独立内存区域. 并发性 : 假如CPU是单核,同一个时刻其实内存中只有一个进程在被执行.CPU会分时轮询切换依次为每

  • Java 并发编程:volatile的使用及其原理解析

    Java并发编程系列[未完]: •Java 并发编程:核心理论 •Java并发编程:Synchronized及其实现原理 •Java并发编程:Synchronized底层优化(轻量级锁.偏向锁) •Java 并发编程:线程间的协作(wait/notify/sleep/yield/join) •Java 并发编程:volatile的使用及其原理 一.volatile的作用 在<Java并发编程:核心理论>一文中,我们已经提到过可见性.有序性及原子性问题,通常情况下我们可以通过Synchroniz

随机推荐