Java并发编程之阻塞队列深入详解

目录
  • 1. 什么是阻塞队列
  • 2. 阻塞队列的代码使用
  • 3. 生产者消费者模型
    • (1)应用一:解耦合
    • (2)应用二:削峰填谷
    • (3)相关代码
  • 4.阻塞队列和生产者消费者模型功能的实现

1. 什么是阻塞队列

阻塞队列是一种特殊的队列,和数据结构中普通的队列一样,也遵守先进先出的原则同时,阻塞队列是一种能保证线程安全的数据结构,并且具有以下两种特性:当队列满的时候,继续向队列中插入元素就会让队列阻塞,直到有其他线程从队列中取走元素;当队列为空的时候,继续出队列也会让队列阻塞,直到有其他线程往队列中插入元素

补充:线程阻塞的意思指代码此时不会被执行,即操作系统在此时不会把这个线程调度到CPU上去执行了

2. 阻塞队列的代码使用

import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.BlockingDeque;
public class Test {
    public static void main(String[] args) throws InterruptedException {
        //不能直接newBlockingDeque,因为它是一个接口,要向上转型
        //LinkedBlockingDeque内部是基于链表方式来实现的
        BlockingDeque<String> queue=new LinkedBlockingDeque<>(10);//此处可以指定一个具体的数字,这里的的10代表队列的最大容量
        queue.put("hello");
        String elem=queue.take();
        System.out.println(elem);
        elem=queue.take();
        System.out.println(elem);
    }
}

注意: put方法带有阻塞功能,但是offer不具有,所以一般用put方法(能使用offer方法的原因是 BlockingDeque继承了Queue

打印结果如上所示,当打印了hello后,队列为空,代码执行到elem=queue.take();就不会继续往下执行了,此时线程进入阻塞等待状态,什么也不会打印了,直到有其他线程给队列中放入新的元素为止

3. 生产者消费者模型

生产者消费者模型是在服务器开发和后端开发中比较常用的编程手段,一般用于解耦合和削峰填谷。

高耦合度:两个代码模块的关联关系比较高
高内聚:一个代码模块内各个元素彼此结合的紧密
因此,我们一般追求高内聚低耦合,这样会加快执行效率,而使用生产者消费者模型就可以解耦合

(1)应用一:解耦合

我们以实际生活中的情况为例,这里有两台服务器:A服务器和B服务器,当A服务器传输数据给B时,要是直接传输的话,那么不是A向B推送数据,就是B从A中拉取数据,都是需要A和B直接交互,所以A和B存在依赖关系(A和B的耦合度比较高)。未来如果服务器需要扩展,比如加一个C服务器,让A给C传数据,那么改动就比较复杂,且会降低效率。这时我们可以加一个队列,这个队列为阻塞队列,如果A把数据写到队列里,B从中取,那么队列相当于是中转站(或者说交易场所),A相当于生产者(提供数据),B相当于消费者(接收数据),此时就构成了生产者消费者模型,这样会让代码耦合度更低,维护更方便,执行效率更高。

在计算机中,生产者充当其中一组线程,而消费者充当另一组线程,而交易场所就可以使用阻塞队列了

(2)应用二:削峰填谷

实际生活中
在河道中大坝算是一个很重要的组成部分了,如果没有大坝,大家试想一下结果:当汛期来临后上游的水很大时,下游就会涌入大量的水发生水灾让庄稼被淹没;而旱期的话下游的水会很少可能会引发旱灾。若有大坝的话,汛期时大坝把多余的水存到大坝中,关闸蓄水,让上游的水按一定速率往下流,避免突然一波大雨把下游淹了,这样下游不至于出现水灾。旱期时大坝把之前储存好的水放出来,还是让让水按一定速率往下流,避免下流太缺水,这样既可以避免汛期发生洪涝又可以避免旱期发生旱灾了。
峰:相当于汛期
谷:相当于旱期
计算机中
这样的情况在计算机中也是很典型的,尤其是在服务器开发中,网关通常会把互联网中的请求转发给业务服务器,比如一些商品服务器,用户服务器,商家服务器(存放商家的信息),直播服务器。但因为互联网过来的请求数量是多是少不可控,相当于上游的水,如果突然来了一大波请求,网关即使能扛得住,后续的很多服务器收到很多请求也就会崩溃(处理一个请求涉及到一系列的数据库操作,因为数据库相关操作效率本身比较低,这样请求多了就处理不过来了,因此就会崩溃)

所以实际情况中网关和业务服务器之间往往用一个队列来缓冲,这个队列就是阻塞队列(交易场所),用这个队列来实现生产者(网关)消费者(业务服务器)模型,把请求缓存到队列中,后面的消费者(业务服务器)按照自己固定的速率去读请求。这样当请求很多时,虽然队列服务器可能会稍微受到一定压力,但能保证业务服务器的安全。

(3)相关代码

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class TestDemo {
    public static void main(String[] args) {
        // 使用一个 BlockingQueue 作为交易场所
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        // 此线程作为消费者
        Thread customer = new Thread() {
            @Override
            public void run() {
                while (true) {
                    // 取队首元素
                    try {
                        Integer value = queue.take();
                        System.out.println("消费元素: " + value);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        customer.start();
        // 此线程作为生产者
        Thread producer = new Thread() {
            @Override
            public void run() {
                for (int i = 1; i <= 10000; i++) {
                    System.out.println("生产了元素: " + i);
                    try {
                        queue.put(i);
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        producer.start();
        try {
            customer.join();
            producer.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

打印如上(此代码是让生产者通过sleep每过1秒生产一个元素,而消费者不使用sleep,所以每当生产一个元素时,消费者都会立马消费一个元素)

4.阻塞队列和生产者消费者模型功能的实现

在学会如何使用BlockingQueue后,那么如何自己去实现一个呢?
主要思路:

  • 1.利用数组
  • 2.head代表队头,tail代表队尾
  • 3.head和tail重合后到底是空的还是满的判断方法:专门定义一个size记录当前队列元素个数,入队列时size加1出队列时size减1,当size为0表示空,为数组最大长度就是满的(也可以浪费一个数组空间用head和tail重合表示空,用tail+1和head重合表示满,但此方法较为麻烦,上一个方法较为直观,因此我们使用上一个方法)
public class Test2 {
    static class BlockingQueue {
    private int[] items = new int[1000];    // 此处的1000相当于队列的最大容量, 此处暂时不考虑扩容的问题.
    private int head = 0;//定义队头
    private int tail = 0;//定义队尾
    private int size = 0;//数组大小
    private Object locker = new Object();

    // put 用来入队列
    public void put(int item) throws InterruptedException {
        synchronized (locker) {
            while (size == items.length) {
                // 队列已经满了,阻塞队列开始阻塞
                locker.wait();
            }
            items[tail] = item;
            tail++;
            // 如果到达末尾, 就回到起始位置.
            if (tail >= items.length) {
                tail = 0;
            }
            size++;
            locker.notify();
        }
    }
    // take 用来出队列
    public int take() throws InterruptedException {
        int ret = 0;
        synchronized (locker) {
            while (size == 0) {
                // 对于阻塞队列来说, 如果队列为空, 再尝试取元素, 就要阻塞
                locker.wait();
            }
            ret = items[head];
            head++;
            if (head >= items.length) {
                head = 0;
            }
            size--;
            // 此处的notify 用来唤醒 put 中的 wait
            locker.notify();
        }
        return ret;
    }
}

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue queue = new BlockingQueue();
        // 消费者线程
        Thread consumer = new Thread() {
            @Override
            public void run() {
                while (true) {
                    try {
                        int elem = queue.take();
                        System.out.println("消费元素: " + elem);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        consumer.start();

        // 生产者线程
        Thread producer = new Thread() {
            @Override
            public void run() {
                for (int i = 1; i < 10000; i++) {
                    System.out.println("生产元素: " + i);
                    try {
                        queue.put(i);
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        producer.start();
        consumer.join();
        producer.join();
    }
}

运行结果如上。
注意:

  • 1.wait和notify的正确使用
  • 2.put和take都会产生阻塞情况,但阻塞条件是对立的,wait不会同时触发(put唤醒take阻塞,take唤醒put阻塞)

到此这篇关于Java并发编程之阻塞队列深入详解的文章就介绍到这了,更多相关Java 阻塞队列内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • 详解Java七大阻塞队列之SynchronousQueue

    目录 分析 其实SynchronousQueue 是一个特别有意思的阻塞队列,就我个人理解来说,它很重要的特点就是没有容量. 直接看一个例子: package dongguabai.test.juc.test; import java.util.concurrent.SynchronousQueue; /** * @author Dongguabai * @description * @date 2021-09-01 21:52 */ public class TestSynchronousQu

  • Java常见的阻塞队列总结

    Java阻塞队列 阻塞队列和普通队列主要区别在阻塞二字: 阻塞添加:队列已满时,添加元素线程会阻塞,直到队列不满时才唤醒线程执行添加操作 阻塞删除:队列元素为空时,删除元素线程会阻塞,直到队列不为空再执行删除操作 常见的阻塞队列有 LinkedBlockingQueue 和 ArrayBlockingQueue,其中它们都实现 BlockingQueue 接口,该接口定义了阻塞队列需实现的核心方法: public interface BlockingQueue<E> extends Queue

  • Java 阻塞队列和线程池原理分析

    目录 [1]阻塞队列 一.什么是阻塞队列? 二.阻塞队列有什么用? 三.阻塞队列的简单实用 [2]Java 线程池 一.我们为什么需要Java 线程池?使用它的好处是什么? 二.Java中主要提供了哪几种线程的线程池? 三.线程类的继承关系 四.ThreadPoolExecutor参数的含义 corePoolSize 五.线程池工作流程(机制) 六.关于两种提交方法的比较 [1]阻塞队列 一.什么是阻塞队列? ① 支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满. ②

  • Java并发编程之阻塞队列(BlockingQueue)详解

    目录 队列 阻塞队列 ArrayBlockingQueue 重要属性 构造方法 添加元素 add(e) offer(e) put(e) offer(e,time,unit) 移除元素 take() dequeue() LinkedBlockingQueue 重要属性 构造方法 添加元素 offer(e) put(e) 移除元素 poll() take() 对比 总结 大家好,我是小黑,一个在互联网苟且偷生的农民工. 队列 学过数据结构的同学应该都知道,队列是数据结构中一种特殊的线性表结构,和平时

  • Java并发编程之阻塞队列深入详解

    目录 1. 什么是阻塞队列 2. 阻塞队列的代码使用 3. 生产者消费者模型 (1)应用一:解耦合 (2)应用二:削峰填谷 (3)相关代码 4.阻塞队列和生产者消费者模型功能的实现 1. 什么是阻塞队列 阻塞队列是一种特殊的队列,和数据结构中普通的队列一样,也遵守先进先出的原则同时,阻塞队列是一种能保证线程安全的数据结构,并且具有以下两种特性:当队列满的时候,继续向队列中插入元素就会让队列阻塞,直到有其他线程从队列中取走元素:当队列为空的时候,继续出队列也会让队列阻塞,直到有其他线程往队列中插入

  • Java并发编程(CyclicBarrier)实例详解

    Java并发编程(CyclicBarrier)实例详解 前言: 使用JAVA编写并发程序的时候,我们需要仔细去思考一下并发流程的控制,如何让各个线程之间协作完成某项工作.有时候,我们启动N个线程去做一件事情,只有当这N个线程都达到某一个临界点的时候,我们才能继续下面的工作,就是说如果这N个线程中的某一个线程先到达预先定义好的临界点,它必须等待其他N-1线程也到达这个临界点,接下来的工作才能继续,只要这N个线程中有1个线程没有到达所谓的临界点,其他线程就算抢先到达了临界点,也只能等待,只有所有这N

  • java并发编程专题(五)----详解(JUC)ReentrantLock

    上一节我们了解了Lock接口的一些简单的说明,知道Lock锁的常用形式,那么这节我们正式开始进入JUC锁(java.util.concurrent包下的锁,简称JUC锁).下面我们来看一下Lock最常用的实现类ReentrantLock. 1.ReentrantLock简介 由单词意思我们可以知道这是可重入的意思.那么可重入对于锁而言到底意味着什么呢?简单来说,它有一个与锁相关的获取计数器,如果拥有锁的某个线程再次得到锁,那么获取计数器就加1,然后锁需要被释放两次才能获得真正释放.这模仿了 sy

  • java并发编程专题(三)----详解线程的同步

    有兴趣的朋友可以回顾一下前两篇 java并发编程专题(一)----线程基础知识 java并发编程专题(二)----如何创建并运行java线程 在现实开发中,我们或多或少的都经历过这样的情景:某一个变量被多个用户并发式的访问并修改,如何保证该变量在并发过程中对每一个用户的正确性呢?今天我们来聊聊线程同步的概念. 一般来说,程序并行化是为了获得更高的执行效率,但前提是,高效率不能以牺牲正确性为代价.如果程序并行化后, 连基本的执行结果的正确性都无法保证, 那么并行程序本身也就没有任何意义了.因此,

  • java并发编程StampedLock高性能读写锁详解

    目录 一.读写锁 二.悲观读锁 三.乐观读 一.读写锁 在我的<java并发编程>上一篇文章中为大家介绍了<ReentrantLock读写锁>,ReentrantReadWriteLock可以保证最多同时有一个线程在写数据,或者可以同时有多个线程读数据,但读写不能同时进行. 比如你正在做的是日志,有一个线程正在做写操作,但是在写日志的时候你可能需要把日志集中转移到集中管理日志服务,但是此时读线程不能读数据(因为无法获取读锁).面对这个需求,ReentrantReadWriteLoc

  • JAVA并发编程有界缓存的实现详解

    JAVA并发编程有界缓存的实现 1.有界缓存的基类 package cn.xf.cp.ch14; /** * *功能:有界缓存实现基类 *时间:下午2:20:00 *文件:BaseBoundedBuffer.java *@author Administrator * * @param <V> */ public class BaseBoundedBuffer<V> { private final V[] buf; private int tail; private int head

  • Java并发编程之常用的辅助类详解

    1.CountDownLatch 1.2.示例:班长锁门问题 问题描述:假如有7个同学晚上上自习,钥匙在班长手上,并且要负责锁门.班长必须要等所有人都走光了,班长才能关灯锁门.这6个同学的顺序是无序的,不知道它们是何时离开.6个同学各上各的自习,中间没有交互.假如说6个学生是普通线程,班长是主线程,如何让主线程要等一堆线程运行完了,主线程才能运行完成呢. public class CountDownLatchDemo { public static void main(String[] args

  • Java并发编程之阻塞队列详解

    1.什么是阻塞队列? 队列是一种数据结构,它有两个基本操作:在队列尾部加入一个元素,从队列头部移除一个元素.阻塞队里与普通的队列的区别在于,普通队列不会对当前线程产生阻塞,在面对类似消费者-生产者模型时,就必须额外的实现同步策略以及线程间唤醒策略.使用阻塞队列,就会对当前线程产生阻塞,当队列是空时,从队列中获取元素的操作将会被阻塞,当队列是满时,往队列里添加元素的操作也会被阻塞. 2.主要的阻塞队列及其方法 java.util.concurrent包下提供主要的几种阻塞队列,主要有以下几个: 1

  • Java 阻塞队列BlockingQueue详解

    目录 一. 前言 二. 认识BlockingQueue 三.BlockingQueue的核心方法: 四.常见BlockingQueue 五. 小结 一. 前言 在新增的Concurrent包中,BlockingQueue很好的解决了多线程中,如何高效安全“传输”数据的问题.通过这些高效并且线程安全的队列类,为我们快速搭建高质量的多线程程序带来极大的便利.本文详细介绍了BlockingQueue家庭中的所有成员,包括他们各自的功能以及常见使用场景. 二. 认识BlockingQueue 阻塞队列,

随机推荐