基于Java 生产者消费者模式(详细分析)

生产者消费者模式是多线程中最为常见的模式:生产者线程(一个或多个)生成面包放进篮子里(集合或数组),同时,消费者线程(一个或多个)从篮子里(集合或数组)取出面包消耗。虽然它们任务不同,但处理的资源是相同的,这体现的是一种线程间通信方式。

本文将先说明单生产者单消费者的情况,之后再说明多生产者多消费者模式的情况。还会分别使用wait()/nofity()/nofityAll()机制、lock()/unlock()机制实现这两种模式。

在开始介绍模式之前,先解释下wait()、notify()和notifyAll()方法的用法细节以及改进的lock()/unlock()、await()/signal()/signalAll()的用法。

1.等待、唤醒机制的原理

wait()、notify()和notifyAll()分别表示让线程进入睡眠、唤醒睡眠线程以及唤醒所有睡眠的线程。但是,对象是哪个线程呢?另外,在API文档中描述这三个方法都必须在有效监视器(可理解为持有锁)的前提下使用。这三个方法和锁有什么关系呢?

以同步代码块synchronized(obj){}或同步函数为例,在它们的代码结构中可以使用wait()、notify()以及notifyAll(),因为它们都持有锁。

对于下面的两个同步代码块来说,分别使用的是锁obj1和锁obj2,其中线程1、线程2执行的是obj1对应的同步代码,线程3、线程4执行的是obj2对应的同步代码。

class MyLock implements Runnable {
 public int flag = 0;
 Object obj1 = new Object();
 Object obj2 = new Object();
 public void run(){
 while(true){
  if(flag%2=0){
  synchronized(obj1){ //线程t1和t2执行此同步任务
   //try{obj1.wait();}catch(InterruptedException i){}
   //obj1.notify()
   //obj1.notifyAll()
  }
  } else {
  synchronized(obj2){ //线程t3和t4执行此同步任务
   //try{obj2.wait();}catch(InterruptedException i){}
   //obj2.notify()
   //obj2.notifyAll()
  }
  }
 }
 }
}
class Demo {
 public static void main(String[] args){
 MyLock ml = new MyLock();
 Thread t1 = new Thread(ml);
 Thread t2 = new Thread(ml);
 Thread t3 = new Thread(ml);
 Thread t4 = new Thread(ml);
 t1.start();
 t2.start();
 try{Thread.sleep(1)}catch(InterruptedException i){};
 ml.flag++;
 t3.start();
 t4.start();
 }
}

当t1开始执行到wait()时,它将进入睡眠状态,但却不是一般的睡眠,而是在一个被obj1标识的线程池中睡眠(实际上是监视器对应线程池,只不过此时的监视器和锁是绑定在一起的)。当t2开始执行,它发现锁obj1被其他线程持有,它将进入睡眠态,这次睡眠是因为锁资源等待而非wait()进入的睡眠。因为t2已经判断过它要申请的是obj1锁,因此它也会进入obj1这个线程池睡眠,而不是普通的睡眠。同理t3和t4,这两个线程会进入obj2线程池睡眠。

当某个线程执行到notify()时,这个notify()将 随机 唤醒它 所属锁对应线程池 中的 任意一个 线程。例如,obj1.notify()将唤醒obj1线程池中任意一个睡眠的线程(当然,如果没有睡眠线程则什么也不做)。同理notifyAll()则是唤醒所属锁对应线程池中所有睡眠的线程。

必须要搞清楚的是"对应锁",因为在调用wait()、notify()和notifyAll()时都必须明确指定锁。例如,obj1.wait()。如果省略了所属锁,则表示的是this这个对象,也就是说,只有在非静态的同步函数中才能省略这三个方法的前缀。

简而言之,当使用了同步,就使用了锁,线程也就有了归属,它的所有依据都由所属锁来决定。例如,线程同步时,判断锁是否空闲以决定是否执行后面的代码,亦决定是否去特定的线程池中睡眠,当唤醒时也只会唤醒所属锁对应线程池中的线程。

这几个方法在应用上,一般在一次任务中,wait()和notify()/notifyAll()是成对出现且择一执行的。换句话说,就是这一轮原子性同步执行过程中,要么执行wait()进入睡眠,要么执行notify()唤醒线程池中的睡眠线程。要如何实现择一执行,可以考虑使用标记的方式来作为判断依据。参考后文的例子。

2.Lock和Condition

wait()系列的三个方法局限性很大,因为无论是睡眠还是唤醒的动作,都完全和锁耦合在一起了。例如,锁obj1关联的线程只能唤醒obj1线程池中的线程,而无法唤醒锁obj2关联的线程;再例如,在原来synchronized同步时,锁是在开始同步时隐式地自动获取的,且是在执行完一整个任务后,又隐式地自动释放锁,也就是说获取锁和释放锁的动作无法人为控制。

从JDK 1.5开始,java提供了java.util.concurrent.locks包,这个包中提供了Lock接口、Condition接口和ReadWriteLock接口,前两个接口将锁和监视器方法(睡眠、唤醒操作)解耦了。其中Lock接口只提供锁,通过锁方法newConditon()可以生成一个或多个与该锁关联的监视器,每个监视器都有自己的睡眠、唤醒方法。也就是说Lock替代了synchronized方法和同步代码块的使用,Condition替代了Object监视器方法的使用。

如下图:

当某线程执行condition1.await()时,该线程将进入condition1监视器对应的线程池睡眠,当执行condition1.signal()时,将随机唤醒condition1线程池中的任意一个线程,当执行condition1.signalAll()时,将唤醒condition1线程池中的所有线程。同理,对于condition2监视器也是一样的。

即使有多个监视器,但只要它们关联的是同一个锁对象,就可以跨监视器操作对方线程。例如condition1中的线程可以执行condition2.signal()来唤醒condition2线程池中的某个线程。

要使用这种锁、监视器的关联方式,参考如下步骤:

import java.util.concurrent.locks.*;
Lock l = new ReentrantLock();
Condition con1 = l.newCondition();
condition con2 = l.newCondition();
l.lock();
try{
 //包含await()、signal()或signalAll()的代码段...
} finally {
 l.unlock(); //由于代码段可能异常,但unlock()是必须执行的,所以必须使用try,且将unlock()放进finally段
}

具体用法见后文关于Lock、condition的示例代码。

3.单生产者单消费者模式

一个生产者线程,一个消费者线程,生产者每生产一个面包放进盘子里,消费者从盘子里取出面包进行消费。其中生产者判断是否继续生产的依据是盘子里没有面包,而消费者判断是否消费的依据是盘子里有面包。由于这个模式中,盘子一直只放一个面包,因此可以把盘子省略掉,生产者和消费者直接手把手地交递面包即可。

首先需要描述这三个类,一是多线程共同操作的资源(此处即面包),二是生产者,三是消费者。在下面的例子中,我把生产面包和消费面包的方法分别封装到了生产者和消费者类中,如果把它们封装在面包类中则更容易理解。

//描述资源:面包的名称和编号,由编号决定面包的号码
class Bread {
 public String name;
 public int count = 1;
 public boolean flag = false; //该标记为wait()和notify()提供判断标记
}
//生产者和消费者先后处理的面包资源是同一个,要确保这一点,
//可以按单例模式来设计面包类,也可以将同一个面包对象通过构造方法传递给生产者和消费者,此处使用后一种方式。
//描述生产者
class Producer implements Runnable {
 private Bread b; //生产者的成员:它要处理的资源
 Producer(Bread b){
 this.b = b;
 }
 //提供生产面包的方法
 public void produce(String name){
 b.name = name + b.count;
 b.count++;
 }
 public void run(){
 while(true){
  synchronized(Bread.class){ //使用Bread.class作为锁标识,使得生产者和消费者的同步代码块可以使用同一个锁
  if(b.flag){  //wait()必须在同步代码块内部,不仅因为必须持有锁才能睡眠,而且对锁这个资源的判断会出现混乱
   try{Bread.class.wait();}catch(InterruptedException i){}
  }
  produce("面包");
  System.out.println(Thread.currentThread().getName()+"----生产者------"+b.name);
  try{Thread.sleep(10);}catch(InterruptedException i){}
  b.flag = true;  //标记的切换也必须在保持同步
  Bread.class.notify(); //notify()也必须同步,否则锁都已经释放了,就无法做唤醒动作
  //ps:一次同步任务中,wait()和notify()应当只能其中一个执行,否则对方线程会混乱
  }
 }
 }
}
//描述消费者
class Consumer implements Runnable {
 private Bread b; //消费者的成员:它要处理的资源
 Consumer(Bread b){
 this.b = b;
 }
 //提供消费面包的方法
 public String consume(){
 return b.name;
 }
 public void run(){
 while(true){
  synchronized(Bread.class){
  if(!b.flag){
   try{Bread.class.wait();}catch(InterruptedException i){}
  }
  System.out.println(Thread.currentThread().getName()+"----消费者-------------"+consume());
  try{Thread.sleep(10);}catch(InterruptedException i){}
  b.flag = false;
  Bread.class.notify();
  }
 }
 }
}
public class ProduceConsume_1{
 public static void main(String[] args) {
 //1.创建资源对象
 Bread b = new Bread();
 //2.创建生产者和消费者对象,将同一个面包对象传递给生产者和消费者
 Producer pro = new Producer(b);
 Consumer con = new Consumer(b);
 //3.创建线程对象
 Thread pro_t = new Thread(pro);
 Thread con_t = new Thread(con);
 pro_t.start();
 con_t.start();
 }
}

最后的执行结果应当生产一个、消费一个,如此不断循环。如下:

Thread-0----生产者------面包1
Thread-1----消费者-------------面包1
Thread-0----生产者------面包2
Thread-1----消费者-------------面包2
Thread-0----生产者------面包3
Thread-1----消费者-------------面包3
Thread-0----生产者------面包4
Thread-1----消费者-------------面包4
Thread-0----生产者------面包5
Thread-1----消费者-------------面包5
Thread-0----生产者------面包6
Thread-1----消费者-------------面包6

4.使用Lock和Condition实现单生产单消费模式

代码如下:

import java.util.concurrent.locks.*;
class Bread {
 public String name;
 public int count = 1;
 public boolean flag = false;
 //为生产者和消费者提供同一个锁对象以及同一个Condition对象
 public static Lock lock = new ReentrantLock();
 public static Condition condition = lock.newCondition();
}
class Producer implements Runnable {
 private Bread b;
 Producer(Bread b){
 this.b = b;
 }
 public void produce(String name){
 b.name = name + b.count;
 b.count++;
 }
 public void run(){
 while(true){
  //使用Bread.lock来锁住资源
  Bread.lock.lock();
  try{
  if(b.flag){
   try{Bread.condition.await();}catch(InterruptedException i){}
  }
  produce("面包");
  System.out.println(Thread.currentThread().getName()+"----生产者------"+b.name);
  try{Thread.sleep(10);}catch(InterruptedException i){}
  b.flag = true;
  Bread.condition.signal();
  } finally {
  Bread.lock.unlock();
  }
 }
 }
}
class Consumer implements Runnable {
 private Bread b;
 Consumer(Bread b){
 this.b = b;
 }
 public String consume(){
 return b.name;
 }
 public void run(){
 while(true){
  //使用Bread.lock来锁住资源
  Bread.lock.lock();
  try{
  if(!b.flag){
   try{Bread.condition.await();}catch(InterruptedException i){}
  }
  System.out.println(Thread.currentThread().getName()+"----消费者-------------"+consume());
  try{Thread.sleep(10);}catch(InterruptedException i){}
  b.flag = false;
  Bread.condition.signal();
  } finally {
  Bread.lock.unlock();
  }
 }
 }
}
public class ProduceConsume_1{
 public static void main(String[] args) {
 //1.创建资源对象
 Bread b = new Bread();
 //2.创建生产者和消费者对象,将同一个面包对象传递给生产者和消费者
 Producer pro = new Producer(b);
 Consumer con = new Consumer(b);
 //3.创建线程对象
 Thread pro_t = new Thread(pro);
 Thread con_t = new Thread(con);
 pro_t.start();
 con_t.start();
 }
}

5.多生产多消费模式(单面包)

这里先说明多生产者多消费者,但同一个时刻最多只能有一个面包的模式,这个模式在实际中可能是不理想的,但为了引出后面真实的多生产多消费模式,我觉得有必要在这里解释这种模式,并且分析这种模式以及如何从单生产单消费的代码演变而来。

如下图:

从单生产单消费到多生产多消费,因为多线程安全问题和死锁问题,所以有两个方面的问题需要考虑:

对于某一方来说,如何让多线程达到和单线程同样的生产或消费能力?也就是说,如何让多线程看上去就是单线程。多线程和单线程最大的区别在于多线程安全问题,因此,只要保证多线程执行的任务能够同步即可。

第1个问题考虑的是某一方多线程的问题,第2个问题考虑的是两方如何能和谐配合完成生产消费问题。也就是如何保证生产方和消费方一方活动的同时另一方睡眠。只需在某一方执行完同步任务时,唤醒另一方即可。

其实从单线程到多线程,就两个问题需要考虑:不同步和死锁。(1)当生产方和消费方都出现了多线程,可以将生产方的多线程看成一个线程整体、消费方的多线程也看成一个整体,这解决的是线程安全问题。(2)再将生产方整体和消费方整体两方结合起来看成多线程,来解决死锁问题,而java中解决死锁的方式就是唤醒对方或唤醒所有。

问题是如何保证某一方的多线程之间同步?以多线程执行单消费方的代码为例进行分析。

while(true){
 synchronized(Bread.class){
 if(!b.flag){
  try{Bread.class.wait();}catch(InterruptedException i){}
 }
 System.out.println(Thread.currentThread().getName()+"----消费者-------------"+consume());
 try{Thread.sleep(10);}catch(InterruptedException i){}
 b.flag = false;
 Bread.class.notify();
 }
}

假设消费线程1消费完一个面包后唤醒了消费线程2,并继续循环,判断if(!flag),它将wait,于是锁被释放。假设CPU正好选中了消费线程2,那么消费线程2也将进入wait。当生产方生产了一个面包后,假设唤醒了消费线程1,它将从wait语句处继续向下消费刚生产完的面包,假设正好再次唤醒了消费线程2,当消费线程2被CPU选中后,消费线程2也将从wait语句处向下消费,消费的也是刚才生产的面包,问题再此出现了,连续唤醒的消费线程1和2消费的是同一个面包,也就是说面包被重复消费了。这又是多线程不同步问题。

说了一大段,其实将视线放大后分析就很简单了,只要某一方的2个或多个线程都因为判断b.flag而wait,那么这两个或多个线程有可能会被连续唤醒而继续向下生产或消费。这造成了多线程不同步问题。

不安全的问题就出在同一方的多个线程在连续唤醒后继续向下生产或消费。这是if语句引起的,如果能够让wait的线程在唤醒后还回头判断b.flag是否为true,就能让其决定是否继续wait还是向下生产或消费。

可以将if语句替换为while语句来满足要求。这样一来,无论某一方的多个线程是否被连续唤醒,它们都将回头判断b.flag。

while(true){
 synchronized(Bread.class){
 while(!b.flag){
  try{Bread.class.wait();}catch(InterruptedException i){}
 }
 System.out.println(Thread.currentThread().getName()+"----消费者-------------"+consume());
 try{Thread.sleep(10);}catch(InterruptedException i){}
 b.flag = false;
 Bread.class.notify();
 }
}

解决了第一个多线程安全的问题,但会出现死锁问题。这很容易分析,将生产方看作一个整体,将消费方也看作一个整体,当生产方线程都wait了(生产方的线程被连续唤醒时会出现该方线程全部wait),消费方也都wait了,死锁就出现了。其实放大了看,将生产方、消费方分别看作一个线程,这两个线程组成多线程,当某一方wait后无法唤醒另一方,另一方也一定会wait,于是就死锁了。

对于双方死锁的问题,只要保证能唤醒对方,而非本方连续唤醒就能解决。使用notifyAll()或signalAll()即可,也可以通过signal()唤醒对方线程解决,见下面的第二段代码。

根据上面的分析,将单生产、单消费模式的代码改进一下,就可以变为多生产多消费单面包模式。

//代码段1
class Bread {
 public String name;
 public int count = 1;
 public boolean flag = false;
}
//描述生产者
class Producer implements Runnable {
 private Bread b;
 Producer(Bread b){
  this.b = b;
 }
 public void produce(String name){
  b.name = name + b.count;
  b.count++;
 }
 public void run(){
  while(true){
    synchronized(Bread.class){
     while(b.flag){
      try{Bread.class.wait();}catch(InterruptedException i){}
     }
     produce("面包");
     System.out.println(Thread.currentThread().getName()+"----生产者------"+b.name);
     try{Thread.sleep(10);}catch(InterruptedException i){}
     b.flag = true;
     Bread.class.notifyAll();
   }
  }
 }
}
//描述消费者
class Consumer implements Runnable {
 private Bread b;
 Consumer(Bread b){
  this.b = b;
 }
 public String consume(){
  return b.name;
 }
 public void run(){
  while(true){
    synchronized(Bread.class){
     while(!b.flag){
      try{Bread.class.wait();}catch(InterruptedException i){}
     }
     System.out.println(Thread.currentThread().getName()+"----消费者-------------"+consume());
     try{Thread.sleep(10);}catch(InterruptedException i){}
     b.flag = false;
     Bread.class.notifyAll();
    }
  }
 }
}
public class ProduceConsume_5 {
 public static void main(String[] args) {
  //1.创建资源对象
  Bread b = new Bread();
  //2.创建生产者和消费者对象
  Producer pro = new Producer(b);
  Consumer con = new Consumer(b);
  //3.创建线程对象
  Thread pro_t1 = new Thread(pro); //生产线程1
  Thread pro_t2 = new Thread(pro); //生产线程2
  Thread con_t1 = new Thread(con); //消费线程1
  Thread con_t2 = new Thread(con); //消费线程2
  pro_t1.start();
  pro_t2.start();
  con_t1.start();
  con_t2.start();
 }
}

以下是采用Lock和Conditon重构后的代码,使用的是signal()唤醒对方线程的方法。

//代码段2
import java.util.concurrent.locks.*;
class Bread {
 public String name;
 public int count = 1;
 public boolean flag = false;
 public static Lock lock = new ReentrantLock();
 public static Condition pro_con = lock.newCondition();
 public static Condition con_con = lock.newCondition();
}
//描述生产者
class Producer implements Runnable {
 private Bread b;
 Producer(Bread b){
  this.b = b;
 }
 public void produce(String name){
  b.name = name + b.count;
  b.count++;
 }
 public void run(){
  while(true){
   Bread.lock.lock();
   try{
    while(b.flag){
     try{Bread.pro_con.await();}catch(InterruptedException i){}
    }
    produce("面包");
    System.out.println(Thread.currentThread().getName()+"----生产者------"+b.name);
    try{Thread.sleep(10);}catch(InterruptedException i){}
    b.flag = true;
    Bread.con_con.signal(); //唤醒的是consumer线程
   } finally {
    Bread.lock.unlock();
   }
  }
 }
}
//描述消费者
class Consumer implements Runnable {
 private Bread b;
 Consumer(Bread b){
  this.b = b;
 }
 public String consume(){
  return b.name;
 }
 public void run(){
  while(true){
   Bread.lock.lock();
   try{
    while(!b.flag){
     try{Bread.con_con.await();}catch(InterruptedException i){}
    }
    System.out.println(Thread.currentThread().getName()+"----消费者-------------"+consume());
    try{Thread.sleep(10);}catch(InterruptedException i){}
    b.flag = false;
    Bread.pro_con.signal();  //唤醒的是producer线程
   } finally {
    Bread.lock.unlock();
   }
  }
 }
}
public class ProduceConsume_6 {
 public static void main(String[] args) {
  //1.创建资源对象
  Bread b = new Bread();
  //2.创建生产者和消费者对象
  Producer pro = new Producer(b);
  Consumer con = new Consumer(b);
  //3.创建线程对象
  Thread pro_t1 = new Thread(pro);
  Thread pro_t2 = new Thread(pro);
  Thread con_t1 = new Thread(con);
  Thread con_t2 = new Thread(con);
  pro_t1.start();
  pro_t2.start();
  con_t1.start();
  con_t2.start();
 }
}

关于多生产、多消费问题做个总结:

(1).解决某一方多线程不同步的方案是使用while(flag)来判断是否wait;

(2).解决双方死锁问题的方案是唤醒对方,可以使用notifyAll(),signalAll()或对方监视器的signal()方法。

6.多生产多消费模式

有多个生产者线程,多个消费者线程,生产者将生产的面包放进篮子(集合或数组)里,消费者从篮子里取出面包。生产者判断继续生产的依据是篮子已经满了,消费者判断继续消费的依据是篮子是否空了。此外,当消费者取出面包后,对应的位置又空了,生产者可以回头从篮子的起始位置继续生产,这可以通过重置篮子的指针来实现。

在这个模式里,除了描述生产者、消费者、面包,还需要描述篮子这个容器。假设使用数组作为容器,生产者每生产一个,生产指针向后移位,消费者每消费一个,消费指针向后移位。

代码如下:可参考API-->Condition类中给出的示例代码

import java.util.concurrent.locks.*;
class Basket {
 private Bread[] arr;
 //the size of basket
 Basket(int size){
  arr = new Bread[size];
 }
 //the pointer of in and out
 private int in_ptr,out_ptr;
 //how many breads left in basket
 private int left;
 private Lock lock = new ReentrantLock();
 private Condition full = lock.newCondition();
 private Condition empty = lock.newCondition();
 //bread into basket
 public void in(){
  lock.lock();
  try{
   while(left == arr.length){
    try{full.await();} catch (InterruptedException i) {i.printStackTrace();}
   }
   arr[in_ptr] = new Bread("MianBao",Producer.num++);
   System.out.println("Put the bread: "+arr[in_ptr].getName()+"------into basket["+in_ptr+"]");
   left++;
   if(++in_ptr == arr.length){in_ptr = 0;}
   empty.signal();
  } finally {
   lock.unlock();
  }
 }
 //bread out from basket
 public Bread out(){
  lock.lock();
  try{
   while(left == 0){
    try{empty.await();} catch (InterruptedException i) {i.printStackTrace();}
   }
   Bread out_bread = arr[out_ptr];
   System.out.println("Get the bread: "+out_bread.getName()+"-----------from basket["+out_ptr+"]");
   left--;
   if(++out_ptr == arr.length){out_ptr = 0;}
   full.signal();
   return out_bread;
  } finally {
   lock.unlock();
  }
 }
}
class Bread {
 private String name;
 Bread(String name,int num){
  this.name = name + num;
 }
 public String getName(){
  return this.name;
 }
}
class Producer implements Runnable {
 private Basket basket;
 public static int num = 1; //the first number for Bread's name
 Producer(Basket b){
  this.basket = b;
 }
 public void run(){
  while(true) {
   basket.in();
   try{Thread.sleep(10);}catch(InterruptedException i){}
  }
 }
}
class Consumer implements Runnable {
 private Basket basket;
 private Bread i_get;
 Consumer(Basket b){
  this.basket = b;
 }
 public void run(){
  while(true){
   i_get = basket.out();
   try{Thread.sleep(10);}catch(InterruptedException i){}
  }
 }
}
public class ProduceConsume_7 {
 public static void main(String[] args) {
  Basket b = new Basket(20); // the basket size = 20
  Producer pro = new Producer(b);
  Consumer con = new Consumer(b);
  Thread pro_t1 = new Thread(pro);
  Thread pro_t2 = new Thread(pro);
  Thread con_t1 = new Thread(con);
  Thread con_t2 = new Thread(con);
  Thread con_t3 = new Thread(con);
  pro_t1.start();
  pro_t2.start();
  con_t1.start();
  con_t2.start();
  con_t3.start();
 }
}

这里涉及了消费者、生产者、面包和篮子,其中面包和篮子是多线程共同操作的资源,生产者线程生产面包放进篮子,消费者线程从篮子中取出面包。理想的代码是将生产任务和消费任务都封装在资源类中,因为面包是篮子容器的元素,所以不适合封装到面包类中,而且封装到篮子中,能更方便地操作容器。

注意,一定要将所有涉及资源操作的代码都放进锁的内部,否则会产生多线程不同步问题。例如,在Producer类中定义了生产面包的方法produce(),然后将其作为放进篮子的方法basket.in()的参数,即basket.in(producer()),这是错误的行为,因为produce()是在锁的外部执行后才传递给in()方法的。

以上这篇基于Java 生产者消费者模式(详细分析)就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持我们。

您可能感兴趣的文章:

  • JAVA生产者消费者(线程同步)代码学习示例
  • Java并发编程中的生产者与消费者模型简述
  • Java 生产者/消费者问题实例详解
  • JAVA多线程实现生产者消费者的实例详解
(0)

相关推荐

  • Java 生产者/消费者问题实例详解

    生产者和消费者问题是线程模型中的经典问题:生产者和消费者在同一时间段内共用同一个存储空间,如下图所示,生产者向空间里存放数据,而消费者取用数据,如果不加以协调可能会出现以下情况: 存储空间已满,而生产者占用着它,消费者等着生产者让出空间从而去除产品,生产者等着消费者消费产品,从而向空间中添加产品.互相等待,从而发生死锁. 以下实例演示了如何通过线程解决生产者/消费者问题: /* author by w3cschool.cc ProducerConsumerTest.java */ public

  • JAVA生产者消费者(线程同步)代码学习示例

    一.问题描述 生产者消费者问题是一个典型的线程同步问题.生产者生产商品放到容器中,容器有一定的容量(只能顺序放,先放后拿),消费者消费商品,当容器满了后,生产者等待,当容器为空时,消费者等待.当生产者将商品放入容器后,通知消费者:当消费者拿走商品后,通知生产者. 二.解决方案 对容器资源加锁,当取得锁后,才能对互斥资源进行操作. 复制代码 代码如下: public class ProducerConsumerTest { public static void main(String []args

  • Java并发编程中的生产者与消费者模型简述

    概述 对于多线程程序来说,生产者和消费者模型是非常经典的模型.更加准确的说,应该叫"生产者-消费者-仓库模型".离开了仓库,生产者.消费者就缺少了共用的存储空间,也就不存在并非协作的问题了. 示例 定义一个场景.一个仓库只允许存放10件商品,生产者每次可以向其中放入一个商品,消费者可以每次从其中取出一个商品.同时,需要注意以下4点: 1.  同一时间内只能有一个生产者生产,生产方法需要加锁synchronized. 2.  同一时间内只能有一个消费者消费,消费方法需要加锁synchro

  • JAVA多线程实现生产者消费者的实例详解

    JAVA多线程实现生产者消费者的实例详解 下面的代码实现了生产者消费者的问题 Product.Java package consumerProducer; public class Product { private String id; public String getId() { return id; } public void setId(String id) { this.id = id; } public Product(String id) { this.id=id; } publ

  • 基于Java 生产者消费者模式(详细分析)

    生产者消费者模式是多线程中最为常见的模式:生产者线程(一个或多个)生成面包放进篮子里(集合或数组),同时,消费者线程(一个或多个)从篮子里(集合或数组)取出面包消耗.虽然它们任务不同,但处理的资源是相同的,这体现的是一种线程间通信方式. 本文将先说明单生产者单消费者的情况,之后再说明多生产者多消费者模式的情况.还会分别使用wait()/nofity()/nofityAll()机制.lock()/unlock()机制实现这两种模式. 在开始介绍模式之前,先解释下wait().notify()和no

  • Java生产者消费者模式实例分析

    本文实例讲述了Java生产者消费者模式.分享给大家供大家参考,具体如下: java的生产者消费者模式,有三个部分组成,一个是生产者,一个是消费者,一个是缓存. 这么做有什么好处呢? 1.解耦(去依赖),如果是消费者直接调用生产者,那如果生产者的代码变动了,消费者的代码也需要随之变动 2.高效,如果消费者直接掉生产者,执行时间较长的话,会阻塞,影响其他业务的进行 3.负载均衡,如果消费者直接调生产者,那生产者和消费者就得在一起了,日后业务量非常大的话,要想减轻服务器的压力,想拆分生产和消费,就很困

  • java wait()/notify() 实现生产者消费者模式详解

    java wait()/notify() 实现生产者消费者模式 java中的多线程会涉及到线程间通信,常见的线程通信方式,例如共享变量.管道流等,这里我们要实现生产者消费者模式,也需要涉及到线程通信,不过这里我们用到了java中的wait().notify()方法: wait():进入临界区的线程在运行到一部分后,发现进行后面的任务所需的资源还没有准备充分,所以调用wait()方法,让线程阻塞,等待资源,同时释放临界区的锁,此时线程的状态也从RUNNABLE状态变为WAITING状态: noti

  • Java多线程之线程通信生产者消费者模式及等待唤醒机制代码详解

    前言 前面的例子都是多个线程在做相同的操作,比如4个线程都对共享数据做tickets–操作.大多情况下,程序中需要不同的线程做不同的事,比如一个线程对共享变量做tickets++操作,另一个线程对共享变量做tickets–操作,这就是大名鼎鼎的生产者和消费者模式. 正文 一,生产者-消费者模式也是多线程 生产者和消费者模式也是多线程的范例.所以其编程需要遵循多线程的规矩. 首先,既然是多线程,就必然要使用同步.上回说到,synchronized关键字在修饰函数的时候,使用的是"this"

  • Java多线程之生产者消费者模式详解

    目录 1.生产者消费者模型 2.实现生产者消费者模型 3.生产者消费者模型的作用是什么? 总结 问题: 1.什么是阻塞队列?如何使用阻塞队列来实现生产者-消费者模型? 2. 生产者消费者模型的作用是什么? 1. 生产者消费者模型 在生产者-消费者模式中,通常有两类线程,即生产者线程(若干个)和消费者线程(若干个).生产者线程向消息队列加入数据,消费者线程则从消息队列消耗数据.生产者和消费者.消息队列之间的关系结构图如图: (1) 消息队列可以用来平衡生产和消费的线程资源: (2) 生产者仅负责产

  • Java RabbitMQ高级特性详细分析

    目录 消息的可靠投递 确认模式 退回模式 Consumer Ack 消费端限流 TTL(Time To Live) 设置某个队列为过期队列 设置单独某个消息过期 死信队列 延迟队列 消息的可靠投递 在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景.RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式. confirm 确认模式 return 退回模式 rabbitmq整个消息投递的路径为: producer—>rabbitmq broker—>

  • Lock、Condition实现简单的生产者消费者模式示例

    复制代码 代码如下: package condition; import java.util.ArrayList;import java.util.List;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock; /** * 利用Lock.Condition实现生产者消费者模式 * @aut

  • Python教程之生产者消费者模式解析

    为什么使用生产者消费者模式 在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程.在多线程开发中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完才能继续生产数据.同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者.为了解决这个问题引入了生产者和消费者模式. 什么是生产者消费者模式 生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题.生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用通

  • 详解Python 模拟实现生产者消费者模式的实例

    详解Python 模拟实现生产者消费者模式的实例 散仙使用python3.4模拟实现的一个生产者与消费者的例子,用到的知识有线程,队列,循环等,源码如下: Python代码 import queue import time import threading import random q=queue.Queue(5) #生产者 def pr(): name=threading.current_thread().getName() print(name+"线程启动......") for

  • 非常适合新手学生的Java线程池超详细分析

    目录 线程池的好处 创建线程池的五种方式 缓存线程池CachedThreadPool 固定容量线程池FixedThreadPool 单个线程池SingleThreadExecutor 定时任务线程池ScheduledThreadPool ThreadPoolExecutor创建线程池(十分推荐) ThreadPoolExecutor的七个参数详解 workQueue handler 如何触发拒绝策略和线程池扩容? 线程池的好处 可以实现线程的复用,避免重新创建线程和销毁线程.创建线程和销毁线程对

随机推荐