手把手带你理解java线程池之工作队列workQueue

目录
  • 线程池之工作队列
  • ArrayBlockingQueue
  • SynchronousQueue
  • LinkedBlockingDeque
  • LinkedBlockingQueue
  • LinkedTransferQueue
  • PriorityBlockingQueue

线程池之工作队列

ArrayBlockingQueue

采用数组来实现,并采用可重入锁ReentrantLock来做并发控制,无论是添加还是读取,都先要获得锁才能进行操作 可看出进行读写操作都使用了ReentrantLock,ArrayBlockingQueue需要为其指定容量

public boolean offer(E e) {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            if (count == items.length)
                return false;
            else {
                enqueue(e);
                return true;
            }
        } finally {
            lock.unlock();
        }
    }

    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

SynchronousQueue

由于SynchronousQueue源码比较复杂,里面大量的Cas操作,SynchronousQueue没有容器,所以里面是装不了任务的,当一个生产者线程生产一个任务的 时候,如果没有对应的消费者消费,那么该生产者会一直阻塞,知道有消费者消费为止。
图示:

如下代码,如果我们将消费者线程注释掉执行,那么生产者哪里将会一直阻塞

package thread.customthreadpool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * 测试SynchronousQueue
 */
public class SynchronousQueueTest {

    private static final SynchronousQueue<String> synchronousQueue = new SynchronousQueue<>();

    private static final ExecutorService service = Executors.newCachedThreadPool();

    public static void main(String[] args) {
        /**
         * Provider
         */
        service.submit(() -> {
            try {
                synchronousQueue.put("liu");
            }catch (Exception e){
                e.printStackTrace();
            }
            System.out.println("Consumer finished spending");
        });

        /**
         * Consumer
         */
        service.submit(() ->{
            try {
                synchronousQueue.take();
            }catch (Exception e){
                e.printStackTrace();
            }
            System.out.println("take over");
        });
    }
}

LinkedBlockingDeque

LinkedBlockingDeque是一个双向队列,底层使用单链表实现,任何一段都可进行元素的读写操作,在初始化LinkedBlockingDeque的时候, 我们可以指定容量,也可不指定,如果不指定,则容量为Integer.MAX_VALUE,

注:Deque是双端队列,而Queue是单端队列,双端意思是两端都可以进行读写操作,而单端则只能从一端进,一端出(FIFO)

public LinkedBlockingDeque() {
        this(Integer.MAX_VALUE);
}
package thread.customthreadpool;
import java.util.concurrent.LinkedBlockingDeque;
public class LinkedBlockingDequeTest {

    private static final LinkedBlockingDeque<Integer> deque = new LinkedBlockingDeque<>();

    public static void main(String[] args) throws InterruptedException {
        deque.put(1);
        deque.put(2);
        deque.put(3);
        deque.put(4);
        deque.put(5);
        System.out.println(deque);
        System.out.println("deque size  "+deque.size());
        deque.take();
        deque.take();
        deque.take();
        deque.take();
        deque.take();
        System.out.println(deque);
        System.out.println("deque size  "+deque.size());
    }
}

LinkedBlockingQueue

底层基于单向连表实现,是一个单向队列,具有先进先出(FIFO)特点,使用了ReentrantLock来做并发控制,读写操作都上锁

private final ReentrantLock putLock = new ReentrantLock();
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

DelayDeque

DelayDeque是一个无界队列,添加进DelayDeque的元素会经过compareTo方法计算,然后按照时间 进行排序,排在队头的元素是最早到期的,越往后到期时间越长,DelayDeque只能接受Delayed接口类型 如图所示,队列里的元素并不是按照先进先出的规则,而是按照过期时间

package thread.customthreadpool.delayDeque;

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class MyDelayed implements Delayed {

    private final String taskName ;
    private final long nowTime = System.currentTimeMillis();
    private final long expireTime ;

    public MyDelayed(String taskName,long expireTime) {
        this.taskName = taskName;
        this.expireTime = expireTime;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert((nowTime+expireTime) - System.currentTimeMillis(),TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        MyDelayed myDelayed = (MyDelayed) o;
        return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
    }

    @Override
    public String toString() {
        return "MyDelayed{" +
                "taskName='" + taskName + '\'' +
                ", nowTime=" + nowTime +
                ", expireTime=" + expireTime +
                '}';
    }
}
package thread.customthreadpool.delayDeque;

import java.util.concurrent.*;

public class MyDelayQueue {

    private static final DelayQueue<MyDelayed> delayQueue = new DelayQueue<>();

    private static final ExecutorService service = Executors.newCachedThreadPool();

    public static void main(String[] args) throws InterruptedException {
        service.submit(() -> {
            delayQueue.put(new MyDelayed("A-Task",5000));
            delayQueue.put(new MyDelayed("B-Task",4000));
            delayQueue.put(new MyDelayed("C-Task",3000));
            delayQueue.put(new MyDelayed("D-Task",2000));
            delayQueue.put(new MyDelayed("E-Task",1000));
        });
        while (true){
            System.out.println(delayQueue.take());
        }
    }
}

result

应用场景

1.美团外卖订单:当我们下单后没付款 ,30分钟后将自动取消订单
2.缓存,对于某些任务,需要在特定的时间清理;

and so on

LinkedTransferQueue

当消费线程从队列中取元素时,如果队列为空,那么生成一个为null的节点,消费者线程就一直等待,此时如果生产者线程发现队列中有一个null节点, 它就不入队了,而是将元素填充到这个null节点并唤醒消费者线程,然后消费者线程取走元素。
LinkedTransferQueue是 SynchronousQueue 和 LinkedBlockingQueue 的整合,性能比较高,因为没有锁操作, SynchronousQueue不能存储元素,而LinkedTransferQueue能存储元素,

PriorityBlockingQueue

PriorityBlockingQueue是一个无界的阻塞队列,同时是一个支持优先级的队列,读写操作都是基于ReentrantLock, 内部使用堆算法保证每次出队都是优先级最高的元素

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        E result;
        try {
            while ( (result = dequeue()) == null)
                notEmpty.await();
        } finally {
            lock.unlock();
        }
        return result;
}

到此这篇关于手把手带你理解java线程池之工作队列workQueue的文章就介绍到这了,更多相关java线程池内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Java线程池详细解读

    目录 1.线程池 1.1 线程池概念 1.2 线程池的实现 2.StringBuffer类 面试题:请解释String.StringBuffer.StringBuilder的区别? 3.Runtime类 面试题:什么叫gc?如何处理 4.System类 面试题:请解释final.finally.finalize的区别? 5.对象克隆 6.Date类 6.1 日期处理类-Date类 6.2 日期格式化-SimpleDateFormat类(核心) 7. 数字操作类-Math类 7.1 随机数-Ran

  • 超详细讲解Java线程池

    目录 池化技术 池化思想介绍 池化技术的应用 如何设计一个线程池 Java线程池解析 ThreadPoolExecutor使用介绍 内置线程池使用 ThreadPoolExecutor解析 整体设计 线程池生命周期 任务管理解析 woker对象 Java线程池实践建议 不建议使用Exectuors 线程池大小设置 线程池监控 带着问题阅读 1.什么是池化,池化能带来什么好处 2.如何设计一个资源池 3.Java的线程池如何使用,Java提供了哪些内置线程池 4.线程池使用有哪些注意事项 池化技术

  • 一篇文章带你深入了解Java线程池

    目录 线程池模型 常用线程池 ThreadPoolExecutor 构造函数参数说明 线程池默认工作行为 ForkJoinPool FutureTask 线程数量分析 CPU密集型 IO密集型 总结 线程池模型 一般的池化模型会有两个方法,用于获取资源和释放资源,就像这样: public interface XXPool{ XX acquire(); void release(); } 但是,工程中的线程池一般是生产者和消费者模型,线程池是消费者,任务的提交者是生产者,下面是一个简化的线程池模型

  • java线程池详解及代码介绍

    目录 一.线程池简介 二.四种常见的线程池详解 三.缓冲队列BlockingQueue和自定义线程池ThreadPoolExecutor 总结 一.线程池简介 线程池的概念 线程池就是首先创建一些线程,它们的集合称为线程池,使用线程池可以很好的提高性能,线程池在系统启动时既创建大量空闲的线程,程序将一个任务传给线程池.线程池就会启动一条线程来执行这个任务,执行结束后,该线程并不会死亡,而是再次返回线程池中成为空闲状态,等待执行下一个任务. 线程池的工作机制 在线程池的编程模式下,任务是提交给整个

  • 手把手带你理解java线程池之工作队列workQueue

    目录 线程池之工作队列 ArrayBlockingQueue SynchronousQueue LinkedBlockingDeque LinkedBlockingQueue LinkedTransferQueue PriorityBlockingQueue 线程池之工作队列 ArrayBlockingQueue 采用数组来实现,并采用可重入锁ReentrantLock来做并发控制,无论是添加还是读取,都先要获得锁才能进行操作 可看出进行读写操作都使用了ReentrantLock,ArrayBl

  • 学生视角手把手带你写Java 线程池改良版

    目录 Java手写线程池(第二代) 第二代线程池的优化 线程池构造器 线程池拒绝策略 execute方法 手写线程池源码 MyExecutorService MyRejectedExecutionException MyRejectedExecutionHandle 核心类MyThreadPoolExecutor 线程池测试类 Java手写线程池(第二代) 第二代线程池的优化 1:新增了4种拒绝策略.分别为:MyAbortPolicy.MyDiscardPolicy.MyDiscardOldes

  • 学生视角手把手带你写Java 线程池

    目录 Java手写线程池(第一代) 手写线程池-定义参数 手写线程池-构造器 手写线程池-默认构造器 手写线程池-execute方法 手写线程池-处理任务 手写线程池-优雅关闭线程池 手写线程池-暴力关闭线程池 手写线程池-源代码 问题 Java手写线程池(第一代) 经常使用线程池,故今天突发奇想,手写一个线程池,会有很多不足,请多多宽容.因为这也是第一代的版本,后续会更完善. 手写线程池-定义参数 private final AtomicInteger taskcount=new Atomic

  • 深入理解Java 线程池

    线程的使用在java中占有极其重要的地位,在jdk1.4极其之前的jdk版本中,关于线程池的使用是极其简陋的.在jdk1.5之后这一情况有了很大的改观.Jdk1.5之后加入了java.util.concurrent包,这个包中主要介绍java中线程以及线程池的使用.为我们在开发中处理线程的问题提供了非常大的帮助. 线程池的作用: 线程池作用就是限制系统中执行线程的数量.      根据系统的环境情况,可以自动或手动设置线程数量,达到运行的最佳效果:少了浪费了系统资源,多了造成系统拥挤效率不高.用

  • 深入理解Java线程池从设计思想到源码解读

    线程池:从设计思想到源码解析 前言初识线程池线程池优势线程池设计思路 深入线程池构造方法任务队列拒绝策略线程池状态初始化&容量调整&关闭 使用线程池ThreadPoolExecutorExecutors封装线程池 解读线程池execute()addWorker()Worker类runWorker()processWorkerExit() 前言 各位小伙伴儿,春节已经结束了,在此献上一篇肝了一个春节假期的迟来的拜年之作,希望读者朋友们都能有收获. 根据穆氏哲学,投入越多,收获越大.我作此文时

  • 如何理解Java线程池及其使用方法

    目录 一.前言 二.总体的架构 三.研读ThreadPoolExecutor 3.1.任务缓存队列 3.2.拒绝策略 3.3.线程池的任务处理策略 3.4.线程池的关闭 3.5.源码分析 四.常见的四种线程池 4.1.newFixedThreadPool 4.2.newSingleThreadExecutor 4.3.newCachedThreadPool 4.4.newScheduledThreadPool 五.使用实例 5.1.newFixedThreadPool实例 5.2.newCach

  • 一文带你深入剖析Java线程池的前世今生

    目录 由线程到线程池 线程在做什么 为什么需要线程池 线程池实现原理 总结 由线程到线程池 线程在做什么 灵魂拷问:写了那么多代码,你能够用一句话简练描述线程在干啥吗? public class Demo01 {   public static void main(String[] args) {     var thread = new Thread(() -> {       System.out.println("Hello world from a Java thread"

  • 一篇文章带你搞懂Java线程池实现原理

    目录 1. 为什么要使用线程池 2. 线程池的使用 3. 线程池核心参数 4. 线程池工作原理 5. 线程池源码剖析 5.1 线程池的属性 5.2 线程池状态 5.3 execute源码 5.4 worker源码 5.5 runWorker源码 1. 为什么要使用线程池 使用线程池通常由以下两个原因: 频繁创建销毁线程需要消耗系统资源,使用线程池可以复用线程. 使用线程池可以更容易管理线程,线程池可以动态管理线程个数.具有阻塞队列.定时周期执行任务.环境隔离等. 2. 线程池的使用 /** *

  • java自带的四种线程池实例详解

    目录 java预定义的哪四种线程池? 四种线程池有什么区别? 线程池有哪几个重要参数? 如何自定义线程池 总结 java预定义的哪四种线程池? newSingleThreadExexcutor:单线程数的线程池(核心线程数=最大线程数=1) newFixedThreadPool:固定线程数的线程池(核心线程数=最大线程数=自定义) newCacheThreadPool:可缓存的线程池(核心线程数=0,最大线程数=Integer.MAX_VALUE) newScheduledThreadPool:

  • 一篇文章带你了解如何正确使用java线程池

    目录 1.线程是不是越多越好? 2.如何正确使用多线程? 3.Java线程池的工作原理 4.掌握JUC线程池API 总结 1.线程是不是越多越好? 在学习多线程之前,读者可能会有疑问?如果单线程跑得太慢,那么是否就能多创建多个线程来跑任务?并发的情况,线程是不是创建越多越好?这是一个很经典的问题,画图表示一下创建很多线程的情况,然后进行情况分析. 创建线程和销毁线程都是需要时间的,如果创建时间+销毁时间>执行任务时间就很不划算 创建后的线程是需要内存去存放的,创建的线程对应一个Thread对象,

随机推荐