浅谈Java中的Queue家族

Queue接口

先看下Queue的继承关系和其中定义的方法:

Queue继承自Collection,Collection继承自Iterable。

Queue有三类主要的方法,我们用个表格来看一下他们的区别:

方法类型 方法名称 方法名称 区别
Insert add offer 两个方法都表示向Queue中添加某个元素,不同之处在于添加失败的情况,add只会返回true,如果添加失败,会抛出异常。offer在添加失败的时候会返回false。所以对那些有固定长度的Queue,优先使用offer方法。
Remove remove poll 如果Queue是空的情况下,remove会抛出异常,而poll会返回null。
Examine element peek 获取Queue头部的元素,但不从Queue中删除。两者的区别还是在于Queue为空的情况下,element会抛出异常,而peek返回null。

注意,因为对poll和peek来说null是有特殊含义的,所以一般来说Queue中禁止插入null,但是在实现中还是有一些类允许插入null比如LinkedList。

尽管如此,我们在使用中还是要避免插入null元素。

Queue的分类

一般来说Queue可以分为BlockingQueue,Deque和TransferQueue三种。

BlockingQueue

BlockingQueue是Queue的一种实现,它提供了两种额外的功能:

当当前Queue是空的时候,从BlockingQueue中获取元素的操作会被阻塞。当当前Queue达到最大容量的时候,插入BlockingQueue的操作会被阻塞。

BlockingQueue的操作可以分为下面四类:

操作类型Throws exceptionSpecial valueBlocksTimes outInsertadd(e)offer(e)put(e)offer(e, time, unit)Removeremove()poll()take()poll(time, unit)Examineelement()peek()not applicablenot applicable

第一类是会抛出异常的操作,当遇到插入失败,队列为空的时候抛出异常。

第二类是不会抛出异常的操作。

第三类是会Block的操作。当Queue为空或者达到最大容量的时候。

第四类是time out的操作,在给定的时间里会Block,超时会直接返回。

BlockingQueue是线程安全的Queue,可以在生产者消费者模式的多线程中使用,如下所示:

class Producer implements Runnable {
   private final BlockingQueue queue;
   Producer(BlockingQueue q) { queue = q; }
   public void run() {
     try {
       while (true) { queue.put(produce()); }
     } catch (InterruptedException ex) { ... handle ...}
   }
   Object produce() { ... }
 }

 class Consumer implements Runnable {
   private final BlockingQueue queue;
   Consumer(BlockingQueue q) { queue = q; }
   public void run() {
     try {
       while (true) { consume(queue.take()); }
     } catch (InterruptedException ex) { ... handle ...}
   }
   void consume(Object x) { ... }
 }

 class Setup {
   void main() {
     BlockingQueue q = new SomeQueueImplementation();
     Producer p = new Producer(q);
     Consumer c1 = new Consumer(q);
     Consumer c2 = new Consumer(q);
     new Thread(p).start();
     new Thread(c1).start();
     new Thread(c2).start();
   }
 }

最后,在一个线程中向BlockQueue中插入元素之前的操作happens-before另外一个线程中从BlockQueue中删除或者获取的操作。

Deque

Deque是Queue的子类,它代表double ended queue,也就是说可以从Queue的头部或者尾部插入和删除元素。

同样的,我们也可以将Deque的方法用下面的表格来表示,Deque的方法可以分为对头部的操作和对尾部的操作:

方法类型 Throws exception Special value Throws exception Special value
Insert addFirst(e) offerFirst(e) addLast(e) offerLast(e)
Remove removeFirst() pollFirst() removeLast() pollLast()
Examine getFirst() peekFirst() getLast() peekLast()

和Queue的方法描述基本一致,这里就不多讲了。

当Deque以 FIFO (First-In-First-Out)的方法处理元素的时候,Deque就相当于一个Queue。

当Deque以LIFO (Last-In-First-Out)的方式处理元素的时候,Deque就相当于一个Stack。

TransferQueue

TransferQueue继承自BlockingQueue,为什么叫Transfer呢?因为TransferQueue提供了一个transfer的方法,生产者可以调用这个transfer方法,从而等待消费者调用take或者poll方法从Queue中拿取数据。

还提供了非阻塞和timeout版本的tryTransfer方法以供使用。

我们举个TransferQueue实现的生产者消费者的问题。

先定义一个生产者:

@Slf4j
@Data
@AllArgsConstructor
class Producer implements Runnable {
    private TransferQueue<String> transferQueue;

    private String name;

    private Integer messageCount;

    public static final AtomicInteger messageProduced = new AtomicInteger();

    @Override
    public void run() {
        for (int i = 0; i < messageCount; i++) {
            try {
                boolean added = transferQueue.tryTransfer( "第"+i+"个", 2000, TimeUnit.MILLISECONDS);
                log.info("transfered {} 是否成功: {}","第"+i+"个",added);
                if(added){
                    messageProduced.incrementAndGet();
                }
            } catch (InterruptedException e) {
                log.error(e.getMessage(),e);
            }
        }
        log.info("total transfered {}",messageProduced.get());
    }
}

在生产者的run方法中,我们调用了tryTransfer方法,等待2秒钟,如果没成功则直接返回。

再定义一个消费者:

@Slf4j
@Data
@AllArgsConstructor
public class Consumer implements Runnable {

    private TransferQueue<String> transferQueue;

    private String name;

    private int messageCount;

    public static final AtomicInteger messageConsumed = new AtomicInteger();

    @Override
    public void run() {
        for (int i = 0; i < messageCount; i++) {
            try {
                String element = transferQueue.take();
                log.info("take {}",element );
                messageConsumed.incrementAndGet();
                Thread.sleep(500);
            } catch (InterruptedException e) {
                log.error(e.getMessage(),e);
            }
        }
        log.info("total consumed {}",messageConsumed.get());
    }

}

在run方法中,调用了transferQueue.take方法来取消息。

下面先看一下一个生产者,零个消费者的情况:

@Test
public void testOneProduceZeroConsumer() throws InterruptedException {

    TransferQueue<String> transferQueue = new LinkedTransferQueue<>();
    ExecutorService exService = Executors.newFixedThreadPool(10);
    Producer producer = new Producer(transferQueue, "ProducerOne", 5);

    exService.execute(producer);

    exService.awaitTermination(50000, TimeUnit.MILLISECONDS);
    exService.shutdown();
}

输出结果:

[pool-1-thread-1] INFO com.flydean.Producer - transfered 第0个 是否成功: false

[pool-1-thread-1] INFO com.flydean.Producer - transfered 第1个 是否成功: false

[pool-1-thread-1] INFO com.flydean.Producer - transfered 第2个 是否成功: false

[pool-1-thread-1] INFO com.flydean.Producer - transfered 第3个 是否成功: false

[pool-1-thread-1] INFO com.flydean.Producer - transfered 第4个 是否成功: false

[pool-1-thread-1] INFO com.flydean.Producer - total transfered 0

可以看到,因为没有消费者,所以消息并没有发送成功。

再看下一个有消费者的情况:

@Test
public void testOneProduceOneConsumer() throws InterruptedException {

    TransferQueue<String> transferQueue = new LinkedTransferQueue<>();
    ExecutorService exService = Executors.newFixedThreadPool(10);
    Producer producer = new Producer(transferQueue, "ProducerOne", 2);
    Consumer consumer = new Consumer(transferQueue, "ConsumerOne", 2);

    exService.execute(producer);
    exService.execute(consumer);

    exService.awaitTermination(50000, TimeUnit.MILLISECONDS);
    exService.shutdown();
}

输出结果:

[pool-1-thread-2] INFO com.flydean.Consumer - take 第0个

[pool-1-thread-1] INFO com.flydean.Producer - transfered 第0个 是否成功: true

[pool-1-thread-2] INFO com.flydean.Consumer - take 第1个

[pool-1-thread-1] INFO com.flydean.Producer - transfered 第1个 是否成功: true

[pool-1-thread-1] INFO com.flydean.Producer - total transfered 2

[pool-1-thread-2] INFO com.flydean.Consumer - total consumed 2

可以看到Producer和Consumer是一个一个来生产和消费的。

以上就是浅谈Java中的Queue家族的详细内容,更多关于Java中的Queue家族的资料请关注我们其它相关文章!

(0)

相关推荐

  • Java优先队列(PriorityQueue)重写compare操作

    we can custom min heap or max heap by override the method compare. package myapp.kit.quickstart.utils; import java.util.Comparator; import java.util.Queue; /** * priority queue (heap) demo. * * @author huangdingsheng * @version 1.0, 2020/5/8 */ publi

  • Java PriorityQueue数据结构接口原理及用法

    PriorityQueue是从JDK1.5开始提供的新的数据结构接口,它是一种基于优先级堆的极大优先级队列.优先级队列是不同于先进先出队列的另一种队列.每次从队列中取出的是具有最高优先权的元素.如果不提供Comparator的话,优先队列中元素默认按自然顺序排列,也就是数字默认是小的在队列头,字符串则按字典序排列(参阅 Comparable),也可以根据 Comparator 来指定,这取决于使用哪种构造方法.优先级队列不允许 null 元素.依靠自然排序的优先级队列还不允许插入不可比较的对象(

  • 详解Java中的延时队列 DelayQueue

    当用户超时未支付时,给用户发提醒消息.另一种场景是,超时未付款,订单自动取消.通常,订单创建的时候可以向延迟队列种插入一条消息,到时间自动执行.其实,也可以用临时表,把这些未支付的订单放到一个临时表中,或者Redis,然后定时任务去扫描.这里我们用延时队列来做.RocketMQ有延时队列,RibbitMQ也可以实现,Java自带的也有延时队列,接下来就回顾一下各种队列. Queue 队列是一种集合.除了基本的集合操作以外,队列还提供了额外的插入.提取和检查操作.队列的每个方法都以两种形式存在:一

  • java中PriorityBlockingQueue的入队知识点总结

    在PriorityBlockingQueue中添加元素同样有四种方法,因为是树状的结构,所以在插入方法上也有所变化,是自下而上的操作过程.在入队的规则上有三个要点需要我们注意.鉴于PriorityBlockingQueue入队方法主要通过offer(E)实现,所以我们就这种方法作主要讲解. 1.入队规则 (1)默认的插入规则中,新加入的元素可能会破坏小顶堆的性质,因此需要进行调整. (2)调整的过程为:从尾部下标的位置开始,将加入的元素逐层与当前点的父节点的内容进行比较并交换,直到满足父节点内容

  • java队列之queue用法实例分析

    Queue: 基本上,一个队列就是一个先入先出(FIFO)的数据结构 Queue接口与List.Set同一级别,都是继承了Collection接口.LinkedList实现了Deque接 口. Queue的实现 1.没有实现的阻塞接口的LinkedList: 实现了java.util.Queue接口和java.util.AbstractQueue接口 内置的不阻塞队列: PriorityQueue 和 ConcurrentLinkedQueue PriorityQueue 和 Concurren

  • java ArrayBlockingQueue的方法及缺点分析

    提到ArrayBlockingQueue的方法,想必大家都不陌生,我们在入队和出队的时候,接触了不少这方面的函数.当然ArrayBlockingQueue中的方法也不止于此,今天我们就全部为大家罗列出来,也算是做一个学习方向上的指引.然后就其中的peek方法带来实例介绍,并说明ArrayBlockingQueue使用的不足之处. 1.ArrayBlockingQueue函数列表 // 创建一个带有给定的(固定)容量和默认访问策略的 ArrayBlockingQueue. ArrayBlockin

  • java中DelayQueue实例用法详解

    在阻塞队里中,除了对元素进行增加和删除外,我们可以把元素的删除做一个延迟的处理,即使用DelayQueue的方法.这里的删除需要一定的时间才能生效,有点类似于过期处理的理念.下面我们就DelayQueue的概念.特点进行讲解,然后在代码示例中体会DelayQueue的使用. 1.概念 是一个带有延迟时间的无界阻塞队列.队列中的元素,只有等延时时间到了,才能取出来.此队列一般用于过期数据的删除,或任务调度.以下,模拟一下定长时间的数据删除. 2.特点 (1)无边界设计 (2)添加(put)不阻塞,

  • java DelayQueue的原理浅析

    在对DelayQueue延迟功能的使用上,很多人不能后完全理解延迟的一些功能使用,这里我们深入来挖掘一下DelayQueue的原理. 下面将从构造方法.接口.继承体系三个方面进行分析,需要注意的是,相较于其它的阻塞队列,DelayQueue因为延迟的功能多了接口的使用,一起来看具体内容. 1.构造方法 public DelayQueue() {} public DelayQueue(Collection<? extends E> c) { this.addAll(c); } 构造方法比较简单,

  • 详解java中DelayQueue的使用

    简介 今天给大家介绍一下DelayQueue,DelayQueue是BlockingQueue的一种,所以它是线程安全的,DelayQueue的特点就是插入Queue中的数据可以按照自定义的delay时间进行排序.只有delay时间小于0的元素才能够被取出. DelayQueue 先看一下DelayQueue的定义: public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements Bloc

  • 浅谈Java中的Queue家族

    Queue接口 先看下Queue的继承关系和其中定义的方法: Queue继承自Collection,Collection继承自Iterable. Queue有三类主要的方法,我们用个表格来看一下他们的区别: 方法类型 方法名称 方法名称 区别 Insert add offer 两个方法都表示向Queue中添加某个元素,不同之处在于添加失败的情况,add只会返回true,如果添加失败,会抛出异常.offer在添加失败的时候会返回false.所以对那些有固定长度的Queue,优先使用offer方法.

  • 浅谈Java中常用数据结构的实现类 Collection和Map

    线性表,链表,哈希表是常用的数据结构,在进行Java开发时,JDK已经为我们提供了一系列相应的类来实现基本的数据结构.这些类均在java.util包中.本文试图通过简单的描述,向读者阐述各个类的作用以及如何正确使用这些类. Collection ├List │├LinkedList │├ArrayList │└Vector │ └Stack └Set Map ├Hashtable ├HashMap └WeakHashMap Collection接口 Collection是最基本的集合接口,一个C

  • 浅谈Java中Unicode的编码和实现

    Unicode的编码和实现 大概来说,Unicode编码系统可分为编码方式和实现方式两个层次. 编码方式 字符是抽象的最小文本单位.它没有固定的形状(可能是一个字形),而且没有值."A"是一个字符,"€"也是一个字符.字符集是字符的集合.编码字符集是一个字符集,它为每一个字符分配一个唯一数字. Unicode 最初设计是作为一种固定宽度的 16 位字符编码.也就是每个字符占用2个字节.这样理论上一共最多可以表示216(即65536)个字符.上述16位统一码字符构成基

  • 浅谈java中null是什么,以及使用中要注意的事项

    1.null既不是对象也不是一种类型,它仅是一种特殊的值,你可以将其赋予任何引用类型,你也可以将null转化成任何类型,例如: Integer i=null; Float f=null; String s=null; 但是不能把null赋值给基本类型,如int ,float,double等 int k=null ----------编译器会报错cannot convert from null to int 2.null是关键字,像public.static.final.它是大小写敏感的,你不能将

  • 浅谈java中math类中三种取整函数的区别

    math类中三大取整函数 1.ceil 2.floor 3.round 其实三种取整函数挺简单的.只要记住三个函数名翻译过来的汉语便能轻松理解三大函数,下面一一介绍 1.ceil,意思是天花板,java中叫做向上取整,大于等于该数字的最接近的整数 例: math.ceil(13.2)=14 math.ceil(-13.2)=-13 2.floor,意思是地板,java中叫做向下取整,小于等于该数字的最接近的整数 例: math.floor(13.2)=13 math.floor(-13.2)=-

  • 浅谈Java中Collection和Collections的区别

    1.java.util.Collection 是一个集合接口.它提供了对集合对象进行基本操作的通用接口方法.Collection接口在Java 类库中有很多具体的实现.Collection接口的意义是为各种具体的集合提供了最大化的统一操作方式. Collection ├List │├LinkedList │├ArrayList │└Vector │ └Stack └Set 2.java.util.Collections 是一个包装类.它包含有各种有关集合操作的静态多态方法.此类不能实例化,就像一

  • 浅谈Java中注解Annotation的定义、使用、解析

    此例子,用于说明如何在Java中对"注解 Annotation"的定义.使用和解析的操作.注解一般用于自定义开发框架中,至于为什么使用,此处不作过多说明,这里只说明如何使用,以作备记.下面例子已测试,可以正常运行通过. 1.注解自定义. 这里定义两个注解,分别用来注解类和注解属性. package cc.rulian.ann; import java.lang.annotation.ElementType; import java.lang.annotation.Retention;

  • 浅谈java中的TreeMap 排序与TreeSet 排序

    TreeMap: package com; import java.util.Comparator; import java.util.TreeMap; public class Test5 { /** * @param args */ public static void main(String[] args) { // TODO Auto-generated method stub TreeMap<String, String> tree = new TreeMap<String,

  • 浅谈java中对集合对象list的几种循环访问

    java中对集合对象list的几种循环访问的总结如下  1 经典的for循环 public static void main(String[] args) { List<String> list = new ArrayList(); list.add("123"); list.add("java"); list.add("j2ee"); System.out.println("=========经典的for循环=======

  • 浅谈java中的一维数组、二维数组、三维数组、多维数组

    这个数组可以看做新手学习,从一维数组 到 多维 数组 循环渐进,其实看起也很简单,一看便知,众所周知,一维.二维或许经常用到,用到二维以上应该就很少了. public class test { public static void main(String[] args) { /*一维数组*/ int num[] = {0,1,2}; /*下面输出 3 行数据,0 ~ 2*/ for (int i = 0; i < num.length; i++) { System.out.println("

随机推荐