Java多线程中不同条件下编写生产消费者模型方法介绍

简介:

生产者、消费者模型是多线程编程的常见问题,最简单的一个生产者、一个消费者线程模型大多数人都能够写出来,但是一旦条件发生变化,我们就很容易掉进多线程的bug中。这篇文章主要讲解了生产者和消费者的数量,商品缓存位置数量,商品数量等多个条件的不同组合下,写出正确的生产者消费者模型的方法。

欢迎探讨,如有错误敬请指正

生产消费者模型

    生产者消费者模型具体来讲,就是在一个系统中,存在生产者和消费者两种角色,他们通过内存缓冲区进行通信,生产者生产消费者需要的资料,消费者把资料做成产品。生产消费者模式如下图。

定义商品类

package demo;
/*定义商品*/
public class Goods {
	public final String name;
	public final int price;
	public final int id;
	public Goods(String name, int price, int id){
		this.name = name;
		/*类型*/
		this.price = price;
		/*价格*/
		this.id = id;
		/*商品序列号*/
	}
	@Override
	  public String toString(){
		return "name: " + name + ",  price:"+ price + ",  id: " + id;
	}
}

基本要求:

1)生产者不能重复生产一个商品,也就是说不能有两个id相同的商品

2)生产者不能覆盖一个商品(当前商品还未被消费,就被下一个新商品覆盖)。也就是说消费商品时,商品的id属性可以不连续,但不能出现缺号的情况

3)消费者不能重复消费一个商品

1.生产者线程无线生产,消费者线程无限消费的模式

1.1使用线程对象,一个生产者线程,一个消费者线程,一个商品存储位置

package demo;
import java.util.Random;
/*使用线程对象,一个缓存位置,一个生产者,一个消费者,无限生产商品消费商品*/
public class ProducterComsumerDemo1 {
	/*定义一个商品缓存位置*/
	private volatile Goods goods;
	/*定义一个对象作为锁,不使用goods作为锁是因为生产者每次会产生一个新的对象*/
	private Object obj = new Object();
	/*isFull == true 生产者线程休息,消费者线程消费
   *isFull == false 消费者线程休息,生产者线程生产*/
	private volatile Boolean isFull = false;
	/*商品的id编号,生产者制造的每个商品的id都不一样,每生产一个id自增1*/
	private int id = 1;
	/*随机产生一个sleep时间*/
	private Random rnd = new Random();
	/*=================定义消费者线程==================*/
	public class ComsumeThread implements Runnable{
		@Override
		    public void run(){
			try{
				while(true){
					/*获取obj对象的锁, id 和 isFull 的操作都在同步代码块中*/
					synchronized(obj){
						if(!isFull){
							/*wait方法使当前线程阻塞,并释放锁*/
							obj.wait();
						}
						/*随机延时一段时间*/
						Thread.sleep(rnd.nextint(250));
						/*模拟消费商品*/
						System.out.println(goods);
						/*随机延时一段时间*/
						Thread.sleep(rnd.nextint(250));
						isFull = false;
						/*唤醒阻塞obj上的生产者线程*/
						obj.notify();
					}
					/*随机延时一段时间*/
					Thread.sleep(rnd.nextint(250));
				}
			}
			catch (InterruptedException e){
				/*什么都不做*/
			}
		}
	}
	/*=================定义生产者线程==================*/
	public class ProductThread implements Runnable{
		@Override
		    public void run(){
			try {
				while(true){
					synchronized(obj){
						if(isFull){
							obj.wait();
						}
						Thread.sleep(rnd.nextint(500));
						/*如果id为偶数,生产价格为2的产品A
             *如果id为奇数,生产价格为1的产品B*/
						if(id % 2 == 0){
							goods = new Goods("A", 2, id);
						} else{
							goods = new Goods("B", 1, id);
						}
						Thread.sleep(rnd.nextint(250));
						id++;
						isFull = true;
						/*唤醒阻塞的消费者线程*/
						obj.notify();
					}
				}
			}
			catch (InterruptedException e) {
				/*什么都不做*/
			}
		}
	}
	public static void main(String[] args) throws InterruptedException{
		ProducterComsumerDemo1 pcd = new ProducterComsumerDemo1();
		Runnable c = pcd.new ComsumeThread();
		Runnable p = pcd.new ProductThread();
		new Thread(p).start();
		new Thread(c).start();
	}
}

运行结果

name: B,  price:1,  id: 1
name: A,  price:2,  id: 2
name: B,  price:1,  id: 3
name: A,  price:2,  id: 4
name: B,  price:1,  id: 5
name: A,  price:2,  id: 6
name: B,  price:1,  id: 7
name: A,  price:2,  id: 8
name: B,  price:1,  id: 9
name: A,  price:2,  id: 10
name: B,  price:1,  id: 11
name: A,  price:2,  id: 12
name: B,  price:1,  id: 13
……

从结果看出,商品类型交替生产,每个商品的id都不相同,且不会漏过任何一个id,生产者没有重复生产,消费者没有重复消费,结果完全正确。

1.2.使用线程对象,多个生产者线程,多个消费者线程,1个缓存位置

1.2.1一个经典的bug

对于多生产者,多消费者这个问题,看起来我们似乎不用修改代码,只需在main方法中多添加几个线程就好。假设我们需要三个消费者,一个生产者,那么我们只需要在main方法中再添加两个消费者线程。

public static void main(String[] args) throws InterruptedException{
	ProducterComsumerDemo1 pcd = new ProducterComsumerDemo1();
	Runnable c = pcd.new ComsumeThread();
	Runnable p = pcd.new ProductThread();
	new Thread(c).start();
	new Thread(p).start();
	new Thread(c).start();
	new Thread(c).start();
}

运行结果

name: B,  price:1,  id: 1
name: A,  price:2,  id: 2
name: A,  price:2,  id: 2
name: B,  price:1,  id: 3
name: B,  price:1,  id: 3
name: A,  price:2,  id: 4
name: A,  price:2,  id: 4
name: B,  price:1,  id: 5
name: B,  price:1,  id: 5
name: A,  price:2,  id: 6
……

从结果中,我们发现消费者重复消费了商品,所以这样做显然是错误的。这里我们定义多个消费者,一个生产者,所以遇到了重复消费的问题,如果定义成一个消费者,多个生产者就会遇到id覆盖的问题。如果我们定义多个消费者,多个生产者,那么即会遇到重复消费,也会遇到id覆盖的问题。注意,上面的代码使用的notifyAll唤醒方法,如果使用notify方法唤醒bug仍然可能发生。

现在我们来分析一下原因。当生产者生产好了商品,会唤醒因没有商品而阻塞消费者线程,假设唤醒的消费者线程超过两个,这两个线程会竞争获取锁,获取到锁的线程就会从obj.wait()方法中返回,然后消费商品,并把isFull置为false,然后释放锁。当被唤醒的另一个线程竞争获取到锁了以后也会从obj.wait()方法中返回。会再次消费同一个商品。显然,每一个被唤醒的线程应该再次检查isFull这个条件。所以无论是消费者,还是生产者,isFull的判断必须改成while循环,这样才能得到正确的结果而不受生产者的线程数和消费者的线程数的影响。

而对于只有一个生产者线程,一个消费者线程,用if判断是没有问题的,但是仍然强烈建议改成while语句进行判断。

1.2.2正确的姿势

package demo;
import java.util.Random;
/*使用线程对象,一个缓存位置,一个生产者,一个消费者,无限生产商品消费商品*/
public class ProducterComsumerDemo1 {
	/*定义一个商品缓存位置*/
	private volatile Goods goods;
	/*定义一个对象作为锁,不使用goods作为锁是因为生产者每次会产生一个新的对象*/
	private Object obj = new Object();
	/*isFull == true 生产者线程休息,消费者线程消费
   *isFull == false 消费者线程消费,生产者线程生产*/
	private volatile Boolean isFull = false;
	/*商品的id编号,生产者制造的每个商品的id都不一样,每生产一个id自增1*/
	private int id = 1;
	/*随机产生一个sleep时间*/
	private Random rnd = new Random();
	/*=================定义消费者线程==================*/
	public class ComsumeThread implements Runnable{
		@Override
		    public void run(){
			try{
				while(true){
					/*获取obj对象的锁, id 和 isFull 的操作都在同步代码块中*/
					synchronized(obj){
						while(!isFull){
							/*wait方法使当前线程阻塞,并释放锁*/
							obj.wait();
						}
						/*随机延时一段时间*/
						Thread.sleep(rnd.nextint(250));
						/*模拟消费商品*/
						System.out.println(goods);
						/*随机延时一段时间*/
						Thread.sleep(rnd.nextint(250));
						isFull = false;
						/*唤醒阻塞obj上的生产者线程*/
						obj.notifyAll();
					}
					/*随机延时一段时间*/
					Thread.sleep(rnd.nextint(250));
				}
			}
			catch (InterruptedException e){
				/*我就是任性,这里什么都不做*/
			}
		}
	}
	/*=================定义生产者线程==================*/
	public class ProductThread implements Runnable{
		@Override
		    public void run(){
			try {
				while(true){
					synchronized(obj){
						while(isFull){
							obj.wait();
						}
						Thread.sleep(rnd.nextint(500));
						/*如果id为偶数,生产价格为2的产品A
               如果id为奇数,生产价格为1的产品B*/
						if(id % 2 == 0){
							goods = new Goods("A", 2, id);
						} else{
							goods = new Goods("B", 1, id);
						}
						Thread.sleep(rnd.nextint(250));
						id++;
						isFull = true;
						/*唤醒阻塞的消费者线程*/
						obj.notifyAll();
					}
				}
			}
			catch (InterruptedException e) {
				/*我就是任性,这里什么都不做*/
			}
		}
	}
	public static void main(String[] args) throws InterruptedException{
		ProducterComsumerDemo1 pcd = new ProducterComsumerDemo1();
		Runnable c = pcd.new ComsumeThread();
		Runnable p = pcd.new ProductThread();
		new Thread(p).start();
		new Thread(p).start();
		new Thread(p).start();
		new Thread(c).start();
		new Thread(c).start();
		new Thread(c).start();
	}
}

1.3使用线程对象,多个缓存位置(有界),多生产者,多消费者

1)当缓存位置满时,我们应该阻塞生产者线程

2)当缓存位置空时,我们应该阻塞消费者线程

下面的代码我没有用java对象内置的锁,而是用了ReentrantLock对象。是因为普通对象的锁只有一个阻塞队列,如果使用notify方式,无法保证唤醒的就是特定类型的线程(消费者线程或生产者线程),而notifyAll方法会唤醒所有的线程,当剩余的缓存商品的数量小于生产者线程数量或已缓存商品的数量小于消费者线程时效率就比较低。所以这里我们通过ReentrantLock对象构造两个阻塞队列提高效率。

1.3.1普通方式

package demo;
import java.util.LinkedList;
import java.util.Random;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/*使用线程对象,多个缓存位置(有界),多生产者,多消费者,无限循环模式*/
public class ProducterComsumerDemo2 {
	/*最大缓存商品数*/
	private final int MAX_SLOT = 2;
	/*定义缓存商品的容器*/
	private LinkedList<Goods> queue = new LinkedList<Goods>();
	/*定义线程锁和锁对应的阻塞队列*/
	private Lock lock = new ReentrantLock();
	private Condition full = lock.newCondition();
	private Condition empty = lock.newCondition();
	/*商品的id编号,生产者制造的每个商品的id都不一样,每生产一个id自增1*/
	private int id = 1;
	/*随机产生一个sleep时间*/
	private Random rnd = new Random();
	/*=================定义消费者线程==================*/
	public class ComsumeThread implements Runnable{
		@Override
		    public void run(){
			while(true){
				/*加锁,queue的出列操作都在同步代码块中*/
				lock.lock();
				try {
					while(queue.isEmpty()){
						System.out.println("queue is empty");
						empty.await();
					}
					/*随机延时一段时间*/
					Thread.sleep(rnd.nextint(200));
					/*模拟消费商品*/
					Goods goods = queue.remove();
					System.out.println(goods);
					/*随机延时一段时间*/
					Thread.sleep(rnd.nextint(200));
					/*唤醒阻塞的生产者线程*/
					full.signal();
				}
				catch (InterruptedException e) {
					/*什么都不做*/
				}
				finally{
					lock.unlock();
				}
				/*释放锁后随机延时一段时间*/
				try {
					Thread.sleep(rnd.nextint(200));
				}
				catch (InterruptedException e) {
					/*什么都不做*/
				}
			}
		}
	}
	/*=================定义生产者线程==================*/
	public class ProductThread implements Runnable{
		@Override
		    public void run(){
			while(true){
				/*加锁,queue的入列操作,id操作都在同步代码块中*/
				lock.lock();
				try{
					while(queue.size() == MAX_SLOT){
						System.out.println("queue is full");
						full.await();
					}
					Thread.sleep(rnd.nextint(200));
					Goods goods = null;
					/*根据序号产生不同的商品*/
					switch(id%3){
						case 0 : goods = new Goods("A", 1, id);
						break;
						case 1 : goods = new Goods("B", 2, id);
						break;
						case 2 : goods = new Goods("C", 3, id);
						break;
					}
					Thread.sleep(rnd.nextint(200));
					queue.add(goods);
					id++;
					/*唤醒阻塞的消费者线程*/
					empty.signal();
				}
				catch(InterruptedException e){
					/*什么都不做*/
				}
				finally{
					lock.unlock();
				}
				/*释放锁后随机延时一段时间*/
				try {
					Thread.sleep(rnd.nextint(100));
				}
				catch (InterruptedException e) {
					/*什么都不做*/
				}
			}
		}
	}
	/*=================main==================*/
	public static void main(String[] args) throws InterruptedException{
		ProducterComsumerDemo2 pcd = new ProducterComsumerDemo2();
		Runnable c = pcd.new ComsumeThread();
		Runnable p = pcd.new ProductThread();
		/*两个生产者线程,两个消费者线程*/
		new Thread(p).start();
		new Thread(p).start();
		new Thread(c).start();
		new Thread(c).start();
	}
}

运行结果

queue is empty
queue is empty
name: B,  price:2,  id: 1
name: C,  price:3,  id: 2
name: A,  price:1,  id: 3
queue is full
name: B,  price:2,  id: 4
name: C,  price:3,  id: 5
queue is full
name: A,  price:1,  id: 6
name: B,  price:2,  id: 7
name: C,  price:3,  id: 8
name: A,  price:1,  id: 9
name: B,  price:2,  id: 10
name: C,  price:3,  id: 11
name: A,  price:1,  id: 12
name: B,  price:2,  id: 13
name: C,  price:3,  id: 14
……

1.3.2 更优雅的实现方式

下面使用线程池(ThreadPool)和阻塞队列(LinkedBlockingQueue)原子类(AtomicInteger)以更加优雅的方式实现上述功能。LinkedBlockingQueue阻塞队列仅在take和put方法上锁,所以id必须定义为原子类。

package demo;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
/*使用线程对象,多个缓存位置(有界),多生产者,多消费者,无限循环模式*/
public class ProducterComsumerDemo4 {
	/*最大缓存商品数*/
	private final int MAX_SLOT = 3;
	/*定义缓存商品的容器*/
	private LinkedBlockingQueue<Goods> queue = new LinkedBlockingQueue<Goods>(MAX_SLOT);
	/*商品的id编号,生产者制造的每个商品的id都不一样,每生产一个id自增1*/
	private AtomicInteger id = new AtomicInteger(1);
	/*随机产生一个sleep时间*/
	private Random rnd = new Random();
	/*=================定义消费者线程==================*/
	public class ComsumeThread implements Runnable{
		@Override
		    public void run(){
			while(true){
				try {
					/*随机延时一段时间*/
					Thread.sleep(rnd.nextint(200));
					/*模拟消费商品*/
					Goods goods = queue.take();
					System.out.println(goods);
					/*随机延时一段时间*/
					Thread.sleep(rnd.nextint(200));
				}
				catch (InterruptedException e) {
					/*什么都不做*/
				}
			}
		}
	}
	/*=================定义生产者线程==================*/
	public class ProductThread implements Runnable{
		@Override
		    public void run(){
			while(true){
				try{
					int x = id.getAndIncrement();
					Goods goods = null;
					Thread.sleep(rnd.nextint(200));
					/*根据序号产生不同的商品*/
					switch(x%3){
						case 0 : goods = new Goods("A", 1, x);
						break;
						case 1 : goods = new Goods("B", 2, x);
						break;
						case 2 : goods = new Goods("C", 3, x);
						break;
					}
					Thread.sleep(rnd.nextint(200));
					queue.put(goods);
					Thread.sleep(rnd.nextint(100));
				}
				catch(InterruptedException e){
					/*什么都不做*/
				}
			}
		}
	}
	/*=================main==================*/
	public static void main(String[] args) throws InterruptedException{
		ProducterComsumerDemo4 pcd = new ProducterComsumerDemo4();
		Runnable c = pcd.new ComsumeThread();
		Runnable p = pcd.new ProductThread();
		/*定义线程池*/
		ExecutorService es = Executors.newCachedThreadPool();
		/*三个生产者线程,两个消费者线程*/
		es.execute(p);
		es.execute(p);
		es.execute(p);
		es.execute(c);
		es.execute(c);
		es.shutdown();
	}
}

2.有限商品个数

这个问题显然比上面的问题要复杂不少,原因在于要保证缓存区的商品要全部消费掉,没有重复消费商品,没有覆盖商品,同时还要保证所有线程能够正常结束,防止存在一直阻塞的线程。

2.1使用线程对象,多个缓存位置(有界),多生产者,多消费者

思路定义一下三个变量

/*需要生产的总商品数*/
  private final int TOTAL_NUM = 30;

  /*已产生的数量*/
  private volatile int productNum = 0;

  /*已消耗的商品数*/
  private volatile int comsumedNum = 0;

每生产一个商品 productNum 自增1,直到TOTAL_NUM为止,如果不满足条件 productNum < TOTAL_NUM 则结束进程,自增操作必须在full.await()方法调用之前,防止生产者线程无法唤醒。

同理,每消费一个商品 comsumedNum 自增1,直到TOTAL_NUM为止,如果不满足条件 comsumedNum < TOTAL_NUM 则结束进程,自增操作必须在empty.await()方法调用之前,防止消费者线程无法唤醒。

comsumedNum和productNum相当于计划经济时代的粮票一样,有了它能够保证生产者线程在唤醒后一定需要生产一个商品,消费者线程在唤醒以后一定能够消费一个商品

package demo;
import java.util.LinkedList;
import java.util.Random;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/*使用线程对象,多个缓存位置(有界),多生产者,多消费者, 有限商品个数*/
public class ProducterComsumerDemo3 {
	/*需要生产的总商品数*/
	private final int TOTAL_NUM = 30;
	/*已产生的数量*/
	private volatile int productNum = 0;
	/*已消耗的商品数*/
	private volatile int comsumedNum = 0;
	/*最大缓存商品数*/
	private final int MAX_SLOT = 2;
	/*定义线程公用的锁和条件*/
	private Lock lock = new ReentrantLock();
	private Condition full = lock.newCondition();
	private Condition empty = lock.newCondition();
	/*定义缓存商品的容器*/
	private LinkedList<Goods> queue = new LinkedList<Goods>();
	/*商品的id编号,生产者制造的每个商品的id都不一样,每生产一个id自增1*/
	private int id = 1;
	/*随机产生一个sleep时间*/
	private Random rnd = new Random();
	/*=================定义消费者线程==================*/
	public class ComsumeThread implements Runnable{
		@Override
		    public void run(){
			while(true){
				/*加锁, id、comsumedNum 操作都在同步代码块中*/
				lock.lock();
				try {
					/*随机延时一段时间*/
					Thread.sleep(rnd.nextint(250));
					if(comsumedNum < TOTAL_NUM){
						comsumedNum++;
					} else{
						/*这里会自动执行finally的语句,释放锁*/
						break;
					}
					while(queue.isEmpty()){
						System.out.println("queue is empty");
						empty.await();
					}
					/*随机延时一段时间*/
					Thread.sleep(rnd.nextint(250));
					/*模拟消费商品*/
					Goods goods = queue.remove();
					System.out.println(goods);
					/*随机延时一段时间*/
					Thread.sleep(rnd.nextint(250));
					/*唤醒阻塞的生产者线程*/
					full.signal();
				}
				catch (InterruptedException e) {
				}
				finally{
					lock.unlock();
				}
				/*释放锁后,随机延时一段时间*/
				try {
					Thread.sleep(rnd.nextint(250));
				}
				catch (InterruptedException e) {
				}
			}
			System.out.println(
			          "customer "
			          + Thread.currentThread().getName()
			          + " is over");
		}
	}
	/*=================定义生产者线程==================*/
	public class ProductThread implements Runnable{
		@Override
		    public void run(){
			while(true){
				lock.lock();
				try{
					/*随机延时一段时间*/
					Thread.sleep(rnd.nextint(250));
					if(productNum < TOTAL_NUM){
						productNum++;
					} else{
						/*这里会自动执行finally的语句,释放锁*/
						break;
					}
					Thread.sleep(rnd.nextint(250));
					while(queue.size() == MAX_SLOT){
						System.out.println("queue is full");
						full.await();
					}
					Thread.sleep(rnd.nextint(250));
					Goods goods = null;
					/*根据序号产生不同的商品*/
					switch(id%3){
						case 0 : goods = new Goods("A", 1, id);
						break;
						case 1 : goods = new Goods("B", 2, id);
						break;
						case 2 : goods = new Goods("C", 3, id);
						break;
					}
					queue.add(goods);
					id++;
					/*唤醒阻塞的消费者线程*/
					empty.signal();
				}
				catch(InterruptedException e){
				}
				finally{
					lock.unlock();
				}
				/*释放锁后,随机延时一段时间*/
				try {
					Thread.sleep(rnd.nextint(250));
				}
				catch (InterruptedException e) {
					/*什么都不做*/
				}
			}
			System.out.println(
			          "producter "
			          + Thread.currentThread().getName()
			          + " is over");
		}
	}
	/*=================main==================*/
	public static void main(String[] args) throws InterruptedException{
		ProducterComsumerDemo3 pcd = new ProducterComsumerDemo3();
		ComsumeThread c = pcd.new ComsumeThread();
		ProductThread p = pcd.new ProductThread();
		new Thread(p).start();
		new Thread(p).start();
		new Thread(p).start();
		new Thread(c).start();
		new Thread(c).start();
		new Thread(c).start();
		System.out.println("main Thread is over");
	}
}

2.2利用线程池,原子类,阻塞队列,以更优雅的方式实现

LinkedBlockingQueue阻塞队列仅在take和put方法上锁,所以productNum和comsumedNum必须定义为原子类。

package demo;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
/*使用线程池,多个缓存位置(有界),多生产者,多消费者, 有限商品个数*/
public class LinkedBlockingQueueDemo {
	/*需要生产的总商品数*/
	private final int TOTAL_NUM = 20;
	/*已产生商品的数量*/
	volatile AtomicInteger productNum = new AtomicInteger(0);
	/*已消耗的商品数*/
	volatile AtomicInteger comsumedNum = new AtomicInteger(0);
	/*最大缓存商品数*/
	private final int MAX_SLOT = 5;
	/*同步阻塞队列,队列容量为MAX_SLOT*/
	private LinkedBlockingQueue<Goods> lbq = new LinkedBlockingQueue<Goods>(MAX_SLOT);
	/*随机数*/
	private Random rnd = new Random();
	/*pn表示产品的编号,产品编号从1开始*/
	private volatile AtomicInteger pn = new AtomicInteger(1);
	/*=================定义消费者线程==================*/
	public class CustomerThread implements Runnable{
		@Override
		    public void run(){
			while(comsumedNum.getAndIncrement() < TOTAL_NUM){
				try{
					/*随机延时一段时间*/
					Thread.sleep(rnd.nextint(500));
					/*从队列中取出商品,队列空时发生阻塞*/
					Goods goods = lbq.take();
					/*随机延时一段时间*/
					Thread.sleep(rnd.nextint(500));
					/*模拟消耗商品*/
					System.out.println(goods);
					/*随机延时一段时间*/
					Thread.sleep(rnd.nextint(500));
				}
				catch(InterruptedException e){
				}
			}
			System.out.println(
			          "customer "
			          + Thread.currentThread().getName()
			          + " is over");
		}
	}
	/*=================定义生产者线程==================*/
	public class ProducerThread implements Runnable{
		@Override
		    public void run(){
			while(productNum.getAndIncrement() < TOTAL_NUM){
				try {
					int x = pn.getAndIncrement();
					Goods goods = null;
					/*根据序号产生不同的商品*/
					switch(x%3){
						case 0 : goods = new Goods("A", 1, x);
						break;
						case 1 : goods = new Goods("B", 2, x);
						break;
						case 2 : goods = new Goods("C", 3, x);
						break;
					}
					/*随机延时一段时间*/
					Thread.sleep(rnd.nextint(500));
					/*产生的新产品入列,队列满时发生阻塞*/
					lbq.put(goods);
					/*随机延时一段时间*/
					Thread.sleep(rnd.nextint(500));
				}
				catch (InterruptedException e1) {
					/*什么都不做*/
				}
			}
			System.out.println(
			          "producter "
			          + Thread.currentThread().getName()
			          + " is over ");
		}
	}
	/*=================main==================*/
	public static void main(String[] args){
		LinkedBlockingQueueDemo lbqd = new LinkedBlockingQueueDemo();
		Runnable c = lbqd.new CustomerThread();
		Runnable p = lbqd.new ProducerThread();
		ExecutorService es = Executors.newCachedThreadPool();
		es.execute(c);
		es.execute(c);
		es.execute(c);
		es.execute(p);
		es.execute(p);
		es.execute(p);
		es.shutdown();
		System.out.println("main Thread is over");
	}
}

总结

以上就是本文关于Java多线程中不同条件下编写生产消费者模型方法介绍的全部内容,希望对大家有所帮助。感兴趣的朋友可以继续参阅本站:

Java设计模式之代理模式原理及实现代码分享

快速理解Java设计模式中的组合模式

Java设计模式之访问者模式使用场景及代码示例

如有不足之处,欢迎留言指出。

(0)

相关推荐

  • Java多线程ForkJoinPool实例详解

    引言 java 7提供了另外一个很有用的线程池框架,Fork/Join框架 理论 Fork/Join框架主要有以下两个类组成. * ForkJoinPool 这个类实现了ExecutorService接口和工作窃取算法(Work-Stealing Algorithm).它管理工作者线程,并提供任务的状态信息,以及任务的执行信息 * ForkJoinTask 这个类是一个将在ForkJoinPool执行的任务的基类. Fork/Join框架提供了在一个任务里执行fork()和join()操作的机制

  • 详解Java中多线程异常捕获Runnable的实现

    详解Java中多线程异常捕获Runnable的实现 1.背景: Java 多线程异常不向主线程抛,自己处理,外部捕获不了异常.所以要实现主线程对子线程异常的捕获. 2.工具: 实现Runnable接口的LayerInitTask类,ThreadException类,线程安全的Vector 3.思路: 向LayerInitTask中传入Vector,记录异常情况,外部遍历,判断,抛出异常. 4.代码: package step5.exception; import java.util.Vector

  • Java利用future及时获取多线程运行结果

    Future接口是Java标准API的一部分,在java.util.concurrent包中.Future接口是Java线程Future模式的实现,可以来进行异步计算. 有了Future就可以进行三段式的编程了,1.启动多线程任务2.处理其他事3.收集多线程任务结果.从而实现了非阻塞的任务调用.在途中遇到一个问题,那就是虽然能异步获取结果,但是Future的结果需要通过isdone来判断是否有结果,或者使用get()函数来阻塞式获取执行结果.这样就不能实时跟踪其他线程的结果状态了,所以直接使用g

  • Java多线程之死锁的出现和解决方法

    什么是死锁? 死锁是这样一种情形:多个线程同时被阻塞,它们中的一个或者全部都在等待某个资源被释放.由于线程被无限期地阻塞,因此程序不能正常运行.形象的说就是:一个宝藏需要两把钥匙来打开,同时间正好来了两个人,他们一人一把钥匙,但是双方都再等着对方能交出钥匙来打开宝藏,谁都没释放自己的那把钥匙.就这样这俩人一直僵持下去,直到开发人员发现这个局面. 导致死锁的根源在于不适当地运用"synchronized"关键词来管理线程对特定对象的访问."synchronized"关

  • java 多线程的同步几种方法

    java 多线程的同步几种方法 一.引言 前几天面试,被大师虐残了,好多基础知识必须得重新拿起来啊.闲话不多说,进入正题. 二.为什么要线程同步 因为当我们有多个线程要同时访问一个变量或对象时,如果这些线程中既有读又有写操作时,就会导致变量值或对象的状态出现混乱,从而导致程序异常.举个例子,如果一个银行账户同时被两个线程操作,一个取100块,一个存钱100块.假设账户原本有0块,如果取钱线程和存钱线程同时发生,会出现什么结果呢?取钱不成功,账户余额是100.取钱成功了,账户余额是0.那到底是哪个

  • RxJava2.x+ReTrofit2.x多线程下载文件的示例代码

    写在前面: 接到公司需求:要做一个apk升级的功能,原理其实很简单,百度也一大堆例子,可大部分都是用框架,要么就是HttpURLConnection,实在是不想这么干.正好看了两天的RxJava2.x+ReTrofit2.x,据说这俩框架是目前最火的异步请求框架了.固本文使用RxJava2.x+ReTrofit2.x实现多线程下载文件的功能. 如果对RxJava2.x+ReTrofit2.x不太了解的请先去看相关的文档. 大神至此请无视. 思路分析: 思路及其简洁明了,主要分为以下四步 1.获取

  • Java多线程中不同条件下编写生产消费者模型方法介绍

    简介: 生产者.消费者模型是多线程编程的常见问题,最简单的一个生产者.一个消费者线程模型大多数人都能够写出来,但是一旦条件发生变化,我们就很容易掉进多线程的bug中.这篇文章主要讲解了生产者和消费者的数量,商品缓存位置数量,商品数量等多个条件的不同组合下,写出正确的生产者消费者模型的方法. 欢迎探讨,如有错误敬请指正 生产消费者模型 生产者消费者模型具体来讲,就是在一个系统中,存在生产者和消费者两种角色,他们通过内存缓冲区进行通信,生产者生产消费者需要的资料,消费者把资料做成产品.生产消费者模式

  • Java多线程Queue、BlockingQueue和使用BlockingQueue实现生产消费者模型方法解析

    Queue是什么 队列,是一种数据结构.除了优先级队列和LIFO队列外,队列都是以FIFO(先进先出)的方式对各个元素进行排序的.无论使用哪种排序方式,队列的头都是调用remove()或poll()移除元素的.在FIFO队列中,所有新元素都插入队列的末尾. Queue中的方法 Queue中的方法不难理解,6个,每2对是一个也就是总共3对.看一下JDKAPI就知道了: 注意一点就好,Queue通常不允许插入Null,尽管某些实现(比如LinkedList)是允许的,但是也不建议. Blocking

  • Java多线程中Lock锁的使用总结

    多核时代 摩尔定律告诉我们:当价格不变时,集成电路上可容纳的晶体管数目,约每隔18个月便会增加一倍,性能也将提升一倍.换言之,每一美元所能买到的电脑性能,将每隔18个月翻两倍以上.然而最近摩尔定律似乎遇到了麻烦,目前微处理器的集成度似乎到了极限,在目前的制造工艺和体系架构下很难再提高单个处理器的速度了,否则它就被烧坏了.所以现在的芯片制造商改变了策略,转而在一个电路板上集成更多的处理器,也就是我们现在常见的多核处理器. 这就给软件行业带来麻烦(也可以说带来机会,比如说就业机会,呵呵).原来的情况

  • Java多线程中Lock锁的使用小结

    Lock基本使用 Lock它是java.util.concurrent.locks下的一个接口,它也是用来处理线程同步问题的. public interface Lock { void lock(); void lockInterruptibly() throws InterruptedException; boolean tryLock(); boolean tryLock(long time, TimeUnit unit) throws InterruptedException; void

  • Java多线程中的Balking模式详解

    目录 1.场景 2.详细说明 3.Balking模式的本质:停止并返回 源代码如下: 总结 1.场景 自动保存功能: 为防止电脑死机,而定期将数据内容保存到文件中的功能. 2.详细说明 当数据内容被修改时,内容才会被保存.即当写入的内容与上次写入的内容一致时,其实就没有必要执行写入操作.也就是说,以”数据内容是否一致”作为守护条件.若数据内容相同,则不执行写入操作,直接返回. 3.Balking模式的本质:停止并返回 如果现在不合适执行该操作,或者没有必要执行该操作,就停止处理,直接返回—-Ba

  • Java AQS中ReentrantLock条件锁的使用

    目录 一.什么是AQS 1.定义 2.特性 3.属性 4.资源共享方式 5.两种队列 6.队列节点状态 7.实现方法 二.等待队列 1.同步等待队列 2.条件等待队列 三.condition接口 四.ReentrantLock 五.源码解析 一.什么是AQS 1.定义 java.util.concurrent包中的大多数同步器实现都是围绕着共同的基础行为,比如等待队列.条件队列.独占获取.共享获取等,而这些行为的抽象就是基于AbstractQueuedSynchronizer(简称AQS)实现的

  • Java多线程中ReentrantLock与Condition详解

    一.ReentrantLock类 1.1什么是reentrantlock java.util.concurrent.lock中的Lock框架是锁定的一个抽象,它允许把锁定的实现作为Java类,而不是作为语言的特性来实现.这就为Lock的多种实现留下了空间,各种实现可能有不同的调度算法.性能特性或者锁定语义.ReentrantLock类实现了Lock,它拥有与synchronized相同的并发性和内存语义,但是添加了类似锁投票.定时锁等候和可中断锁等候的一些特性.此外,它还提供了在激烈争用情况下更

  • Java多线程中线程的两种创建方式及比较代码示例

    1.线程的概念:线程(thread)是指一个任务从头至尾的执行流,线程提供一个运行任务的机制,对于java而言,一个程序中可以并发的执行多个线程,这些线程可以在多处理器系统上同时运行.当程序作为一个应用程序运行时,java解释器为main()方法启动一个线程. 2.并行与并发: (1)并发:在单处理器系统中,多个线程共享CPU时间,而操作系统负责调度及分配资源给它们. (2)并行:在多处理器系统中,多个处理器可以同时运行多个线程,这些线程在同一时间可以同时运行,而不同于并发,只能多个线程共享CP

  • Java多线程中的wait/notify通信模式实例详解

    前言 最近在看一些JUC下的源码,更加意识到想要学好Java多线程,基础是关键,比如想要学好ReentranLock源码,就得掌握好AQS源码,而AQS源码中又有很多Java多线程经典的一些应用:再比如看了线程池的核心源码实现,又学到了很多核心实现,其实这些都可以提出来慢慢消化并变成自己的知识点,今天这个Java等待/通知模式其实是Thread.join()实现的关键,还有线程池工作线程中线程跟线程之间的通信的核心所在,故在此为了加深理解,做此记录! 参考资料<Java并发编程艺术>(电子PD

  • Java多线程中的单例模式两种实现方式

    Java多线程中的单例模式 一.在多线程环境下创建单例 方式一: package com.ietree.multithread.sync; public class Singletion { private static class InnerSingletion { private static Singletion single = new Singletion(); } public static Singletion getInstance() { return InnerSinglet

随机推荐