Java多线程并发生产者消费者设计模式实例解析

一、两个线程一个生产者一个消费者

需求情景

两个线程,一个负责生产,一个负责消费,生产者生产一个,消费者消费一个。

涉及问题

  • 同步问题:如何保证同一资源被多个线程并发访问时的完整性。常用的同步方法是采用标记或加锁机制。
  • wait() / nofity() 方法是基类Object的两个方法,也就意味着所有Java类都会拥有这两个方法,这样,我们就可以为任何对象实现同步机制。
  • wait()方法:当缓冲区已满/空时,生产者/消费者线程停止自己的执行,放弃锁,使自己处于等待状态,让其他线程执行。
  • notify()方法:当生产者/消费者向缓冲区放入/取出一个产品时,向其他等待的线程发出可执行的通知,同时放弃锁,使自己处于等待状态。

代码实现(共三个类和一个main方法的测试类)

Resource.java

package com.demo.ProducerConsumer;

/**
 * 资源
 * @author lixiaoxi
 *
 */
public class Resource {

  /*资源序号*/
  private int number = 0;
  /*资源标记*/
  private boolean flag = false;

  /**
   * 生产资源
   */
  public synchronized void create() {
    if (flag) {//先判断标记是否已经生产了,如果已经生产,等待消费;
      try {
        wait();//让生产线程等待
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
    number++;//生产一个
    System.out.println(Thread.currentThread().getName() + "生产者------------" + number);
    flag = true;//将资源标记为已经生产
    notify();//唤醒在等待操作资源的线程(队列)
  }

  /**
   * 消费资源
   */
  public synchronized void destroy() {
    if (!flag) {
      try {
        wait();
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }

    System.out.println(Thread.currentThread().getName() + "消费者****" + number);

    flag = false;
    notify();
  }
}

Producer.java

package com.demo.ProducerConsumer;

/**
 * 生产者
 * @author lixiaoxi
 *
 */
public class Producer implements Runnable{

  private Resource resource;

  public Producer(Resource resource) {
    this.resource = resource;
  }

  @Override
  public void run() {
    while (true) {
      try {
        Thread.sleep(10);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      resource.create();
    }

  }
}

Consumer.java

package com.demo.ProducerConsumer;

/**
 * 消费者
 * @author lixiaoxi
 *
 */
public class Consumer implements Runnable{

  private Resource resource;

  public Consumer(Resource resource) {
    this.resource = resource;
  }

  @Override
  public void run() {
    while (true) {
      try {
        Thread.sleep(10);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      resource.destroy();
    }

  }
}

ProducerConsumerTest.java

package com.demo.ProducerConsumer;

public class ProducerConsumerTest {

  public static void main(String args[]) {
    Resource resource = new Resource();
    new Thread(new Producer(resource)).start();//生产者线程
    new Thread(new Consumer(resource)).start();//消费者线程

  }
}

打印结果:

以上打印结果可以看出没有任何问题。

二、多个线程,多个生产者和多个消费者的问题

需求情景

四个线程,两个个负责生产,两个个负责消费,生产者生产一个,消费者消费一个。

涉及问题

notifyAll()方法:当生产者/消费者向缓冲区放入/取出一个产品时,向其他等待的所有线程发出可执行的通知,同时放弃锁,使自己处于等待状态。

再次测试代码

ProducerConsumerTest.java

package com.demo.ProducerConsumer;

public class ProducerConsumerTest {

  public static void main(String args[]) {
    Resource resource = new Resource();
    new Thread(new Producer(resource)).start();//生产者线程
    new Thread(new Producer(resource)).start();//生产者线程
    new Thread(new Consumer(resource)).start();//消费者线程
    new Thread(new Consumer(resource)).start();//消费者线程

  }
}

运行结果:

通过以上打印结果发现问题

147生产了一次,消费了两次。169生产了,而没有消费。

原因分析

当两个线程同时操作生产者生产或者消费者消费时,如果有生产者或消费者的两个线程都wait()时,再次notify(),由于其中一个线程已经改变了标记而另外一个线程再次往下直接执行的时候没有判断标记而导致的。if判断标记,只有一次,会导致不该运行的线程运行了。出现了数据错误的情况。

解决方案

while判断标记,解决了线程获取执行权后,是否要运行!也就是每次wait()后再notify()时先再次判断标记。

代码改进(Resource中的 if -> while)

Resource.java

package com.demo.ProducerConsumer;

/**
 * 资源
 * @author lixiaoxi
 *
 */
public class Resource {

  /*资源序号*/
  private int number = 0;
  /*资源标记*/
  private boolean flag = false;

  /**
   * 生产资源
   */
  public synchronized void create() {
    while (flag) {//先判断标记是否已经生产了,如果已经生产,等待消费;
      try {
        wait();//让生产线程等待
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
    number++;//生产一个
    System.out.println(Thread.currentThread().getName() + "生产者------------" + number);
    flag = true;//将资源标记为已经生产
    notify();//唤醒在等待操作资源的线程(队列)
  }

  /**
   * 消费资源
   */
  public synchronized void destroy() {
    while (!flag) {
      try {
        wait();
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }

    System.out.println(Thread.currentThread().getName() + "消费者****" + number);

    flag = false;
    notify();
  }
}

运行结果:

再次发现问题

打印到某个值比如生产完187,程序运行卡死了,好像锁死了一样。

原因分析

notify:只能唤醒一个线程,如果本方唤醒了本方,没有意义。而且while判断标记+notify会导致”死锁”。

解决方案

notifyAll解决了本方线程一定会唤醒对方线程的问题。

最后代码改进(Resource中的 notify() -> notifyAll())

Resource.java

package com.demo.ProducerConsumer;

/**
 * 资源
 * @author lixiaoxi
 *
 */
public class Resource {

  /*资源序号*/
  private int number = 0;
  /*资源标记*/
  private boolean flag = false;

  /**
   * 生产资源
   */
  public synchronized void create() {
    while (flag) {//先判断标记是否已经生产了,如果已经生产,等待消费;
      try {
        wait();//让生产线程等待
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
    number++;//生产一个
    System.out.println(Thread.currentThread().getName() + "生产者------------" + number);
    flag = true;//将资源标记为已经生产
    notifyAll();//唤醒在等待操作资源的线程(队列)
  }

  /**
   * 消费资源
   */
  public synchronized void destroy() {
    while (!flag) {
      try {
        wait();
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }

    System.out.println(Thread.currentThread().getName() + "消费者****" + number);

    flag = false;
    notifyAll();
  }
}

运行结果:

以上就大功告成了,没有任何问题。

再来梳理一下整个流程。按照示例,生产者消费者交替运行,每次生产后都有对应的消费者,测试类创建实例,如果是生产者先运行,进入run()方法,进入create()方法,flag默认为false,number+1,生产者生产一个产品,flag置为true,同时调用notifyAll()方法,唤醒所有正在等待的线程,接下来如果还是生产者运行呢?这是flag为true,进入while循环,执行wait()方法,接下来如果是消费者运行的话,调用destroy()方法,这时flag为true,消费者购买了一次产品,随即将flag置为false,并唤醒所有正在等待的线程。这就是一次完整的多生产者对应多消费者的问题。

三、使用Lock和Condition来解决生产者消费者问题

上面的代码有一个问题,就是我们为了避免所有的线程都处于等待的状态,使用了notifyAll方法来唤醒所有的线程,即notifyAll唤醒的是自己方和对方线程。如果我需要只是唤醒对方的线程,比如:生产者只能唤醒消费者的线程,消费者只能唤醒生产者的线程。

在jdk1.5当中为我们提供了多线程的升级解决方案:

1. 将同步synchronized替换成了Lock操作。

2. 将Object中的wait,notify,notifyAll方法替换成了Condition对象。

3. 可以只唤醒对方的线程。

完整代码:

Resource1.java

package com.demo.ProducerConsumer;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 资源
 * @author lixiaoxi
 *
 */
public class Resource1 {

  /*资源序号*/
  private int number = 0;
  /*资源标记*/
  private boolean flag = false;

  private Lock lock = new ReentrantLock();
  //使用lock建立生产者的condition对象
  private Condition condition_pro = lock.newCondition();
  //使用lock建立消费者的condition对象
  private Condition condition_con = lock.newCondition(); 

  /**
   * 生产资源
   */
  public void create() throws InterruptedException {

    try{
      lock.lock();
      //先判断标记是否已经生产了,如果已经生产,等待消费
      while(flag){
        //生产者等待
        condition_pro.await();
      }
      //生产一个
      number++;
      System.out.println(Thread.currentThread().getName() + "生产者------------" + number);
      //将资源标记为已经生产
      flag = true;
      //生产者生产完毕后,唤醒消费者的线程(注意这里不是signalAll)
      condition_con.signal();
    }finally{
      lock.unlock();
    }
  }

  /**
   * 消费资源
   */
  public void destroy() throws InterruptedException{

    try{
      lock.lock();
      //先判断标记是否已经消费了,如果已经消费,等待生产
      while(!flag){
        //消费者等待
        condition_con.await();
      }

      System.out.println(Thread.currentThread().getName() + "消费者****" + number);
      //将资源标记为已经消费
      flag = false;
      //消费者消费完毕后,唤醒生产者的线程
      condition_pro.signal();
    }finally{
      lock.unlock();
    }
  }
}

Producer1.java

package com.demo.ProducerConsumer;

/**
 * 生产者
 * @author lixiaoxi
 *
 */
public class Producer1 implements Runnable{

  private Resource1 resource;

  public Producer1(Resource1 resource) {
    this.resource = resource;
  }

  @Override
  public void run() {
    while (true) {
      try {
        Thread.sleep(10);
        resource.create();
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }

}

Consumer1.java

package com.demo.ProducerConsumer;

/**
 * 消费者
 * @author lixiaoxi
 *
 */
public class Consumer1 implements Runnable{

  private Resource1 resource;

  public Consumer1(Resource1 resource) {
    this.resource = resource;
  }

  @Override
  public void run() {
    while (true) {
      try {
        Thread.sleep(10);
        resource.destroy();
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }

}

ProducerConsumerTest1.java

package com.demo.ProducerConsumer;

public class ProducerConsumerTest1 {

  public static void main(String args[]) {
    Resource1 resource = new Resource1();
    new Thread(new Producer1(resource)).start();//生产者线程
    new Thread(new Producer1(resource)).start();//生产者线程
    new Thread(new Consumer1(resource)).start();//消费者线程
    new Thread(new Consumer1(resource)).start();//消费者线程

  }
}

运行结果:

四、总结

1、如果生产者、消费者都是1个,那么flag标记可以用if判断。这里有多个,必须用while判断。

2、在while判断的同时,notify函数可能唤醒本类线程(如一个消费者唤醒另一个消费者),这会导致所有消费者忙等待,程序无法继续往下执行。使用notifyAll函数代替notify可以解决这个问题,notifyAll可以保证非本类线程被唤醒(消费者线程能唤醒生产者线程,反之也可以),解决了忙等待问题。

小心假死

生产者/消费者模型最终达到的目的是平衡生产者和消费者的处理能力,达到这个目的的过程中,并不要求只有一个生产者和一个消费者。可以多个生产者对应多个消费者,可以一个生产者对应一个消费者,可以多个生产者对应一个消费者。

假死就发生在上面三种场景下。假死指的是全部线程都进入了WAITING状态,那么程序就不再执行任何业务功能了,整个项目呈现停滞状态。

比方说有生产者A和生产者B,缓冲区由于空了,消费者处于WAITING。生产者B处于WAITING,生产者A被消费者通知生产,生产者A生产出来的产品本应该通知消费者,结果通知了生产者B,生产者B被唤醒,发现缓冲区满了,于是继续WAITING。至此,两个生产者线程处于WAITING,消费者处于WAITING,系统假死。

上面的分析可以看出,假死出现的原因是因为notify的是同类,所以非单生产者/单消费者的场景,可以采取两种方法解决这个问题:

(1)synchronized用notifyAll()唤醒所有线程、ReentrantLock用signalAll()唤醒所有线程。

(2)用ReentrantLock定义两个Condition,一个表示生产者的Condition,一个表示消费者的Condition,唤醒的时候调用相应的Condition的signal()方法就可以了。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • Java多线程模拟电影售票过程

    用多线程模拟电影售票过程(Java实训) 实训目的: 多线程的实现.线程同步 实训要求: 总票数和售票窗口数由键盘输入,用每个线程处理一个窗口的售票. Test.java package program5; import java.util.Scanner; public class Test { public static void main(String[] args) { // TODO Auto-generated method stub Scanner sc=new Scanner(S

  • Java多线程文件分片下载实现的示例代码

    多线程下载介绍 多线程下载技术是很常见的一种下载方案,这种方式充分利用了多线程的优势,在同一时间段内通过多个线程发起下载请求,将需要下载的数据分割成多个部分,每一个线程只负责下载其中一个部分,然后将下载后的数据组装成完整的数据文件,这样便大大加快了下载效率.常见的下载器,迅雷,QQ旋风等都采用了这种技术. 分片下载 所谓分片下载就是要利用多线程的优势,将要下载的文件一块一块的分配到各个线程中去下载,这样就极大的提高了下载速度. 技术难点 并不能说是什么难点,只能说没接触过不知道罢了. 1.如何请

  • Java多线程状态及方法实例解析

    这篇文章主要介绍了Java多线程状态及方法实例解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 首先介绍线程的五种状态: 新生态:New Thread() 就绪态:准备抢CPU时间片 运行态:抢到了CPU时间片 阻塞态:放弃已经抢到的CPU时间片,且暂时不参与争抢 死亡态:Run运行完了之后 接下来介绍三种方法:线程的阻塞,线程的优先级设置,线程的礼让 public class MutliThreadDemo4 { public static

  • Java模拟多线程实现抢票代码实例

    这篇文章主要介绍了Java模拟多线程实现抢票,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 实现100张票抢购的demo 这里需要一个变量,来保存100张 局部变量: 定义在方法内,方法运行存在,方法运行结束销毁,无法保存一个持久化数据!!! 成员变量: 保存在类对象内,创建对象之后存在,对象不销毁成员变量也不会被内存收回.因为 在每一个类对象中,都存在一个对应的成员变量,这些成员变量不是同一个数据.不是 共享资源,不合适!!! 静态成员变量:

  • 基于Java实现多线程下载并允许断点续传

    完整代码:https://github.com/iyuanyb/Downloader 多线程下载及断点续传的实现是使用 HTTP/1.1 引入的 Range 请求参数,可以访问Web资源的指定区间的内容.虽然实现了多线程及断点续传,但还有很多不完善的地方. 包含四个类: Downloader: 主类,负责分配任务给各个子线程,及检测进度DownloadFile: 表示要下载的哪个文件,为了能写输入到文件的指定位置,使用 RandomAccessFile 类操作文件,多个线程写同一个文件需要保证线

  • java多线程关键字final和static详解

    这篇文章主要介绍了java多线程关键字final和static详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 final关键字 1.final关键字在单线程中的特点: 1)final修饰的静态成员:必须在进行显示初始化或静态代码块赋值,并且仅能赋值一次. 2)final修饰的类成员变量,可以在三个地方进行赋值:显示初始化.构造代码块和构造方法,并且仅能赋值一次. 3)final修饰的局部变量,必须在使用之前进行显示初始化(并不一定要在定义是

  • Java多线程生产者消费者模式实现过程解析

    单生产者与单消费者 示例: public class ProduceConsume { public static void main(String[] args) { String lock = new String(""); Produce produce = new Produce(lock); Consume consume = new Consume(lock); new Thread(() -> { while (true) { produce.setValue();

  • Java多线程 线程状态原理详解

    这篇文章主要介绍了Java多线程 线程状态原理详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 java.lang.Thread.State枚举定义了6种线程状态. NEW: 尚未启动(start)的线程的线程状态 RUNNABLE: 运行状态,但线程可能正在JVM中执行,也可能在等待CPU调度 BLOCKED: 线程阻塞,等待监视器锁以进入同步代码块/方法 WAITING: 等待状态.使用以下不带超时的方式时会进入:Object.wait.

  • Java多线程并发生产者消费者设计模式实例解析

    一.两个线程一个生产者一个消费者 需求情景 两个线程,一个负责生产,一个负责消费,生产者生产一个,消费者消费一个. 涉及问题 同步问题:如何保证同一资源被多个线程并发访问时的完整性.常用的同步方法是采用标记或加锁机制. wait() / nofity() 方法是基类Object的两个方法,也就意味着所有Java类都会拥有这两个方法,这样,我们就可以为任何对象实现同步机制. wait()方法:当缓冲区已满/空时,生产者/消费者线程停止自己的执行,放弃锁,使自己处于等待状态,让其他线程执行. not

  • JAVA多线程实现生产者消费者的实例详解

    JAVA多线程实现生产者消费者的实例详解 下面的代码实现了生产者消费者的问题 Product.Java package consumerProducer; public class Product { private String id; public String getId() { return id; } public void setId(String id) { this.id = id; } public Product(String id) { this.id=id; } publ

  • java多线程解决生产者消费者问题

    本文实例讲述了java多线程解决生产者消费者问题的方法.分享给大家供大家参考.具体分析如下: 题目是这样的: 采用Java 多线程技术,设计实现一个符合生产者和消费者问题的程序.对一个对象(枪膛)进行操作,其最大容量是12颗子弹.生产者线程是一个压入线程,它不断向枪膛中压入子弹:消费者线程是一个射出线程,它不断从枪膛中射出子弹. 要求: (1)给出分析过程说明. (2)程序输出,要模拟体现对枪膛的压入和射出操作: (2)设计程序时应考虑到两个线程的同步问题. 这个和著名的生产者消费者问题几乎是一

  • Java多线程并发编程和锁原理解析

    这篇文章主要介绍了Java多线程并发编程和锁原理解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 一.前言 最近项目遇到多线程并发的情景(并发抢单&恢复库存并行),代码在正常情况下运行没有什么问题,在高并发压测下会出现:库存超发/总库存与sku库存对不上等各种问题. 在运用了 限流/加锁等方案后,问题得到解决. 加锁方案见下文. 二.乐观锁 & 悲观锁 1.乐观锁 顾名思义,就是很乐观,每次去拿数据的时候都认为别人不会修改,所以不会上锁

  • Java多线程之生产者消费者模式详解

    目录 1.生产者消费者模型 2.实现生产者消费者模型 3.生产者消费者模型的作用是什么? 总结 问题: 1.什么是阻塞队列?如何使用阻塞队列来实现生产者-消费者模型? 2. 生产者消费者模型的作用是什么? 1. 生产者消费者模型 在生产者-消费者模式中,通常有两类线程,即生产者线程(若干个)和消费者线程(若干个).生产者线程向消息队列加入数据,消费者线程则从消息队列消耗数据.生产者和消费者.消息队列之间的关系结构图如图: (1) 消息队列可以用来平衡生产和消费的线程资源: (2) 生产者仅负责产

  • Java实现Kafka生产者消费者代码实例

    Kafka的结构与RabbitMQ类似,消息生产者向Kafka服务器发送消息,Kafka接收消息后,再投递给消费者. 生产者的消费会被发送到Topic中,Topic中保存着各类数据,每一条数据都使用键.值进行保存. 每一个Topic中都包含一个或多个物理分区(Partition),分区维护着消息的内容和索引,它们有可能被保存在不同服务器. 新建一个Maven项目,pom.xml 加入依赖: <dependency> <groupId>org.apache.kafka</gro

  • Java多线程并发执行demo代码实例

    主类:MultiThread,执行并发类 package java8test; import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Execut

  • Java实现简易生产者消费者模型过程解析

    一.概述 一共两个线程,一个线程生产产品,一个线程消费产品,使用同步代码块方法,同步两个线程.当产品没有时,通知生产者生产,生产者生产后,通知消费者消费,并等待消费者消费完. 需要注意的是,有可能出现,停止生产产品后,消费者还没未来得及消费生产者生产的最后一个产品,就结束消费,导致最后一个产品没有被消费. 本例使用synchronize以及wait().notify()实现简易版的线程者消费者模型. 二.测试用例 这里的产品用笔来演示,每只笔都有其编号code 一共有四个类:分别是生产者类,产品

  • Java多线程锁机制相关原理实例解析

    上下文:程序运行需要的环境(外部变量) 上下文切换:将之前的程序需要的外部变量复制保存,然后切换到新的程序运行环境 系统调用:(用户态陷入操作系统,通过操作系统执行内核态指令,执行完回到用户态)用户态--内核态--用户态:两次上下文切换 线程wait()方法:将自身加入等待队列,发生了一次上下文切换 notify()方法:将线程唤醒,也发生了上下文切换 Java线程中的锁:偏向锁.轻量级锁.重量级锁. 注意:偏向锁和轻量级锁都没有发生竞争,重量级锁发生了竞争. 偏向锁:可重入和经常使用某一个线程

  • Java 多线程并发编程_动力节点Java学院整理

    一.多线程 1.操作系统有两个容易混淆的概念,进程和线程. 进程:一个计算机程序的运行实例,包含了需要执行的指令:有自己的独立地址空间,包含程序内容和数据:不同进程的地址空间是互相隔离的:进程拥有各种资源和状态信息,包括打开的文件.子进程和信号处理. 线程:表示程序的执行流程,是CPU调度执行的基本单位:线程有自己的程序计数器.寄存器.堆栈和帧.同一进程中的线程共用相同的地址空间,同时共享进进程锁拥有的内存和其他资源. 2.Java标准库提供了进程和线程相关的API,进程主要包括表示进程的jav

随机推荐