Java实现生产者消费者问题与读者写者问题详解

1、生产者消费者问题

生产者消费者问题是研究多线程程序时绕不开的经典问题之一,它描述是有一块缓冲区作为仓库,生产者可以将产品放入仓库,消费者则可以从仓库中取走产品。解决生产者/消费者问题的方法可分为两类:(1)采用某种机制保护生产者和消费者之间的同步;(2)在生产者和消费者之间建立一个管道。第一种方式有较高的效率,并且易于实现,代码的可控制性较好,属于常用的模式。第二种管道缓冲区不易控制,被传输数据对象不易于封装等,实用性不强。

同步问题核心在于:如何保证同一资源被多个线程并发访问时的完整性。常用的同步方法是采用信号或加锁机制,保证资源在任意时刻至多被一个线程访问。Java语言在多线程编程上实现了完全对象化,提供了对同步机制的良好支持。在Java中一共有五种方法支持同步,其中前四个是同步方法,一个是管道方法。

wait() / notify()方法
await() / signal()方法
BlockingQueue阻塞队列方法
Semaphore方法
PipedInputStream / PipedOutputStream

1.1 wait() / notify()方法

wait() / nofity()方法是基类Object的两个方法,也就意味着所有Java类都会拥有这两个方法,这样,我们就可以为任何对象实现同步机制。

wait()方法:当缓冲区已满/空时,生产者/消费者线程停止自己的执行,放弃锁,使自己处于等等状态,让其他线程执行。

notify()方法:当生产者/消费者向缓冲区放入/取出一个产品时,向其他等待的线程发出可执行的通知,同时放弃锁,使自己处于等待状态。

各起了4个生产者,4个消费者

package test;
public class Hosee {
	private static Integer count = 0;
	private final Integer FULL = 10;
	private static String LOCK = "LOCK";
	class Producer implements Runnable {
		@Override public void run() {
			for (int i = 0; i < 10; i++) {
				try {
					Thread.sleep(3000);
				}
				catch (Exception e) {
					e.printStackTrace();
				}
				synchronized (LOCK) {
					while (count == FULL) {
						try {
							LOCK.wait();
						}
						catch (Exception e) {
							e.printStackTrace();
						}
					}
					count++;
					System.out.println(Thread.currentThread().getName() + "生产者生产,目前总共有" + count);
					LOCK.notifyAll();
				}
			}
		}
	}
	class Consumer implements Runnable {
		@Override public void run() {
			for (int i = 0; i < 10; i++) {
				try {
					Thread.sleep(3000);
				}
				catch (InterruptedException e1) {
					e1.printStackTrace();
				}
				synchronized (LOCK) {
					while (count == 0) {
						try {
							LOCK.wait();
						}
						catch (Exception e) {
							TODO: handle exception e.printStackTrace();
						}
					}
					count--;
					System.out.println(Thread.currentThread().getName() + "消费者消费,目前总共有" + count);
					LOCK.notifyAll();
				}
			}
		}
	}
	public static void main(String[] args) throws Exception {
		Hosee hosee = new Hosee();
		new Thread(hosee.new Producer()).start();
		new Thread(hosee.new Consumer()).start();
		new Thread(hosee.new Producer()).start();
		new Thread(hosee.new Consumer()).start();
		new Thread(hosee.new Producer()).start();
		new Thread(hosee.new Consumer()).start();
		new Thread(hosee.new Producer()).start();
		new Thread(hosee.new Consumer()).start();
	}
}

(需要注意的是,用什么加锁就用什么notify和wait,实例中使用的是LOCK)

部分打印结果:

由于生产者和消费者说明一致,所以最多都是在2左右,当减少一个消费者时,则会加到10。

1.2 await() / signal()方法

首先,我们先来看看await()/signal()与wait()/notify()的区别:

wait()和notify()必须在synchronized的代码块中使用 因为只有在获取当前对象的锁时才能进行这两个操作 否则会报异常 而await()和signal()一般与Lock()配合使用。
wait是Object的方法,而await只有部分类有,如Condition。
await()/signal()和新引入的锁定机制Lock直接挂钩,具有更大的灵活性。
那么为什么有了synchronized还要提出Lock呢?

1.2.1 对synchronized的改进

synchronized并不完美,它有一些功能性的限制 —— 它无法中断一个正在等候获得锁的线程,也无法通过投票得到锁,如果不想等下去,也就没法得到锁。同步还要求锁的释放只能在与获得锁所在的堆栈帧相同的堆栈帧中进行,多数情况下,这没问题(而且与异常处理交互得很好),但是,确实存在一些非块结构的锁定更合适的情况。

1.2.2 ReentrantLock 类

java.util.concurrent.lock 中的 Lock 框架是锁定的一个抽象,它允许把锁定的实现作为 Java 类,而不是作为语言的特性来实现(更加面向对象)。这就为 Lock 的多种实现留下了空间,各种实现可能有不同的调度算法、性能特性或者锁定语义。 ReentrantLock 类实现了 Lock ,它拥有与 synchronized 相同的并发性和内存语义,但是添加了类似锁投票、定时锁等候和可中断锁等候的一些特性。此外,它还提供了在激烈争用情况下更佳的性能。(换句话说,当许多线程都想访问共享资源时,JVM 可以花更少的时候来调度线程,把更多时间用在执行线程上。)

reentrant 锁意味着什么呢?简单来说,它有一个与锁相关的获取计数器,如果拥有锁的某个线程再次得到锁,那么获取计数器就加1,然后锁需要被释放两次才能获得真正释放(重入锁)。这模仿了 synchronized 的语义;如果线程进入由线程已经拥有的监控器保护的 synchronized 块,就允许线程继续进行,当线程退出第二个(或者后续) synchronized 块的时候,不释放锁,只有线程退出它进入的监控器保护的第一个synchronized 块时,才释放锁。

简单解释下重入锁:

public class Child extends Father implements Runnable{
	final static Child child = new Child();//为了保证锁唯一
	public static void main(String[] args) {
		for (int i = 0; i < 50; i++) {
			new Thread(child).start();
		}
	}
	public synchronized void doSomething() {
		System.out.println("1child.doSomething()");
		doAnotherThing(); // 调用自己类中其他的synchronized方法
	}
	private synchronized void doAnotherThing() {
		super.doSomething(); // 调用父类的synchronized方法
		System.out.println("3child.doAnotherThing()");
	}
	@Override
	public void run() {
		child.doSomething();
	}
}
class Father {
	public synchronized void doSomething() {
		System.out.println("2father.doSomething()");
	}
}

上述代码的锁都是child对象,当执行child.doSomething时,该线程获得child对象的锁,在doSomething方法内执行doAnotherThing时再次请求child对象的锁,因为synchronized是重入锁,所以可以得到该锁,继续在doAnotherThing里执行父类的doSomething方法时第三次请求child对象的锁,同理可得到,如果不是重入锁的话,那这后面这两次请求锁将会被一直阻塞,从而导致死锁。

在查看下面代码示例时,可以看到 Lock 和 synchronized 有一点明显的区别 —— lock 必须在 finally 块中释放。否则,如果受保护的代码将抛出异常,锁就有可能永远得不到释放!这一点区别看起来可能没什么,但是实际上,它极为重要。忘记在 finally 块中释放锁,可能会在程序中留下一个定时炸弹,当有一天炸弹爆炸时,您要花费很大力气才有找到源头在哪。而使用同步,JVM 将确保锁会获得自动释放。

Lock lock = new ReentrantLock();
lock.lock();
try {
 // update object state
}
finally {
 lock.unlock();
}

除此之外,与目前的 synchronized 实现相比,争用下的 ReentrantLock 实现更具可伸缩性。(在未来的 JVM 版本中,synchronized 的争用性能很有可能会获得提高。)这意味着当许多线程都在争用同一个锁时,使用 ReentrantLock 的总体开支通常要比 synchronized 少得多。

1.2.3 什么时候选择用 ReentrantLock 代替 synchronized

在 Java1.5 中,synchronized 是性能低效的。因为这是一个重量级操作,需要调用操作接口,导致有可能加锁消耗的系统时间比加锁以外的操作还多。相比之下使用 Java 提供的 Lock 对象,性能更高一些。但是到了 Java1.6,发生了变化。synchronized 在语义上很清晰,可以进行很多优化,有适应自旋,锁消除,锁粗化,轻量级锁,偏向锁等等。导致在 Java1.6 上 synchronized 的性能并不比 Lock 差。官方也表示,他们也更支持 synchronized,在未来的版本中还有优化余地。

所以在确实需要一些 synchronized 所没有的特性的时候,比如时间锁等候、可中断锁等候、无块结构锁、多个条件变量或者锁投票使用ReentrantLock。ReentrantLock 还具有可伸缩性的好处,应当在高度争用的情况下使用它,但是请记住,大多数 synchronized 块几乎从来没有出现过争用,所以可以把高度争用放在一边。我建议用 synchronized 开发,直到确实证明 synchronized 不合适,而不要仅仅是假设如果使用 ReentrantLock “性能会更好”。请记住,这些是供高级用户使用的高级工具。(而且,真正的高级用户喜欢选择能够找到的最简单工具,直到他们认为简单的工具不适用为止。)。一如既往,首先要把事情做好,然后再考虑是不是有必要做得更快。

1.2.4 接下来我们使用ReentrantLock来实现生产者消费者问题

package test;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class Hosee {
	private static Integer count = 0;
	private final Integer FULL = 10;
	final Lock lock = new ReentrantLock();
	final Condition NotFull = lock.newCondition();
	final Condition NotEmpty = lock.newCondition();
	class Producer implements Runnable {
		@Override
		public void run() {
			for (int i = 0; i < 10; i++) {
				try {
					Thread.sleep(3000);
				} catch (Exception e) {
					e.printStackTrace();
				}
				lock.lock();
				try {
					while (count == FULL) {
						try {
							NotFull.await();
						} catch (InterruptedException e) {
							// TODO Auto-generated catch block
							e.printStackTrace();
						}
					}
					count++;
					System.out.println(Thread.currentThread().getName()
							+ "生产者生产,目前总共有" + count);
					NotEmpty.signal();
				} finally {
					lock.unlock();
				}
			}
		}
	}
	class Consumer implements Runnable {
		@Override
		public void run() {
			for (int i = 0; i < 10; i++) {
				try {
					Thread.sleep(3000);
				} catch (InterruptedException e1) {
					e1.printStackTrace();
				}
				lock.lock();
				try {
					while (count == 0) {
						try {
							NotEmpty.await();
						} catch (Exception e) {
							// TODO: handle exception
							e.printStackTrace();
						}
					}
					count--;
					System.out.println(Thread.currentThread().getName()
							+ "消费者消费,目前总共有" + count);
					NotFull.signal();
				} finally {
					lock.unlock();
				}
			}
		}
	}
	public static void main(String[] args) throws Exception {
		Hosee hosee = new Hosee();
		new Thread(hosee.new Producer()).start();
		new Thread(hosee.new Consumer()).start();
		new Thread(hosee.new Producer()).start();
		new Thread(hosee.new Consumer()).start();
		new Thread(hosee.new Producer()).start();
		new Thread(hosee.new Consumer()).start();
		new Thread(hosee.new Producer()).start();
		new Thread(hosee.new Consumer()).start();
	}
}

运行结果与第一个类似。上述代码用了两个Condition,其实用一个也是可以的,只不过要signalall()。

1.3 BlockingQueue阻塞队列方法

BlockingQueue是JDK5.0的新增内容,它是一个已经在内部实现了同步的队列,实现方式采用的是我们第2种await() / signal()方法。它可以在生成对象时指定容量大小。它用于阻塞操作的是put()和take()方法。

put()方法:类似于我们上面的生产者线程,容量达到最大时,自动阻塞。

take()方法:类似于我们上面的消费者线程,容量为0时,自动阻塞。

package test;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class Hosee {
	private static Integer count = 0;
	final BlockingQueue<Integer> bq = new ArrayBlockingQueue<Integer>(10);
	class Producer implements Runnable {
		@Override
		public void run() {
			for (int i = 0; i < 10; i++) {
				try {
					Thread.sleep(3000);
				} catch (Exception e) {
					e.printStackTrace();
				}
				try {
					bq.put(1);
					count++;
					System.out.println(Thread.currentThread().getName()
							+ "生产者生产,目前总共有" + count);
				} catch (InterruptedException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
		}
	}
	class Consumer implements Runnable {
		@Override
		public void run() {
			for (int i = 0; i < 10; i++) {
				try {
					Thread.sleep(3000);
				} catch (InterruptedException e1) {
					e1.printStackTrace();
				}
				try {
					bq.take();
					count--;
					System.out.println(Thread.currentThread().getName()
							+ "消费者消费,目前总共有" + count);
				} catch (Exception e) {
					// TODO: handle exception
					e.printStackTrace();
				}
			}
		}
	}
	public static void main(String[] args) throws Exception {
		Hosee hosee = new Hosee();
		new Thread(hosee.new Producer()).start();
		new Thread(hosee.new Consumer()).start();
		new Thread(hosee.new Producer()).start();
		new Thread(hosee.new Consumer()).start();
		new Thread(hosee.new Producer()).start();
		new Thread(hosee.new Consumer()).start();
		new Thread(hosee.new Producer()).start();
		new Thread(hosee.new Consumer()).start();
	}
}

其实这个BlockingQueue比较难用代码来演示,因为put()与take()方法无法与输出语句保证同步,当然你可以自己去实现 BlockingQueue(BlockingQueue是用await()/signal() 实现的)。所以在输出结果上你会发现不匹配。

例如:当缓冲区已满,生产者在put()操作时,put()内部调用了await()方法,放弃了线程的执行,然后消费者线程执行,调用take()方法,take()内部调用了signal()方法,通知生产者线程可以执行,致使在消费者的println()还没运行的情况下生产者的println()先被执行,所以有了输出不匹配的情况。

对于BlockingQueue大家可以放心使用,这可不是它的问题,只是在它和别的对象之间的同步有问题。

1.4 Semaphore方法

Semaphore 信号量,就是一个允许实现设置好的令牌。也许有1个,也许有10个或更多。
谁拿到令牌(acquire)就可以去执行了,如果没有令牌则需要等待。
执行完毕,一定要归还(release)令牌,否则令牌会被很快用光,别的线程就无法获得令牌而执行下去了。

package test;
import java.util.concurrent.Semaphore;
public class Hosee
{
	int count = 0;
	final Semaphore notFull = new Semaphore(10);
	final Semaphore notEmpty = new Semaphore(0);
	final Semaphore mutex = new Semaphore(1);
	class Producer implements Runnable
	{
		@Override
		public void run()
		{
			for (int i = 0; i < 10; i++)
			{
				try
				{
					Thread.sleep(3000);
				}
				catch (Exception e)
				{
					e.printStackTrace();
				}
				try
				{
					notFull.acquire();//顺序不能颠倒,否则会造成死锁。
					mutex.acquire();
					count++;
					System.out.println(Thread.currentThread().getName()
							+ "生产者生产,目前总共有" + count);
				}
				catch (Exception e)
				{
					e.printStackTrace();
				}
				finally
				{
					mutex.release();
					notEmpty.release();
				}
			}
		}
	}
	class Consumer implements Runnable
	{
		@Override
		public void run()
		{
			for (int i = 0; i < 10; i++)
			{
				try
				{
					Thread.sleep(3000);
				}
				catch (InterruptedException e1)
				{
					e1.printStackTrace();
				}
				try
				{
					notEmpty.acquire();//顺序不能颠倒,否则会造成死锁。
					mutex.acquire();
					count--;
					System.out.println(Thread.currentThread().getName()
							+ "消费者消费,目前总共有" + count);
				}
				catch (Exception e)
				{
					e.printStackTrace();
				}
				finally
				{
					mutex.release();
					notFull.release();
				}
			}
		}
	}
	public static void main(String[] args) throws Exception
	{
		Hosee hosee = new Hosee();
		new Thread(hosee.new Producer()).start();
		new Thread(hosee.new Consumer()).start();
		new Thread(hosee.new Producer()).start();
		new Thread(hosee.new Consumer()).start();
		new Thread(hosee.new Producer()).start();
		new Thread(hosee.new Consumer()).start();
		new Thread(hosee.new Producer()).start();
		new Thread(hosee.new Consumer()).start();
	}
}

注意notFull.acquire()与mutex.acquire()的位置不能互换,如果先得到互斥锁再发生等待,会造成死锁。

1.5 PipedInputStream / PipedOutputStream

这个类位于java.io包中,是解决同步问题的最简单的办法,一个线程将数据写入管道,另一个线程从管道读取数据,这样便构成了一种生产者/消费者的缓冲区编程模式。PipedInputStream/PipedOutputStream只能用于多线程模式,用于单线程下可能会引发死锁。

package test;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
public class Hosee {
	final PipedInputStream pis = new PipedInputStream();
	final PipedOutputStream pos = new PipedOutputStream();
	{
		try {
			pis.connect(pos);
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	class Producer implements Runnable {
		@Override
		public void run() {
			try{
        while(true){
          int b = (int) (Math.random() * 255);
          System.out.println("Producer: a byte, the value is " + b);
          pos.write(b);
          pos.flush();
        }
      }catch(Exception e){
        e.printStackTrace();
      }finally{
        try{
          pos.close();
          pis.close();
        }catch(IOException e){
          System.out.println(e);
        }
      }
		}
	}
	class Consumer implements Runnable {
		@Override
		public void run() {
			try{
        while(true){
          int b = pis.read();
          System.out.println("Consumer: a byte, the value is " + String.valueOf(b));
        }
      }catch(Exception e){
        e.printStackTrace();
      }finally{
        try{
          pos.close();
          pis.close();
        }catch(IOException e){
          System.out.println(e);
        }
      }
		}
	}
	public static void main(String[] args) throws Exception {
		Hosee hosee = new Hosee();
		new Thread(hosee.new Producer()).start();
		new Thread(hosee.new Consumer()).start();
	}
}

与阻塞队列一样,由于read()/write()方法与输出方法不一定同步,输出结果方面会发生不匹配现象,为了使结果更加明显,这里只有1个消费者和1个生产者。

2、读者写者问题

读者—写者问题(Readers-Writers problem)也是一个经典的并发程序设计问题,是经常出现的一种同步问题。计算机系统中的数据(文件、记录)常被多个进程共享,但其中某些进程可能只要求读数据(称为读者Reader);另一些进程则要求修改数据(称为写者Writer)。就共享数据而言,Reader和Writer是两组并发进程共享一组数据区,要求:

(1)允许多个读者同时执行读操作;

(2)不允许读者、写者同时操作;

(3)不允许多个写者同时操作。

Reader和Writer的同步问题分为读者优先、弱写者优先(公平竞争)和强写者优先三种情况,它们的处理方式不同。

首先我们都只考虑公平竞争的情况下,看看Java有哪些方法可以实现读者写者问题

2.1 读写锁

ReentrantReadWriteLock会使用两把锁来解决问题,一个读锁,一个写锁
线程进入读锁的前提条件:
没有其他线程的写锁,
没有写请求或者有写请求,但调用线程和持有锁的线程是同一个
线程进入写锁的前提条件:
没有其他线程的读锁
没有其他线程的写锁

到ReentrantReadWriteLock,首先要做的是与ReentrantLock划清界限。它和后者都是单独的实现,彼此之间没有继承或实现的关系。然后就是总结这个锁机制的特性了:

重入(在上文ReentrantLock处已经介绍了)方面其内部的WriteLock可以获取ReadLock,但是反过来ReadLock想要获得WriteLock则永远都不要想。

WriteLock可以降级为ReadLock,顺序是:先获得WriteLock再获得ReadLock,然后释放WriteLock,这时候线程将保持Readlock的持有。反过来ReadLock想要升级为WriteLock则不可能,为什么?参看(1),呵呵.

ReadLock可以被多个线程持有并且在作用时排斥任何的WriteLock,而WriteLock则是完全的互斥。这一特性最为重要,因为对于高读取频率而相对较低写入的数据结构,使用此类锁同步机制则可以提高并发量。

不管是ReadLock还是WriteLock都支持Interrupt,语义与ReentrantLock一致。

WriteLock支持Condition并且与ReentrantLock语义一致,而ReadLock则不能使用Condition,否则抛出UnsupportedOperationException异常。

看下ReentrantReadWriteLock这个类的两个构造函数

public ReentrantReadWriteLock() {
    this(false);
  }
  /**
   * Creates a new {@code ReentrantReadWriteLock} with
   * the given fairness policy.
   *
   * @param fair {@code true} if this lock should use a fair ordering policy
   */
  public ReentrantReadWriteLock(boolean fair) {
    sync = (fair)? new FairSync() : new NonfairSync();
    readerLock = new ReadLock(this);
    writerLock = new WriteLock(this);
  }

fair这个参数表示是否是创建一个公平的读写锁,还是非公平的读写锁。也就是抢占式还是非抢占式。

公平和非公平:公平表示获取的锁的顺序是按照线程加锁的顺序来分配获取到锁的线程时最先加锁的线程,是按照FIFO的顺序来分配锁的;非公平表示获取锁的顺序是无需的,后来加锁的线程可能先获得锁,这种情况就导致某些线程可能一直没获取到锁。

公平锁为啥会影响性能,从code上来看看公平锁仅仅是多了一项检查是否在队首会影响性能,如不是,那么又是在什么地方影响的?假如是闯入的线程,会排在队尾并睡觉(parking)等待前任节点唤醒,这样势必会比非公平锁添加很多paking和unparking的操作

一般的应用场景是: 如果有多个读线程,一个写线程,而且写线程在操作的时候需要阻塞读线程,那么此时就需要使用公平锁,要不然可能写线程一直获取不到锁,导致线程饿死。

再简单说下锁降级

重入还允许从写入锁降级为读取锁,其实现方式是:先获取写入锁,然后获取读取锁,最后释放写入锁。但是,从读取锁升级到写入锁是不可能的。

rwl.readLock().lock();
   if (!cacheValid) {
     // Must release read lock before acquiring write lock
     rwl.readLock().unlock();
     rwl.writeLock().lock();

     if (!cacheValid) {
      data = ...
      cacheValid = true;
     }

     rwl.readLock().lock();
     rwl.writeLock().unlock(); // 降级:先获取读锁再释放写锁
   }

下面我们用读写锁来实现读者写者问题

import java.util.Random;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ReadWriteLockTest {
	public static void main(String[] args) {
		final Queue3 q3 = new Queue3();
		for (int i = 0; i < 3; i++) {
			new Thread() {
				public void run() {
					while (true) {
						q3.get();
					}
				}
			}.start();
		}
		for (int i = 0; i < 3; i++) {
			new Thread() {
				public void run() {
					while (true) {
						q3.put(new Random().nextInt(10000));
					}
				}
			}.start();
		}
	}
}
class Queue3 {
	private Object data = null;// 共享数据,只能有一个线程能写该数据,但可以有多个线程同时读该数据。
	private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
	public void get() {
		rwl.readLock().lock();// 上读锁,其他线程只能读不能写
		System.out.println(Thread.currentThread().getName()
				+ " be ready to read data!");
		try {
			Thread.sleep((long) (Math.random() * 1000));
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		System.out.println(Thread.currentThread().getName()
				+ "have read data :" + data);
		rwl.readLock().unlock(); // 释放读锁,最好放在finnaly里面
	}
	public void put(Object data) {
		rwl.writeLock().lock();// 上写锁,不允许其他线程读也不允许写
		System.out.println(Thread.currentThread().getName()
				+ " be ready to write data!");
		try {
			Thread.sleep((long) (Math.random() * 1000));
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		this.data = data;
		System.out.println(Thread.currentThread().getName()
				+ " have write data: " + data);
		rwl.writeLock().unlock();// 释放写锁
	}
}

运行结果:

Thread-0 be ready to read data!
Thread-1 be ready to read data!
Thread-2 be ready to read data!
Thread-0have read data :null
Thread-2have read data :null
Thread-1have read data :null
Thread-5 be ready to write data!
Thread-5 have write data: 6934
Thread-5 be ready to write data!
Thread-5 have write data: 8987
Thread-5 be ready to write data!
Thread-5 have write data: 8496

2.2 Semaphore信号量

在1.4中已经介绍了用信号量来实现生产者消费者问题,现在我们将用信号量来实现读者写者问题,信号量的相关知识不再重复,直接看代码

package test;
import java.util.Random;
import java.util.concurrent.Semaphore;
public class ReadWrite
{
	public static void main(String[] args)
	{
		final Queue3 q3 = new Queue3();
		for (int i = 0; i < 3; i++)
		{
			new Thread()
			{
				public void run()
				{
					while (true)
					{
						try
						{
							Thread.sleep((long) (Math.random() * 1000));
						}
						catch (InterruptedException e)
						{
							e.printStackTrace();
						}
						q3.get();
					}
				}
			}.start();
		}
		for (int i = 0; i < 3; i++)
		{
			new Thread()
			{
				public void run()
				{
					while (true)
					{
						try
						{
							Thread.sleep((long) (Math.random() * 1000));
						}
						catch (InterruptedException e)
						{
							e.printStackTrace();
						}
						q3.put(new Random().nextInt(10000));
					}
				}
			}.start();
		}
	}
}
class Queue3
{
	private Object data = null;// 共享数据,只能有一个线程能写该数据,但可以有多个线程同时读该数据。
	private Semaphore wmutex = new Semaphore(1);
	private Semaphore rmutex = new Semaphore(2);
	private int count = 0;
	public void get()
	{
		try
		{
			rmutex.acquire();
			if (count == 0)
				wmutex.acquire();// 当第一读进程欲读数据库时,阻止写进程写
			count++;
			System.out.println(Thread.currentThread().getName()
					+ " be ready to read data!");
			try
			{
				Thread.sleep((long) (Math.random() * 1000));
			}
			catch (InterruptedException e)
			{
				e.printStackTrace();
			}
			System.out.println(Thread.currentThread().getName()
					+ "have read data :" + data);
			count--;
			if (count == 0)
				wmutex.release();
			rmutex.release();
		}
		catch (Exception e)
		{
			e.printStackTrace();
		}
	}
	public void put(Object data)
	{
		try
		{
			wmutex.acquire();
			System.out.println(Thread.currentThread().getName()
					+ " be ready to write data!");
			try
			{
				Thread.sleep((long) (Math.random() * 1000));
			}
			catch (InterruptedException e)
			{
				e.printStackTrace();
			}
			this.data = data;
			System.out.println(Thread.currentThread().getName()
					+ " have write data: " + data);
		}
		catch (Exception e)
		{
			e.printStackTrace();
		}
		finally
		{
			wmutex.release();
		}
	}
}

单纯使用信号量不能解决读者与写者问题,必须引入计数器count(可以用CountDownLatch代替 )对读进程计数; count与wmutex结合使用,使读读能同时进行,读写排斥。count为0时表示读进程开始,此时写进程阻塞(wmutex被读进程获取),当count不为0时,表示有多个读进程,就不用操作 wmutex了,因为第一个读进程已经获得了 wmutex。count表示有多少个读进程在读,每次有一个就+1,读完了-1,当count==0时,表示读进程都结束了。此时 wmutex释放,写进程才有机会获得wmutex。为了使读进程不要一直占有 wmutex,最好让读进程sleep一下,让写进程有机会获得wmutex,使效果更明显。

总结:

以上就是本文关于Java实现生产者消费者问题与读者写者问题详解的全部内容,希望对大家有所帮助。感兴趣的朋友可以继续参阅本站:java并发学习之BlockingQueue实现生产者消费者详解、Java多线程之线程通信生产者消费者模式及等待唤醒机制代码详解等,有什么问题可以随时留言,小编会及时回复大家的。感谢朋友们对本站的支持!

(0)

相关推荐

  • JAVA多线程实现生产者消费者的实例详解

    JAVA多线程实现生产者消费者的实例详解 下面的代码实现了生产者消费者的问题 Product.Java package consumerProducer; public class Product { private String id; public String getId() { return id; } public void setId(String id) { this.id = id; } public Product(String id) { this.id=id; } publ

  • java多线程解决生产者消费者问题

    本文实例讲述了java多线程解决生产者消费者问题的方法.分享给大家供大家参考.具体分析如下: 题目是这样的: 采用Java 多线程技术,设计实现一个符合生产者和消费者问题的程序.对一个对象(枪膛)进行操作,其最大容量是12颗子弹.生产者线程是一个压入线程,它不断向枪膛中压入子弹:消费者线程是一个射出线程,它不断从枪膛中射出子弹. 要求: (1)给出分析过程说明. (2)程序输出,要模拟体现对枪膛的压入和射出操作: (2)设计程序时应考虑到两个线程的同步问题. 这个和著名的生产者消费者问题几乎是一

  • java并发学习之BlockingQueue实现生产者消费者详解

    1.介绍 阻塞队列 (BlockingQueue)是Java util.concurrent包下重要的数据结构,BlockingQueue提供了线程安全的队列访问方式:当阻塞队列进行插入数据时,如果队列已满,线程将会阻塞等待直到队列非满:从阻塞队列取数据时,如果队列已空,线程将会阻塞等待直到队列非空.并发包下很多高级同步类的实现都是基于BlockingQueue实现的. JDK7提供了以下7个阻塞队列: ArrayBlockingQueue :由数组结构组成的有界阻塞队列. LinkedBloc

  • java 中多线程生产者消费者问题详细介绍

    java 中多线程生产者消费者问题 前言: 一般面试喜欢问些线程的问题,较基础的问题无非就是死锁,生产者消费者问题,线程同步等等,在前面的文章有写过死锁,这里就说下多生产多消费的问题了 import java.util.concurrent.locks.*; class BoundedBuffer { final Lock lock = new ReentrantLock();//对象锁 final Condition notFull = lock.newCondition(); //生产者监视

  • java解决单缓冲生产者消费者问题示例

    经典的生产者消费者问题模拟.此程序模拟最简单情形--单缓冲.为模拟实际情况,consume item和produce item时加了延时,可以通过修改延时模拟不同的生成消费速率. [code] [/co/** * single buffer consumer-producer problem. * by xu(xusiwei1236@163.com). * */public class ConsumerProducer { static Object buffer = null; static

  • JAVA生产者消费者(线程同步)代码学习示例

    一.问题描述 生产者消费者问题是一个典型的线程同步问题.生产者生产商品放到容器中,容器有一定的容量(只能顺序放,先放后拿),消费者消费商品,当容器满了后,生产者等待,当容器为空时,消费者等待.当生产者将商品放入容器后,通知消费者:当消费者拿走商品后,通知生产者. 二.解决方案 对容器资源加锁,当取得锁后,才能对互斥资源进行操作. 复制代码 代码如下: public class ProducerConsumerTest { public static void main(String []args

  • Java实现生产者消费者问题与读者写者问题详解

    1.生产者消费者问题 生产者消费者问题是研究多线程程序时绕不开的经典问题之一,它描述是有一块缓冲区作为仓库,生产者可以将产品放入仓库,消费者则可以从仓库中取走产品.解决生产者/消费者问题的方法可分为两类:(1)采用某种机制保护生产者和消费者之间的同步:(2)在生产者和消费者之间建立一个管道.第一种方式有较高的效率,并且易于实现,代码的可控制性较好,属于常用的模式.第二种管道缓冲区不易控制,被传输数据对象不易于封装等,实用性不强. 同步问题核心在于:如何保证同一资源被多个线程并发访问时的完整性.常

  • Java多线程之线程通信生产者消费者模式及等待唤醒机制代码详解

    前言 前面的例子都是多个线程在做相同的操作,比如4个线程都对共享数据做tickets–操作.大多情况下,程序中需要不同的线程做不同的事,比如一个线程对共享变量做tickets++操作,另一个线程对共享变量做tickets–操作,这就是大名鼎鼎的生产者和消费者模式. 正文 一,生产者-消费者模式也是多线程 生产者和消费者模式也是多线程的范例.所以其编程需要遵循多线程的规矩. 首先,既然是多线程,就必然要使用同步.上回说到,synchronized关键字在修饰函数的时候,使用的是"this"

  • Java中生产者消费者问题总结

    生产者-消费者算是并发编程中常见的问题.依靠缓冲区我们可以实现生产者与消费者之间的解耦.生产者只管往缓冲区里面放东西,消费者只管往缓冲区里面拿东西.这样我们避免生产者想要交付数据给消费者,但消费者此时还无法接受数据这样的情况发生. wait notify 这个问题其实就是线程间的通讯,所以要注意的是不能同时读写.生产者在缓冲区满的时候不生产,等待:消费者在缓冲区为空的时候不消费,等待.比较经典的做法是wait和notify. 生产者线程执行15次set操作 public class Produc

  • Java编程生产者消费者实现的四种方法

    目录 实现生产者消费者的四种方式 一.最基础的 二.java.util.concurrent.lock 中的 Lock 框架 三.阻塞队列BlockingQueue的实现 Blockqueue 接口的一些方法 四.信号量 Semaphore 的实现 实现生产者消费者的四种方式 一.最基础的 利用 wait() 和 notify() 方法实现,当缓冲区满或为空时都调用 wait() 方法等待,当生产者生产了一个产品或消费者消费了一个产品后会唤醒所有线程: package com.practice;

  • java中生产者消费者问题和代码案例

    目录 应用场景 分析 解决方法 管程法 信号灯法 总结 应用场景 假设仓库中只能存放一件产品,生产者将生产出来的产品放入仓库,消费者将仓库中产品取走消费 如果仓库中没有产品,则生产者将产品放入仓库,否则停止生产并等待,直到仓库中的产品被消费者取走为止 如果仓库中放有产品,则消费者可以将产品取走消费,否则停止消费并等待,直到仓库中再次放入产品为止 分析 这是一个线程同步问题,生产者和消费者共享同一个资源,并且生产者和消费者之间相互依赖,互为条件. 对于生产者,没有生产产品之前,要通知消费者等待,而

  • Java page cache回写机制案例详解

    JAVA写文件的基本流程 在不使用堆外内存的情况下,java在写文件时,先将字节写入JVM的堆内内存中:然后调用jvm的写文件函数,将字节写入jvm的堆外内存中,jvm再调用系统内核的写文件函数,将字节写入内核的heap中:然后内核将字节写入page cache中,将page cache状态改为dirty,根据page cache的回写机制在合适的时机将字节写入磁盘. page cache 自动回写机制 page cache的回写时机由系统配置/etc/sysctl.conf 中的几个参数决定,

  • java开发Dubbo负载均衡与集群容错示例详解

    目录 负载均衡与集群容错 Invoker 服务目录 RegistryDirectory 获取Invoker列表 监听注册中心 刷新Invoker列表 StaticDirectory 服务路由 Cluster FailoverClusterInvoker FailfastClusterInvoker FailsafeClusterInvoker FailbackClusterInvoker ForkingClusterInvoker BroadcastClusterInvoker Abstract

  • Java 设计模式之责任链模式及异步责任链详解

    目录 一.定义 二.普通责任链模式 三.异步责任链模式 一.定义 责任链模式(Chain of Responsibility Pattern):避免将一个请求的发送者与接受者耦合在一起,让多个对象都有机会处理请求.将接受请求的对象连接成一条链,并且沿着这条链传递请求,直到有一个对象能够处理它为止. 在很多源码都有涉及,如Mybatis拦截器.Filter- 责任链模式属于行为型模式. 二.普通责任链模式 抽象处理类:AbstractProcessor /** * 抽象处理类 */ public

  • 使用Java构造和解析Json数据的两种方法(详解二)

    JSON(JavaScript Object Notation) 是一种轻量级的数据交换格式,采用完全独立于语言的文本格式,是理想的数据交换格式.同时,JSON是 JavaScript 原生格式,这意味着在 JavaScript 中处理 JSON数据不须要任何特殊的 API 或工具包. 在www.json.org上公布了很多JAVA下的json构造和解析工具,其中org.json和json-lib比较简单,两者使用上差不多但还是有些区别.下面接着介绍用org.json构造和解析Json数据的方法

  • java 可重启线程及线程池类的设计(详解)

    了解JAVA多线程编程的人都知道,要产生一个线程有两种方法,一是类直接继承Thread类并实现其run()方法:二是类实现Runnable接口并实现其run()方法,然后新建一个以该类为构造方法参数的Thread,类似于如下形式: Thread t=new Thread(myRunnable).而最终使线程启动都是执行Thread类的start()方法. 在JAVA中,一个线程一旦运行完毕,即执行完其run()方法,就不可以重新启动了.此时这个线程对象也便成了无用对象,等待垃圾回收器的回收.下次

随机推荐