Java多线程Queue、BlockingQueue和使用BlockingQueue实现生产消费者模型方法解析

Queue是什么

队列,是一种数据结构。除了优先级队列和LIFO队列外,队列都是以FIFO(先进先出)的方式对各个元素进行排序的。无论使用哪种排序方式,队列的头都是调用remove()或poll()移除元素的。在FIFO队列中,所有新元素都插入队列的末尾。

Queue中的方法

Queue中的方法不难理解,6个,每2对是一个也就是总共3对。看一下JDKAPI就知道了:

注意一点就好,Queue通常不允许插入Null,尽管某些实现(比如LinkedList)是允许的,但是也不建议。

BlockingQueue

1、BlockingQueue概述

BlockingQueue也是java.util.concurrent下的主要用来控制线程同步的工具。

BlockingQueue有四个具体的实现类,根据不同需求,选择不同的实现类

1、ArrayBlockingQueue:一个由数组支持的有界阻塞队列,规定大小的BlockingQueue,其构造函数必须带一个int参数来指明其大小.其所含的对象是以FIFO(先入先出)顺序排序的。

2、LinkedBlockingQueue:大小不定的BlockingQueue,若其构造函数带一个规定大小的参数,生成的BlockingQueue有大小限制,若不带大小参数,所生成的BlockingQueue的大小由Integer.MAX_VALUE来决定.其所含的对象是以FIFO(先入先出)顺序排序的。

3、PriorityBlockingQueue:类似于LinkedBlockQueue,但其所含对象的排序不是FIFO,而是依据对象的自然排序顺序或者是构造函数的Comparator决定的顺序。

4、SynchronousQueue:特殊的BlockingQueue,对其的操作必须是放和取交替完成的。

LinkedBlockingQueue可以指定容量,也可以不指定,不指定的话,默认最大是Integer.MAX_VALUE,其中主要用到put和take方法,put方法在队列满的时候会阻塞直到有队列成员被消费,take方法在队列空的时候会阻塞,直到有队列成员被放进来。

讲BlockingQueue,因为BlockingQueue是Queue中的一个重点,并且通过BlockingQueue我们再次加深对于生产者/消费者模型的理解。其他的Queue都不难,通过查看JDKAPI和简单阅读源码完全可以理解他们的作用。

BlockingQueue,顾名思义,阻塞队列。BlockingQueue是在java.util.concurrent下的,因此不难理解,BlockingQueue是为了解决多线程中数据高效安全传输而提出的。

多线程中,很多场景都可以使用队列实现,比如经典的生产者/消费者模型,通过队列可以便利地实现两者之间数据的共享,定义一个生产者线程,定义一个消费者线程,通过队列共享数据就可以了。

当然现实不可能都是理想的,比如消费者消费速度比生产者生产的速度要快,那么消费者消费到一定程度上的时候,必须要暂停等待一下了(使消费者线程处于WAITING状态)。BlockingQueue的提出,就是为了解决这个问题的,他不用程序员去控制这些细节,同时还要兼顾效率和线程安全。

阻塞队列所谓的"阻塞",指的是某些情况下线程会挂起(即阻塞),一旦条件满足,被挂起的线程又会自动唤醒。使用BlockingQueue,不需要关心什么时候需要阻塞线程,什么时候需要唤醒线程,这些内容BlockingQueue都已经做好了

2、BlockingQueue中的方法

BlockingQueue既然是Queue的子接口,必然有Queue中的方法,上面已经列了。看一下BlockingQueue中特有的方法:

(1)voidput(Ee)throwsInterruptedException

把e添加进BlockingQueue中,如果BlockingQueue中没有空间,则调用线程被阻塞,进入等待状态,直到BlockingQueue中有空间再继续

(2)voidtake()throwsInterruptedException

取走BlockingQueue里面排在首位的对象,如果BlockingQueue为空,则调用线程被阻塞,进入等待状态,直到BlockingQueue有新的数据被加入

(3)intdrainTo(Collection<?superE>c,intmaxElements)

一次性取走BlockingQueue中的数据到c中,可以指定取的个数。通过该方法可以提升获取数据效率,不需要多次分批加锁或释放锁

3、ArrayBlockingQueue

基于数组的阻塞队列,必须指定队列大小。比较简单。ArrayBlockingQueue中只有一个ReentrantLock对象,这意味着生产者和消费者无法并行运行(见下面的代码)。另外,创建ArrayBlockingQueue时,可以指定ReentrantLock是否为公平锁,默认采用非公平锁。

/** Main lock guarding all access */
private final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;

4、LinkedBlockingQueue

基于链表的阻塞队列,和ArrayBlockingQueue差不多。不过LinkedBlockingQueue如果不指定队列容量大小,会默认一个类似无限大小的容量,之所以说是类似是因为这个无限大小是Integer.MAX_VALUE,这么说就好理解ArrayBlockingQueue为什么必须要制定大小了,如果ArrayBlockingQueue不指定大小的话就用Integer.MAX_VALUE,那将造成大量的空间浪费,但是基于链表实现就不一样的,一个一个节点连起来而已。另外,LinkedBlockingQueue生产者和消费者都有自己的锁(见下面的代码),这意味着生产者和消费者可以"同时"运行。

/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();

5、SynchronousQueue

比较特殊,一种没有缓冲的等待队列。什么叫做没有缓冲区,ArrayBlocking中有:

/** The queued items */
private final E[] items;

数组用以存储队列。LinkedBlockingQueue中有:

/**
 * Linked list node class
 */
static class Node<E> {
  /** The item, volatile to ensure barrier separating write and read */
  volatile E item;
  Node<E> next;
  Node(E x) { item = x; }
}

将队列以链表形式连接。

生产者/消费者操作数据实际上都是通过这两个"中介"来操作数据的,但是SynchronousQueue则是生产者直接把数据给消费者(消费者直接从生产者这里拿数据),好像又回到了没有生产者/消费者模型的老办法了。换句话说,每一个插入操作必须等待一个线程对应的移除操作。SynchronousQueue又有两种模式:

1、公平模式

采用公平锁,并配合一个FIFO队列(Queue)来管理多余的生产者和消费者

2、非公平模式

采用非公平锁,并配合一个LIFO栈(Stack)来管理多余的生产者和消费者,这也是SynchronousQueue默认的模式

利用BlockingQueue实现生产者消费者模型

上一篇我们写的生产者消费者模型有局限,局限体现在:

缓冲区内只能存放一个数据,实际生产者/消费者模型中的缓冲区内可以存放大量生产者生产出来的数据
生产者和消费者处理数据的速度几乎一样
OK,我们就用BlockingQueue来简单写一个例子,并且让生产者、消费者处理数据速度不同。子类选择的是ArrayBlockingQueue,大小定为10:

public static void main(String[] args)
{
  final BlockingQueue<String> bq = new ArrayBlockingQueue<String>(10);
  Runnable producerRunnable = new Runnable()
  {
    int i = 0;
    public void run()
    {
      while (true)
      {
        try
        {
          System.out.println("我生产了一个" + i++);
          bq.put(i + "");
          Thread.sleep(1000);
        }
        catch (InterruptedException e)
        {
          e.printStackTrace();
        }
      }
    }
  };
  Runnable customerRunnable = new Runnable()
  {
    public void run()
    {
      while (true)
      {
        try
        {
          System.out.println("我消费了一个" + bq.take());
          Thread.sleep(3000);
        }
        catch (InterruptedException e)
        {
          e.printStackTrace();
        }
      }
    }
  };
  Thread producerThread = new Thread(producerRunnable);
  Thread customerThread = new Thread(customerRunnable);
  producerThread.start();
  customerThread.start();
}

代码的做法是让生产者生产速度快于消费者消费速度的,看一下运行结果:

我生产了一个0
我消费了一个1
我生产了一个1
我生产了一个2
我消费了一个2
我生产了一个3
我生产了一个4
我生产了一个5
我消费了一个3
我生产了一个6
我生产了一个7
我生产了一个8
我消费了一个4
我生产了一个9
我生产了一个10
我生产了一个11
我消费了一个5
我生产了一个12
我生产了一个13
我生产了一个14
我消费了一个6
我生产了一个15
我生产了一个16
我消费了一个7
我生产了一个17
我消费了一个8
我生产了一个18

分两部分来看输出结果:

1、第1行~第23行。这块BlockingQueue未满,所以生产者随便生产,消费者随便消费,基本上都是生产3个消费1个,消费者消费速度慢

2、第24行~第27行,从前面我们可以看出,生产到16,消费到6,说明到了ArrayBlockingQueue的极限10了,这时候没办法,生产者生产一个ArrayBlockingQueue就满了,所以不能继续生产了,只有等到消费者消费完才可以继续生产。所以之后的打印内容一定是一个生产者、一个消费者

这就是前面一章开头说的"通过平衡生产者和消费者的处理能力来提高整体处理数据的速度",这给例子应该体现得很明显。另外,也不要担心非单一生产者/消费者场景下的系统假死问题,缓冲区空、缓冲区满的场景BlockingQueue都是定义了不同的Condition,所以不会唤醒自己的同类。

总结

以上就是本文关于Java多线程Queue、BlockingQueue和使用BlockingQueue实现生产消费者模型方法解析的全部内容,希望对大家有所帮助。感兴趣的朋友可以参阅本站:

浅谈Java多线程的优点及代码示例

浅谈Java多线程处理中Future的妙用(附源码)

Java利用future及时获取多线程运行结果

如有不足之处,欢迎留言指出。

您可能感兴趣的文章:

  • Java多线程中不同条件下编写生产消费者模型方法介绍
  • 生产消费者模式实现方式和线程安全问题代码示例
(0)

相关推荐

  • Java多线程中不同条件下编写生产消费者模型方法介绍

    简介: 生产者.消费者模型是多线程编程的常见问题,最简单的一个生产者.一个消费者线程模型大多数人都能够写出来,但是一旦条件发生变化,我们就很容易掉进多线程的bug中.这篇文章主要讲解了生产者和消费者的数量,商品缓存位置数量,商品数量等多个条件的不同组合下,写出正确的生产者消费者模型的方法. 欢迎探讨,如有错误敬请指正 生产消费者模型 生产者消费者模型具体来讲,就是在一个系统中,存在生产者和消费者两种角色,他们通过内存缓冲区进行通信,生产者生产消费者需要的资料,消费者把资料做成产品.生产消费者模式

  • 生产消费者模式实现方式和线程安全问题代码示例

    生产者消费者模式的几种实现方式 拿我们生活中的例子来说,工厂生产出来的产品总是要输出到外面使用的,这就是生产与消费的概念. 在我们实际的软件开发过程中,经常会碰到如下场景:某个模块负责产生数据,这些数据由另一个模块来负责处理(此处的模块是广义的,可以是类.函数.线程.进程等). 产生数据的模块,就形象地称为生产者;而处理数据的模块,就称为消费者. 第一种:采用wait-notify实现生产者消费者模式 1. 一生产者与一消费者: 2. 一生产者与多消费者: 第二种: 采用阻塞队列实现生产者消费者

  • Java多线程Queue、BlockingQueue和使用BlockingQueue实现生产消费者模型方法解析

    Queue是什么 队列,是一种数据结构.除了优先级队列和LIFO队列外,队列都是以FIFO(先进先出)的方式对各个元素进行排序的.无论使用哪种排序方式,队列的头都是调用remove()或poll()移除元素的.在FIFO队列中,所有新元素都插入队列的末尾. Queue中的方法 Queue中的方法不难理解,6个,每2对是一个也就是总共3对.看一下JDKAPI就知道了: 注意一点就好,Queue通常不允许插入Null,尽管某些实现(比如LinkedList)是允许的,但是也不建议. Blocking

  • java多线程编程之向线程传递数据的三种方法

    在传统的同步开发模式下,当我们调用一个函数时,通过这个函数的参数将数据传入,并通过这个函数的返回值来返回最终的计算结果.但在多线程的异步开发模式下,数据的传递和返回和同步开发模式有很大的区别.由于线程的运行和结束是不可预料的,因此,在传递和返回数据时就无法象函数一样通过函数参数和return语句来返回数据.本文就以上原因介绍了几种用于向线程传递数据的方法,在下一篇文章中将介绍从线程中返回数据的方法. 欲先取之,必先予之.一般在使用线程时都需要有一些初始化数据,然后线程利用这些数据进行加工处理,并

  • java多线程编程之从线程返回数据的两种方法

    一.通过类变量和方法返回数据 使用这种方法返回数据需要在调用start方法后才能通过类变量或方法得到数据.让我们先来看看例程2-13会得到什么结果. 复制代码 代码如下: package mythread; public class MyThread extends Thread{    private String value1;    private String value2; public void run()    {        value1 = "通过成员变量返回数据"

  • Java实现简易生产者消费者模型过程解析

    一.概述 一共两个线程,一个线程生产产品,一个线程消费产品,使用同步代码块方法,同步两个线程.当产品没有时,通知生产者生产,生产者生产后,通知消费者消费,并等待消费者消费完. 需要注意的是,有可能出现,停止生产产品后,消费者还没未来得及消费生产者生产的最后一个产品,就结束消费,导致最后一个产品没有被消费. 本例使用synchronize以及wait().notify()实现简易版的线程者消费者模型. 二.测试用例 这里的产品用笔来演示,每只笔都有其编号code 一共有四个类:分别是生产者类,产品

  • Java多线程atomic包介绍及使用方法

    引言 Java从JDK1.5开始提供了java.util.concurrent.atomic包,方便程序员在多线程环境下,无锁的进行原子操作.原子变量的底层使用了处理器提供的原子指令,但是不同的CPU架构可能提供的原子指令不一样,也有可能需要某种形式的内部锁,所以该方法不能绝对保证线程不被阻塞. Atomic包介绍 在Atomic包里一共有12个类,四种原子更新方式,分别是原子更新基本类型,原子更新数组,原子更新引用和原子更新字段.Atomic包里的类基本都是使用Unsafe实现的包装类. 原子

  • 一文精通Java 多线程之全方位解读

    目录 并行和并发 线程基础概念 线程和进程 多线程的好处 线程的状态 实现多线程的两种方式 继承Thread类 实现Runnable接口 线程的安全性和原子性 锁的概念和使用 生产消费者模型 生产消费者模型中的类–存储类 生产消费者模型中的类–生产者 生产消费者模型中的类–消费者 测试类 效果 volatile变量 线程池的概念和使用 并行和并发 并行:多个CPU实例或是多台机器同时执行一段处理逻辑,是真正的同时. 并发:一个CPU或一台机器,通过CPU调度算法,让用户看上去同时去执行,实际上从

  • Java多线程工具篇BlockingQueue的详解

    前言: 在新增的Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全"传输"数据的问题.通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利.本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各自的功能以及常见使用场景. 认识BlockingQueue 阻塞队列,顾名思义,首先它是一个队列,而一个队列在数据结构中通过一个共享的队列,可以使得数据由队列的一端输入,从另外一端输出: 常用的队列主要有以下两种:(当

  • Java多线程 BlockingQueue实现生产者消费者模型详解

    BlockingQueue BlockingQueue.解决了多线程中,如何高效安全"传输"数据的问题.程序员无需关心什么时候阻塞线程,什么时候唤醒线程,该唤醒哪个线程. 方法介绍 BlockingQueue是Queue的子类 void put(E e) 插入指定元素,当BlockingQueue为满,则线程阻塞,进入Waiting状态,直到BlockingQueue有空闲空间再继续. 这里以ArrayBlockingQueue为例进行分析 void take() 队首出队,当Bloc

  • Java并发编程之阻塞队列(BlockingQueue)详解

    目录 队列 阻塞队列 ArrayBlockingQueue 重要属性 构造方法 添加元素 add(e) offer(e) put(e) offer(e,time,unit) 移除元素 take() dequeue() LinkedBlockingQueue 重要属性 构造方法 添加元素 offer(e) put(e) 移除元素 poll() take() 对比 总结 大家好,我是小黑,一个在互联网苟且偷生的农民工. 队列 学过数据结构的同学应该都知道,队列是数据结构中一种特殊的线性表结构,和平时

随机推荐