Java阻塞队列BlockingQueue详解

目录
  • 队列的类型
  • 数据结构
  • 阻塞队列 BlockingQueue
  • 常见的阻塞队列
  • BlockingQueue API
  • ArrayBlockingQueue 源码简解
  • 生产者消费者模式
  • 延迟队列 DelayQueue

队列的类型

  • 无限队列(unbounded queue) 无容量限定,只随存储变化
  • 有限队列(bounded queue) 定义了最大容量

向无限队列添加元素的所有操作都将永远不会阻塞(也是线程安全的),因此它可以增长到非常大的容量。 使用无限阻塞队列 BlockingQueue 设计生产者 - 消费者模型时最重要的是消费者应该能够像生产者向队列添加消息一样快地消费消息 。否则可能内存不足而抛出 OutOfMemory 异常。

数据结构

  • 1.通常使用链表或数组实现
  • 2.一般具有 FIFO(先进先出) 特性,也可以设计为双端队列
  • 3.队列的主要操作:入队和出队

阻塞队列 BlockingQueue

定义:线程通信中,在任意时刻,无论并发有多高,在单个 JVM 上,同一时间永远只有一个线程能对队列进行入队或出队操作。BlockingQueue 可以在线程之间共享而无需任何显式同步

阻塞队列的类型:

JAVA中的应用场景 : 线程池、SpringCloud-Eureka 三级缓存、Nacos、MQ、Netty 等

常见的阻塞队列

  • ArrayBlockingQueue : 由数组支持的有界队列

    • 应用场景: 线程池中有比较多的应用、生产者消费者模型
    • 工作原理: 基于 ReentrantLock 保证线程安全,根据Condition实现队列满时的阻塞
  • LinkedBlockingQueue : 基于链表的无界队列(理论上有界)
  • PriorityBlockingQueue : 由优先级堆支持的无界优先级队列
  • DelayQueue : 由优先级堆支持的、基于时间的调度队列,内部基于无界队列PriorityQueue 实现,而无界队列基于数组的扩容实现
    • 使用方法: 入队的对象必须要实现 Delayed 接口,而 Delayed 集成自 Comparable 接口
    • 应用场景: 售卖电影票等
    • 工作原理: 队列内部会根据时间优先级进行排序。延迟类线程池周期执行。

它们都实现了BlockingQueue接口,都有put()和take()等方法,创建方式如下:

BlockingQueue<String> blockingQueue = new ArrayBlockingQueue<> (666);

BlockingQueue API

添加元素:

方法 含义
add() 如果插入成功则返回 true,否则抛出 IllegalStateException 异常
put() 将指定的元素插入队列,如果队列满了,会阻塞直到有空间插入
offer() 如果插入成功则返回 true,否则返回 false
offer(E e, long timeout, TimeUnit unit) 尝试将元素插入队列,如果队列已满,会阻塞直到有空间插入,阻塞有时间控制

检索元素:

方法 含义
take() 获取队列的头部元素并将其删除,如果队列为空,则阻塞并等待元素变为可用
poll(long timeout, TimeUnit unit) 检索并删除队列的头部,如有必要,等待指定的等待时间以使元素可用,如果超时,则返回 null

ArrayBlockingQueue 源码简解

实现:同步等待队列(CLH)+ 条件等待队列满足条件的元素在CLH队列中等待锁,不满足条件的队列挪到条件等待队列,满足条件后再从 tail 插入 CLH 队列

线程获取锁的条件: 在 CLH 队列里等待的 Node 节点,并且 Node 节点的前驱节点是 Singal。条件等待队列里的线程是无法获取锁的。

/**
 * 构造方法
 * 还有两个构造函数,一个无fair参数,一个可传入集合,创建时插入队列
 * @param capacity 固定容量
 * @param fair 默认是false:访问顺序未指定; true:按照FIFO顺序处理
 */
public ArrayBlockingQueue(int capacity, boolean fair) {
   if (capacity <= 0)
        throw new IllegalArgumentException();
    this.items = new Object[capacity];
    lock = new ReentrantLock(fair); // 根据fair创建对应的锁
    // 条件对象,配合容器能满足业务
    notEmpty = lock.newCondition(); // 出队条件对象
    notFull =  lock.newCondition(); // 入队条件对象
}
/**
 * 入队方法
 * 在队列的尾部插入指定的元素,如果队列已满,则等待空间可用
 */
public void put(E e) throws InterruptedException {
    checkNotNull(e); // 检查put对象是否为空,空抛出异常
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly(); // 若未被中断尝试获取锁,详见下文
    try {
        // 队列中元素的数量 等于 排队元素的长度
        while (count == items.length)
            notFull.await(); // 见下文
        enqueue(e); // 元素入队
    } finally {
        lock.unlock();
    }
}
/**
 * 出队方法
 * 获取队列的头部元素并将其删除,如果队列为空,则阻塞并等待元素变为可用
 */
public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly(); // 见下文
    try {
        while (count == 0)
            notEmpty.await(); // 见下文
        return dequeue(); // 元素出队
    } finally {
        lock.unlock();
    }
}

令当前线程等待,直到收到信号或被中断详:与此 Condition 关联的锁被自动释放,进入等待,并且处于休眠状态,直到发生以下四种情况之一:

  • ①其他线程调用这个Condition的 signal 方法,当前线程恰好被选为要被唤醒的线程;
  • ②其他线程调用这个条件的 signalAll 方法
  • ③其他线程中断当前线程,支持中断线程挂起;
  • ④一个“虚假的唤醒”发生了。

在这些情况下,在此方法返回之前,当前线程必须重新获得与此条件相关联的锁。当线程返回时,保证它持有这个锁。

如果当前线程有以下两种情况之一:

  • ①在进入该方法时设置中断状态;
  • ②在等待时被中断,支持线程挂起的中断 抛出InterruptedException

生产者消费者模式

BlockingQueue 可以在线程之间共享而无需任何显式同步,在生产者消费者之间,只需要将阻塞队列以参数的形式进行传递即可。它内部的机制会自动保证线程的安全性。

生产者:实现了 Runnable 接口,每个生产者生产100种商品和1个中断标记后完成线程任务

@Slf4j
@Slf4j
public class Producer implements Runnable{
    // 作为参数的阻塞队列
    private BlockingQueue<Integer> blockingQueue;
    private final int stopTag;
    /**
     * 构造方法
     * @param blockingQueue
     * @param stopTag
     */
    public Producer(BlockingQueue<Integer> blockingQueue,int stopTag) {
        this.blockingQueue = blockingQueue;
        this.stopTag = stopTag;
    }
    @Override
    public void run() {
        try {
            generateNumbers();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
   private void generateNumbers() throws InterruptedException {
        // 每个生产者都随机生产10种商品
        for (int i = 0; i < 10; i++) {
            int product = ThreadLocalRandom.current().nextInt(1000,1100);
            log.info("生产者{}号,生产了商品,编号为{}",Thread.currentThread().getId(),product);
            blockingQueue.put(product);
        }
        // 生产终止标记
        blockingQueue.put(stopTag);
        log.info("生产者{}号,生产了第终止标记编号{}",Thread.currentThread().getId(),Thread.currentThread().getId());
    }
}

消费者:消费者拿到终止消费标记终止消费,否则消费商品,拿到终止标记后完成线程任务

@Slf4j
public class Consumer implements Runnable{
    // 作为参数的阻塞队列
    private BlockingQueue<Integer> queue;
    private final int stopTage;
    public Consumer(BlockingQueue<Integer> queue, int stopTage) {
        this.queue = queue;
        this.stopTage = stopTage;
    }
    @Override
    public void run() {
        try {
            while (true) {
                Integer product = queue.take();
                if (product.equals(stopTage)) {
                    log.info("{}号消费者,停止消费,因为拿到了停止消费标记",Thread.currentThread().getId());
                    return;
                }
                log.info("{}号消费者,拿到的商品编号:{}",Thread.currentThread().getId(),product);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

客户端类: 创建与计算机 CPU 核数相同的线程数,与 16个生产者

public class ProductConsumerTest {
    public static void main(String[] args) {
        // 阻塞队列容量
        int blockingQueueSize = 10;
        // 生产者数量
        int producerSize = 16;
        // 消费者数量 = 计算机线程核数 8
        int consumerSize = Runtime.getRuntime().availableProcessors();
        // 终止消费标记
        int stopTag = Integer.MAX_VALUE;
        BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(blockingQueueSize);
        // 创建16个生产者线程
        for (int i = 0; i < producerSize; i++) {
            new Thread(new Producer(blockingQueue, stopTag)).start();
        }
        // 创建8个消费者线程
        for (int j = 0; j < consumerSize; j++) {
            new Thread(new Consumer(blockingQueue, stopTag)).start();
        }
    }
}

延迟队列 DelayQueue

定义: Java 延迟队列提供了在指定时间才能获取队列元素的功能,队列头元素是最接近过期的元素。没有过期元素的话,使用 poll() 方法会返回 null 值,超时判定是通过getDelay(TimeUnit.NANOSECONDS)方法的返回值小于等于 0 来判断。延时队列不能存放空元素。

/**
 * 电影票类,实现了Delayed接口,重写 compareTo 和 getDelay方法
 */
public class MovieTicket implements Delayed {
    //延迟时间
    private final long delay;
    //到期时间
    private final long expire;
    //数据
    private final String msg;
    //创建时间
    private final long now;
    public long getDelay() {
        return delay;
    }
    public long getExpire() {
        return expire;
    }
    public String getMsg() {
        return msg;
    }
    public long getNow() {
        return now;
    }
    /**
     * @param msg 消息
     * @param delay 延期时间
     */
    public MovieTicket(String msg , long delay) {
        this.delay = delay;
        this.msg = msg;
        expire = System.currentTimeMillis() + delay;    //到期时间 = 当前时间+延迟时间
        now = System.currentTimeMillis();
    }
    /**
     * @param msg
     */
    public MovieTicket(String msg){
        this(msg,1000);
    }
    public MovieTicket(){
        this(null,1000);
    }
    /**
     * 获得延迟时间   用过期时间-当前时间,时间单位毫秒
     * @param unit
     * @return
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(this.expire
                - System.currentTimeMillis() , TimeUnit.MILLISECONDS);
    }
    /**
     * 用于延迟队列内部比较排序  当前时间的延迟时间 - 比较对象的延迟时间
     * 越早过期的时间在队列中越靠前
     * @param delayed
     * @return
     */
    @Override
    public int compareTo(Delayed delayed) {
        return (int) (this.getDelay(TimeUnit.MILLISECONDS)
                - delayed.getDelay(TimeUnit.MILLISECONDS));
    }
}

测试类:

public static void main(String[] args) {
    DelayQueue<MovieTicket> delayQueue = new DelayQueue<MovieTicket>();
    MovieTicket ticket = new MovieTicket("电影票1",10000);
    delayQueue.put(ticket);
    MovieTicket ticket1 = new MovieTicket("电影票2",5000);
    delayQueue.put(ticket1);
    MovieTicket ticket2 = new MovieTicket("电影票3",8000);
    delayQueue.put(ticket2);
    log.info("message:--->入队完毕");
    while( delayQueue.size() > 0 ){
        try {
            ticket = delayQueue.take();
            log.info("电影票出队:{}",ticket.getMsg());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

从运行结果可以看出队列是延迟出队,间隔和我们所设置的时间相同

到此这篇关于Java阻塞队列BlockingQueue详解的文章就介绍到这了,更多相关Java BlockingQueue 内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • 详解Java阻塞队列(BlockingQueue)的实现原理

    阻塞队列 (BlockingQueue)是Java util.concurrent包下重要的数据结构,BlockingQueue提供了线程安全的队列访问方式:当阻塞队列进行插入数据时,如果队列已满,线程将会阻塞等待直到队列非满:从阻塞队列取数据时,如果队列已空,线程将会阻塞等待直到队列非空.并发包下很多高级同步类的实现都是基于BlockingQueue实现的. BlockingQueue 的操作方法 BlockingQueue 具有 4 组不同的方法用于插入.移除以及对队列中的元素进行检查.如果

  • Java并发编程之阻塞队列(BlockingQueue)详解

    目录 队列 阻塞队列 ArrayBlockingQueue 重要属性 构造方法 添加元素 add(e) offer(e) put(e) offer(e,time,unit) 移除元素 take() dequeue() LinkedBlockingQueue 重要属性 构造方法 添加元素 offer(e) put(e) 移除元素 poll() take() 对比 总结 大家好,我是小黑,一个在互联网苟且偷生的农民工. 队列 学过数据结构的同学应该都知道,队列是数据结构中一种特殊的线性表结构,和平时

  • Java源码解析阻塞队列ArrayBlockingQueue常用方法

    本文基于jdk1.8进行分析 ArrayBlockingQueue的功能简介参考https://www.jb51.net/article/154211.htm. 首先看一下ArrayBlockingQueue的成员变量.如下图.最主要的成员变量是items,它是一个Object类型的数组用于保存阻塞队列中的元素.其次是takeIndex,putIndex,count,分别表示了从队列获取元素的位置,往队列里放元素的位置和队列中元素的个数.然后是lock,notEmpty和notFull三个和锁相

  • java 中 阻塞队列BlockingQueue详解及实例

    java 中 阻塞队列BlockingQueue详解及实例 BlockingQueue很好的解决了多线程中数据的传输,首先BlockingQueue是一个接口,它大致有四个实现类,这是一个很特殊的队列,如果BlockQueue是空的,从BlockingQueue取东西的操作将会被阻断进入等待状态,直到BlockingQueue进了东西才会被唤醒.同样,如果BlockingQueue是满的,任何试图往里存东西的操作也会被阻断进入等待状态,直到BlockingQueue里有空间才会被唤醒继续操作.

  • Java源码解析阻塞队列ArrayBlockingQueue功能简介

    本文基于jdk1.8进行分析. 阻塞队列是java开发时常用的一个数据结构.首先看一下阻塞队列的作用是什么.阻塞队列的作用,从源码中类的注释中来了解,是最清晰准确的. ArrayBlockingQueue是一个用数组实现的有界阻塞队列.提供FIFO的功能.队列头上的元素是在队列中呆了最长时间的元素,队列尾上的元素是在队列中呆了时间最短的元素.新元素会插入在队列尾部,从队列获取元素时会从队列头上获取. 这是一个传统的有界队列,在这个有界队列里,一个固定大小的数组用来保存生产者产生的元素和消费者获取

  • Java源码解析阻塞队列ArrayBlockingQueue介绍

    Java的阻塞队列,在实现时,使用到了lock和condition,下面是对其主要方法的介绍. 首先看一下,阻塞队列中使用到的锁. /** Main lock guarding all access **/ final ReentrantLock lock;​ /** Condition for waiting takes **/ private final Condition notEmpty;​ /** Condition for waiting puts **/ private final

  • Java 阻塞队列BlockingQueue详解

    目录 一. 前言 二. 认识BlockingQueue 三.BlockingQueue的核心方法: 四.常见BlockingQueue 五. 小结 一. 前言 在新增的Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题.通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利.本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各自的功能以及常见使用场景. 二. 认识BlockingQueue 阻塞队列,

  • Java阻塞队列BlockingQueue详解

    目录 队列的类型 数据结构 阻塞队列 BlockingQueue 常见的阻塞队列 BlockingQueue API ArrayBlockingQueue 源码简解 生产者消费者模式 延迟队列 DelayQueue 队列的类型 无限队列(unbounded queue) 无容量限定,只随存储变化 有限队列(bounded queue) 定义了最大容量 向无限队列添加元素的所有操作都将永远不会阻塞(也是线程安全的),因此它可以增长到非常大的容量. 使用无限阻塞队列 BlockingQueue 设计

  • Java并发编程之阻塞队列深入详解

    目录 1. 什么是阻塞队列 2. 阻塞队列的代码使用 3. 生产者消费者模型 (1)应用一:解耦合 (2)应用二:削峰填谷 (3)相关代码 4.阻塞队列和生产者消费者模型功能的实现 1. 什么是阻塞队列 阻塞队列是一种特殊的队列,和数据结构中普通的队列一样,也遵守先进先出的原则同时,阻塞队列是一种能保证线程安全的数据结构,并且具有以下两种特性:当队列满的时候,继续向队列中插入元素就会让队列阻塞,直到有其他线程从队列中取走元素:当队列为空的时候,继续出队列也会让队列阻塞,直到有其他线程往队列中插入

  • Java 阻塞队列详解及简单使用

     Java 阻塞队列详解 概要: 在新增的Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全"传输"数据的问题.通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利.本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各自的功能以及常见使用场景. 认识BlockingQueue阻塞队列,顾名思义,首先它是一个队列,而一个队列在数据结构中所起的作用大致如下图所示: 从上图我们可以很清楚看到,通过一个共享的队列,

  • Java深入了解数据结构之栈与队列的详解

    目录 一,栈 1,概念 2,栈的操作 3,栈的实现 ①入栈 ②出栈 ③获取栈顶元素 ④判断栈是否为空 4,实现mystack 二,队列 1,概念 2,队列的实现 ①入队 ②出队 ③获取队首元素 3,实现myqueue 一,栈 1,概念 在我们软件应用 ,栈这种后进先出数据结构的应用是非常普遍的.比如你用浏 览器上网时不管什么浏览器都有 个"后退"键,你点击后可以接访问顺序的逆序加载浏览过的网页. 很多类似的软件,比如 Word Photoshop 等文档或图像编 软件中 都有撤销 )的

  • Java 栈和队列的相互转换详解

    目录 用栈实现队列-力扣232题 用队列实现栈-力扣225题  1. 双队列实现栈 2.一个队列实现栈 栈和队列的本质是相同的,都只能在线性表的一端进行插入和删除.因此,栈和队列可以相互转换. 用栈实现队列-力扣232题 题目要求:仅使用两个栈实现先入先出队列.队列应当支持一般队列支持的所有操作 使用双栈来实现队列,我们就可以让一个栈储存具体元素,另一个栈做辅助  上图可以看到,新元素进栈时,要确保该栈为空.进入栈的元素按顺序存到辅助栈中,等新元素进入栈之后,再将辅助栈中的元素按顺序出到该栈中.

  • Java之ThreadPoolExecutor类详解

    ThreadPoolExecutor ThreadPoolExecutor是线程池框架的一个核心类,线程池通过线程复用机制,并对线程进行统一管理 降低系统资源消耗.通过复用已存在的线程,降低线程创建和销毁造成的消耗: 提高响应速度.当有任务到达时,无需等待新线程的创建便能立即执行: 提高线程的可管理性.线程是稀缺资源,如果无限制的创建,不仅会消耗大量系统资源,还会降低系统的稳 定性,使用线程池可以进行对线程进行统一的分配.调优和监控. 线程池的运行状态总共有5种,其值和含义分别如下: RUNNI

随机推荐