学生视角手把手带你写Java 线程池

目录
  • Java手写线程池(第一代)
    • 手写线程池-定义参数
    • 手写线程池-构造器
    • 手写线程池-默认构造器
    • 手写线程池-execute方法
    • 手写线程池-处理任务
    • 手写线程池-优雅关闭线程池
    • 手写线程池-暴力关闭线程池
    • 手写线程池-源代码
    • 问题

Java手写线程池(第一代)

经常使用线程池,故今天突发奇想,手写一个线程池,会有很多不足,请多多宽容。因为这也是第一代的版本,后续会更完善。

手写线程池-定义参数

	private final AtomicInteger taskcount=new AtomicInteger(0);
    private final AtomicInteger threadNumber=new AtomicInteger(0);
    private volatile int corePoolSize;
    private final Set<MyThreadPoolExecutor.MyWorker> workers;
    private final BlockingQueue<Runnable> waitingQueue;
    private final String THREADPOOL_NAME="MyThread-Pool-";
    private volatile boolean isRunning=true;
    private volatile boolean STOPNOW=false;
    private final ThreadFactory threadFactory; 
  • taskcount:执行任务次数
  • threadNumber:线程编号,从0开始依次递增。
  • corePoolSize:核心线程数
  • workers:工作线程
  • waitingQueue:等待队列
  • THREADPOOL_NAME:线程名称
  • isRunning:是否运行
  • STOPNOW:是否立刻停止
  • threadFactory:线程工厂

手写线程池-构造器

    public MyThreadPoolExecutor(int corePoolSize, BlockingQueue<Runnable> waitingQueue,ThreadFactory threadFactory) {
        this.corePoolSize=corePoolSize;
        this.workers=new HashSet<>(corePoolSize);
        this.waitingQueue=waitingQueue;
        this.threadFactory=threadFactory;
        //线程预热
        for (int i = 0; i < corePoolSize; i++) {
            new MyWorker();
        }
    }

该构造器作用:

1:对参数进行赋值。

2:线程预热。根据corePoolSize的大小来调用MyWorker的构造器。我们可以看看MyWorker构造器做了什么。

	final Thread thread; //为每个MyWorker

        MyWorker(){
            Thread td = threadFactory.newThread(this);
            td.setName(THREADPOOL_NAME+threadNumber.getAndIncrement());
            this.thread=td;
            this.thread.start();
            workers.add(this);
        }
  • MyWorker构造器通过线程工厂对当前对象生成Thread;
  • 并设置线程名为:MyThread-Pool-自增线程编号;
  • 然后调用线程的start方法启动线程;
  • 最后存放在workers这个Set集合中,这样就可以实现线程复用了。

手写线程池-默认构造器

	public MyThreadPoolExecutor(){
        this(5,new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory());
    }
  • 默认构造器的赋初始值:
  • corePoolSize:5
  • waitingQueue:new ArrayBlockingQueue<>(10),长度为10的有限阻塞队列
  • threadFactory:Executors.defaultThreadFactory()

手写线程池-execute方法

	public boolean execute(Runnable runnable)
    {
        return waitingQueue.offer(runnable);
    }
  • 本质上其实就是把Runnable(任务)放到waitingQueue中。

手写线程池-处理任务

	   @Override
        public void run() {
            //循环接收任务
                while (true)
                {
                    if((!isRunning&&waitingQueue.size()==0)||STOPNOW)
                    {
                        break;
                    }else {
                        Runnable runnable = waitingQueue.poll();
                        if(runnable!=null){
                            runnable.run();
                            System.out.println("task==>"+taskcount.incrementAndGet());
                        }
                    }
                }
        }

本质上就是一个死循环接收任务,退出条件如下:

  • 优雅的退出。当isRunning为false并且waitingQueue的队列大小为0(也就是无任务了)
  • 暴力退出。当STOPNOW为true,则说明调用了shutdownNow方法
  • else语句块会不断取任务,当任务!=null时则调用run方法处理任务

手写线程池-优雅关闭线程池

	public void shutdown()
    {
        this.isRunning=false;
    }

手写线程池-暴力关闭线程池

	public void shutdownNow()
    {
        this.STOPNOW=true;
    }

手写线程池-源代码

  • 手写线程池类的源代码
package com.springframework.concurrent;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 线程池类
 * @author 游政杰
 */
public class MyThreadPoolExecutor {

    private final AtomicInteger taskcount=new AtomicInteger(0);//执行任务次数
    private final AtomicInteger threadNumber=new AtomicInteger(0); //线程编号
    private volatile int corePoolSize; //核心线程数
    private final Set<MyThreadPoolExecutor.MyWorker> workers; //工作线程
    private final BlockingQueue<Runnable> waitingQueue; //等待队列
    private final String THREADPOOL_NAME="MyThread-Pool-";//线程名称
    private volatile boolean isRunning=true; //是否运行
    private volatile boolean STOPNOW=false; //是否立刻停止
    private final ThreadFactory threadFactory; //线程工厂

    public MyThreadPoolExecutor(){
        this(5,new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory());
    }

    public MyThreadPoolExecutor(int corePoolSize, BlockingQueue<Runnable> waitingQueue,ThreadFactory threadFactory) {
        this.corePoolSize=corePoolSize;
        this.workers=new HashSet<>(corePoolSize);
        this.waitingQueue=waitingQueue;
        this.threadFactory=threadFactory;
        //线程预热
        for (int i = 0; i < corePoolSize; i++) {
            new MyWorker();
        }
    }

    /**
     * MyWorker就是我们每一个线程对象
     */
    private final class MyWorker implements Runnable{

        final Thread thread; //为每个MyWorker

        MyWorker(){
            Thread td = threadFactory.newThread(this);
            td.setName(THREADPOOL_NAME+threadNumber.getAndIncrement());
            this.thread=td;
            this.thread.start();
            workers.add(this);
        }

        @Override
        public void run() {
            //循环接收任务
                while (true)
                {
                    //循环退出条件:
                    //1:当isRunning为false并且waitingQueue的队列大小为0(也就是无任务了),会优雅的退出。
                    //2:当STOPNOW为true,则说明调用了shutdownNow方法进行暴力退出。
                    if((!isRunning&&waitingQueue.size()==0)||STOPNOW)
                    {
                        break;
                    }else {
                        //不断取任务,当任务!=null时则调用run方法处理任务
                        Runnable runnable = waitingQueue.poll();
                        if(runnable!=null){
                            runnable.run();
                            System.out.println("task==>"+taskcount.incrementAndGet());
                        }
                    }
                }
        }
    }

    public boolean execute(Runnable runnable)
    {
        return waitingQueue.offer(runnable);
    }
    //优雅的关闭
    public void shutdown()
    {
        this.isRunning=false;
    }
    //暴力关闭
    public void shutdownNow()
    {
        this.STOPNOW=true;
    }
}
  • 测试使用手写线程池代码
package com.springframework.test;

import com.springframework.concurrent.MyThreadPoolExecutor;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;

public class ThreadPoolTest {

  public static void main(String[] args) {

      MyThreadPoolExecutor myThreadPoolExecutor = new MyThreadPoolExecutor
              (5,new ArrayBlockingQueue<>(6), Executors.defaultThreadFactory());

      for(int i=0;i<10;i++){

          int finalI = i;
          myThreadPoolExecutor.execute(()->{
              System.out.println(Thread.currentThread().getName()+">>>>"+ finalI);
          });

      }

      myThreadPoolExecutor.shutdown();

//      myThreadPoolExecutor.shutdownNow();

  }
}

问题

为什么自定义线程池的execute执行的任务有时会变少?

那是因为waitingQueue满了放不下任务了,导致任务被丢弃,相当于DiscardPolicy拒绝策略

解决办法有:

1:设置最大线程数,自动对线程池扩容。

2:调大waitingQueue的容量capacity

最后:因为这是我手写的线程池的初代版本,基本实现线程池的复用功能,然而还有很多未完善,将来会多出几篇完善后的文章,对目前手写的线程池进行升级。

后续还会继续出关于作者手写Spring框架,手写Tomcat等等框架的博文!!!!!

到此这篇关于学生视角手把手带你写Java 线程池的文章就介绍到这了,更多相关Java 线程池内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • java线程池不同场景如何使用及示例经验总结

    https://blog.csdn.net/qq_34272760/article/details/120648262?spm=1001.2014.3001.5502

  • Java线程池流程编排运用实战源码

    目录 引导语 1.流程引擎关键代码回顾 2.异步执行SpringBean 3.如何区分异步的SpringBean 4.mock流程引擎数据中心 5.新建线程池 6.测试 7.总结 引导语 在线程池的面试中,面试官除了喜欢问 ThreadPoolExecutor 的底层源码外,还喜欢问你有没有在实际的工作中用过 ThreadPoolExecutor,我们在并发集合类的<场景集合:并发 List.Map 的应用场景>一文中说过一种简单的流程引擎,如果没有看过的同学,可以返回去看一下. 本章就在流程

  • Java自定义线程池的实现示例

    目录 一.Java语言本身也是多线程,回顾Java创建线程方式如下: 二.JDK线程池工具类. 三.业界知名自定义线程池扩展使用. 一.Java语言本身也是多线程,回顾Java创建线程方式如下: 1.继承Thread类,(Thread类实现Runnable接口),来个类图加深印象. 2.实现Runnable接口实现无返回值.实现run()方法,啥时候run,黑话了. 3.实现Callable接口重写call()+FutureTask获取. public class CustomThread {

  • Java线程池必知必会知识点总结

    目录 1.线程数使用开发规约 2.ThreadPoolExecutor源码 1.构造函数 2.核心参数 3.execute()方法 3.线程池的工作流程 4.Executors创建返回ThreadPoolExecutor对象(不推荐) 1.Executors#newCachedThreadPool=>创建可缓存的线程池 2.Executors#newSingleThreadExecutor=>创建单线程的线程池 3.Executors#newFixedThreadPool=>创建固定长度

  • Java线程池7个参数的详细含义

    目录 一.corePoolSize线程池核心线程大小 二.maximumPoolSize线程池最大线程数量 三.keepAliveTime空闲线程存活时间 四.unit空闲线程存活时间单位 五.workQueue工作队列 六.threadFactory线程工厂 七.handler拒绝策略 java多线程开发时,常常用到线程池技术,这篇文章是对创建java线程池时的七个参数的详细解释. 从源码中可以看出,线程池的构造函数有7个参数 这 7 个参数分别是: corePoolSize:核心线程数. m

  • java线程池使用及原理面试题

    目录 引导语 1.说说你对线程池的理解? 2.ThreadPoolExecutor.Executor.ExecutorService.Runnable.Callable.FutureTask 之间的关系? 3.说一说队列在线程池中起的作用? 4.结合请求不断增加时,说一说线程池构造器参数的含义和表现? 5.coreSize 和 maxSize 可以动态设置么,有没有规则限制? 6.说一说对于线程空闲回收的理解,源码中如何体现的? 7.如果我想在线程池任务执行之前和之后,做一些资源清理的工作,可以

  • Java中该如何优雅的使用线程池详解

    目录 为什么要用线程池? 线程池 线程池基本概念 线程池接口定义和实现类 ExecutorService ScheduledExecutorService 线程池工具类 newFixedThreadPool(int nThreads) newCachedThreadPool() newSingleThreadExecutor() newScheduledThreadPool(int corePoolSize) 任务线程池执行过程 如何确认合适的线程数量? 线程池的使用分析 合理配置线程池大小 线

  • java线程池ThreadPoolExecutor类使用详解

    在<阿里巴巴java开发手册>中指出了线程资源必须通过线程池提供,不允许在应用中自行显示的创建线程,这样一方面是线程的创建更加规范,可以合理控制开辟线程的数量:另一方面线程的细节管理交给线程池处理,优化了资源的开销.而线程池不允许使用Executors去创建,而要通过ThreadPoolExecutor方式,这一方面是由于jdk中Executor框架虽然提供了如newFixedThreadPool().newSingleThreadExecutor().newCachedThreadPool(

  • 学生视角手把手带你写Java 线程池改良版

    目录 Java手写线程池(第二代) 第二代线程池的优化 线程池构造器 线程池拒绝策略 execute方法 手写线程池源码 MyExecutorService MyRejectedExecutionException MyRejectedExecutionHandle 核心类MyThreadPoolExecutor 线程池测试类 Java手写线程池(第二代) 第二代线程池的优化 1:新增了4种拒绝策略.分别为:MyAbortPolicy.MyDiscardPolicy.MyDiscardOldes

  • 学生视角手把手带你写Java 线程池

    目录 Java手写线程池(第一代) 手写线程池-定义参数 手写线程池-构造器 手写线程池-默认构造器 手写线程池-execute方法 手写线程池-处理任务 手写线程池-优雅关闭线程池 手写线程池-暴力关闭线程池 手写线程池-源代码 问题 Java手写线程池(第一代) 经常使用线程池,故今天突发奇想,手写一个线程池,会有很多不足,请多多宽容.因为这也是第一代的版本,后续会更完善. 手写线程池-定义参数 private final AtomicInteger taskcount=new Atomic

  • 手把手带你理解java线程池之工作队列workQueue

    目录 线程池之工作队列 ArrayBlockingQueue SynchronousQueue LinkedBlockingDeque LinkedBlockingQueue LinkedTransferQueue PriorityBlockingQueue 线程池之工作队列 ArrayBlockingQueue 采用数组来实现,并采用可重入锁ReentrantLock来做并发控制,无论是添加还是读取,都先要获得锁才能进行操作 可看出进行读写操作都使用了ReentrantLock,ArrayBl

  • 非常适合新手学生的Java线程池超详细分析

    目录 线程池的好处 创建线程池的五种方式 缓存线程池CachedThreadPool 固定容量线程池FixedThreadPool 单个线程池SingleThreadExecutor 定时任务线程池ScheduledThreadPool ThreadPoolExecutor创建线程池(十分推荐) ThreadPoolExecutor的七个参数详解 workQueue handler 如何触发拒绝策略和线程池扩容? 线程池的好处 可以实现线程的复用,避免重新创建线程和销毁线程.创建线程和销毁线程对

  • 非常适合新手学生的Java线程池优化升级版

    目录 升级版线程池的优化 线程池构造器 线程池拒绝策略 execute方法 手写线程池源码 MyExecutorService MyRejectedExecutionException MyRejectedExecutionHandle 核心类MyThreadPoolExecutor 线程池测试类 升级版线程池的优化 1:新增了4种拒绝策略.分别为:MyAbortPolicy.MyDiscardPolicy.MyDiscardOldestPolicy.MyCallerRunsPolicy 2:对

  • Java实现手写一个线程池的示例代码

    目录 概述 线程池框架设计 代码实现 阻塞队列的实现 线程池消费端实现 获取任务超时设计 拒绝策略设计 概述 线程池技术想必大家都不陌生把,相信在平时的工作中没有少用,而且这也是面试频率非常高的一个知识点,那么大家知道它的实现原理和细节吗?如果直接去看jdk源码的话,可能有一定的难度,那么我们可以先通过手写一个简单的线程池框架,去掌握线程池的基本原理后,再去看jdk的线程池源码就会相对容易,而且不容易忘记. 线程池框架设计 我们都知道,线程资源的创建和销毁并不是没有代价的,甚至开销是非常高的.同

  • 一文带你深入剖析Java线程池的前世今生

    目录 由线程到线程池 线程在做什么 为什么需要线程池 线程池实现原理 总结 由线程到线程池 线程在做什么 灵魂拷问:写了那么多代码,你能够用一句话简练描述线程在干啥吗? public class Demo01 {   public static void main(String[] args) {     var thread = new Thread(() -> {       System.out.println("Hello world from a Java thread"

  • 一篇文章带你了解如何正确使用java线程池

    目录 1.线程是不是越多越好? 2.如何正确使用多线程? 3.Java线程池的工作原理 4.掌握JUC线程池API 总结 1.线程是不是越多越好? 在学习多线程之前,读者可能会有疑问?如果单线程跑得太慢,那么是否就能多创建多个线程来跑任务?并发的情况,线程是不是创建越多越好?这是一个很经典的问题,画图表示一下创建很多线程的情况,然后进行情况分析. 创建线程和销毁线程都是需要时间的,如果创建时间+销毁时间>执行任务时间就很不划算 创建后的线程是需要内存去存放的,创建的线程对应一个Thread对象,

  • java自带的四种线程池实例详解

    目录 java预定义的哪四种线程池? 四种线程池有什么区别? 线程池有哪几个重要参数? 如何自定义线程池 总结 java预定义的哪四种线程池? newSingleThreadExexcutor:单线程数的线程池(核心线程数=最大线程数=1) newFixedThreadPool:固定线程数的线程池(核心线程数=最大线程数=自定义) newCacheThreadPool:可缓存的线程池(核心线程数=0,最大线程数=Integer.MAX_VALUE) newScheduledThreadPool:

  • 一篇文章带你搞懂Java线程池实现原理

    目录 1. 为什么要使用线程池 2. 线程池的使用 3. 线程池核心参数 4. 线程池工作原理 5. 线程池源码剖析 5.1 线程池的属性 5.2 线程池状态 5.3 execute源码 5.4 worker源码 5.5 runWorker源码 1. 为什么要使用线程池 使用线程池通常由以下两个原因: 频繁创建销毁线程需要消耗系统资源,使用线程池可以复用线程. 使用线程池可以更容易管理线程,线程池可以动态管理线程个数.具有阻塞队列.定时周期执行任务.环境隔离等. 2. 线程池的使用 /** *

随机推荐