Java实现手写线程池的示例代码

目录
  • 前言
  • 线程池给我们提供的功能
  • 工具介绍
  • Worker设计
  • 线程池设计
  • 总结

前言

在我们的日常的编程当中,并发是始终离不开的主题,而在并发多线程当中,线程池又是一个不可规避的问题。多线程可以提高我们并发程序的效率,可以让我们不去频繁的申请和释放线程,这是一个很大的花销,而在线程池当中就不需要去频繁的申请线程,他的主要原理是申请完线程之后并不中断,而是不断的去队列当中领取任务,然后执行,反复这样的操作。在本篇文章当中我们主要是介绍线程池的原理,因此我们会自己写一个非常非常简单的线程池,主要帮助大家理解线程池的核心原理!!!

线程池给我们提供的功能

我们首先来看一个使用线程池的例子:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Demo01 {

  public static void main(String[] args) {
    ExecutorService pool = Executors.newFixedThreadPool(5);
    for (int i = 0; i < 100; i++) {
      pool.execute(new Runnable() {
        @Override
        public void run() {
          for (int i = 0; i < 100; i++) {
            System.out.println(Thread.currentThread().getName() + " print " + i);
          }
        }
      });
    }
  }
}

在上面的例子当中,我们使用Executors.newFixedThreadPool去生成来一个固定线程数目的线程池,在上面的代码当中我们是使用5个线程,然后通过execute方法不断的去向线程池当中提交任务,大致流程如下图所示:

线程池通过execute函数不断的往线程池当中的任务队列加入任务,而线程池当中的线程会不断的从任务队列当中取出任务,然后进行执行,然后继续取任务,继续执行....,线程的执行过程如下:

while (true) {
  Runnable runnable = taskQueue.take(); // 从任务队列当中取出任务
  runnable.run(); // 执行任务
}

根据上面所谈到的内容,现在我们的需求很清晰了,首先我们需要有一个队列去存储我们所需要的任务,然后需要开启多个线程不断的去任务队列当中取出任务,然后进行执行,然后重复取任务执行任务的操作。

工具介绍

在我们前面提到的线程池实现的原理当中有一个非常重要的数据结构,就是ArrayBlockingQueue阻塞队列,它是一个并发安全的数据结构,我们首先先简单介绍一下这个数据结构的使用方法。(如果你想深入了解阻塞队列的实现原理,可以参考这篇文章JDK数组阻塞队列源码剖析)

我们主要用的是ArrayBlockingQueue的下面两个方法:

put函数,这个函数是往线程当中加入数据的。我们需要了解的是,如果一个线程调用了这个函数往队列当中加入数据,如果此时队列已经满了则线程需要被挂起,如果没有满则需要将数据加入到队列当中,也就是将数据存储到数组当中。

take函数,从队列当中取出数据,但是当队列为空的时候需要将调用这个方法的线程阻塞。当队列当中有数据的时候,就可以从队列当中取出数据。

需要注意的是,如果一个线程被上面两个任何一个线程阻塞之后,可以调用对应线程的interrupt方法终止线程的执行,同时还会抛出一个异常。

下面是一份测试代码:

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

public class QueueTest {

  public static void main(String[] args) throws InterruptedException {
    ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(5); // 队列的容量为5
    Thread thread = new Thread(() -> {
      for (int i = 0; i < 10; i++) {
        try {
          queue.put(i);
          System.out.println("数据 " + i + "被加入到队列当中");
        } catch (InterruptedException e) {
          System.out.println("出现了中断异常");
          // 如果出现中断异常 则退出 线程就不会一直在 put 方法被挂起了
          return;
        }finally {
        }
      }
    });
    thread.start();
    TimeUnit.SECONDS.sleep(1);
    thread.interrupt();
  }
}

上面代码输出结果:

数据 0被加入到队列当中
数据 1被加入到队列当中
数据 2被加入到队列当中
数据 3被加入到队列当中
数据 4被加入到队列当中
出现了中断异常

上面代码的执行顺序是:

线程thread会将0-4这5个数据加入到队列当中,但是在加入第6个数据的时候,阻塞队列已经满了,因此在加入数据的时候线程thread会被阻塞,然后主线程在休息一秒之后中断了线程thread,然后线程thread发生了中断异常,然后被捕获进入catch代码块,然后函数返回,线程thread就不会一直被阻塞了,这一点在我们后面写线程池非常重要!!!

Worker设计

在前文当中我们已经提到了我们的线程需要不断的去任务队列里面取出任务然后执行,我们设计一个Worker类去做这件事!

首先在类当中肯定需要有一个线程池的任务队列,因为worker需要不断的从阻塞队列当中取出任务进行执行。

我们用一个isStopped变量表示线程是否需要终止了,也就是线程池是否需要关闭,如果线程池需要关闭了,那么线程也应该停止了。

我们还需要有一个变量记录执行任务的线程,因为当我们需要关闭线程池的时候需要等待任务队列当中所有的任务执行完成,那么当所有的任务都执行执行完成的时候,队列肯定是空的,而如果这个时候有线程还去取任务,那么肯定会被阻塞,前面已经提到了ArrayBlockingQueue的使用方法了,我们可以使用这个线程的interrupt的方法去中断这个线程的执行,这个线程会出现异常,然后这个线程捕获这个异常就可以退出了,因此我们需要知道对那个线程执行interrupt方法!

Worker实现的代码如下:

import java.util.concurrent.ArrayBlockingQueue;

public class Worker implements Runnable {

  // 用于保存任务的队列
  private ArrayBlockingQueue<Runnable> tasks;
  // 线程的状态 是否终止
  private volatile boolean isStopped;

  // 保存执行 run 方法的线程
  private volatile Thread thisThread;

  public Worker(ArrayBlockingQueue<Runnable> tasks) {
    // 这个参数是线程池当中传入的
    this.tasks = tasks;
  }

  @Override
  public void run() {
    thisThread = Thread.currentThread();
    while (!isStopped) {
      try {
        Runnable task = tasks.take();
        task.run();
      } catch (InterruptedException e) {
        // do nothing
      }
    }
  }
    // 注意是其他线程调用这个方法 同时需要注意是 thisThread 这个线程在执行上面的 run 方法
  // 其他线程调用 thisThread 的 interrupt 方法之后 thisThread 会出现异常 然后就不会一直阻塞了
  // 会判断 isStopped 是否为 true 如果为 true 的话就可以退出 while 循环了
  public void stop() {
    isStopped = true;
    thisThread.interrupt(); // 中断线程 thisThread
  }

  public boolean isStopped(){
    return isStopped;
  }
}

线程池设计

首先线程池需要可以指定有多少个线程,阻塞队列的最大长度,因此我们需要有这两个参数。

线程池肯定需要有一个队列去存放通过submit函数提交的任务。

需要有一个变量存储所有的woker,因为线程池关闭的时候需要将这些worker都停下来,也就是调用worker的stop方法。

需要有一个shutDown函数表示关闭线程池。

需要有一个函数能够停止所有线程的执行,因为关闭线程池就是让所有线程的工作停下来。

线程池实现代码:

import java.util.ArrayList;
import java.util.concurrent.ArrayBlockingQueue;

public class MyFixedThreadPool {
  // 用于存储任务的阻塞队列
  private ArrayBlockingQueue<Runnable> taskQueue;

  // 保存线程池当中所有的线程
  private ArrayList<Worker> threadLists;

  // 线程池是否关闭
  private boolean isShutDown;

  // 线程池当中的线程数目
  private int numThread;

  public MyFixedThreadPool(int i) {
    this(Runtime.getRuntime().availableProcessors() + 1, 1024);
  }

  public MyFixedThreadPool(int numThread, int maxTaskNumber) {
    this.numThread = numThread;
    taskQueue = new ArrayBlockingQueue<>(maxTaskNumber); // 创建阻塞队列
    threadLists = new ArrayList<>();
    // 将所有的 worker 都保存下来
    for (int i = 0; i < numThread; i++) {
      Worker worker = new Worker(taskQueue);
      threadLists.add(worker);
    }
    for (int i = 0; i < threadLists.size(); i++) {
      new Thread(threadLists.get(i),
              "ThreadPool-Thread-" + i).start(); // 让worker开始工作
    }
  }

  // 停止所有的 worker 这个只在线程池要关闭的时候才会调用
  private void stopAllThread() {
    for (Worker worker : threadLists) {
      worker.stop(); // 调用 worker 的 stop 方法 让正在执行 worker 当中 run 方法的线程停止执行
    }
  }

  public void shutDown() {
    // 等待任务队列当中的任务执行完成
    while (taskQueue.size() != 0) {
      // 如果队列当中还有任务 则让出 CPU 的使用权
      Thread.yield();
    }
    // 在所有的任务都被执行完成之后 停止所有线程的执行
    stopAllThread();
  }

  public void submit(Runnable runnable) {
    try {
      taskQueue.put(runnable); // 如果任务队列满了, 调用这个方法的线程会被阻塞
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
}

测试代码:

public class Test {

  public static void main(String[] args) {
    MyFixedThreadPool pool = new MyFixedThreadPool(5, 1024); // 开启5个线程 任务队列当中最多只能存1024个任务
    for (int i = 0; i < 1000000; i++) {
      pool.submit(() -> {
        System.out.println(Thread.currentThread().getName()); // 提交的任务就是打印线程自己的名字
      });
    }
    pool.shutDown();
  }
}

上面的代码是可以正常执行并且结束的,这个输出太长了这里只列出部分输出结果:

ThreadPool-Thread-0
ThreadPool-Thread-4
ThreadPool-Thread-0
ThreadPool-Thread-1
ThreadPool-Thread-3
ThreadPool-Thread-1
ThreadPool-Thread-3
ThreadPool-Thread-3
ThreadPool-Thread-3
ThreadPool-Thread-3
ThreadPool-Thread-3
ThreadPool-Thread-2
ThreadPool-Thread-3
ThreadPool-Thread-2
ThreadPool-Thread-1
ThreadPool-Thread-0
ThreadPool-Thread-0
ThreadPool-Thread-0
ThreadPool-Thread-1
ThreadPool-Thread-4

从上面的输出我们可以看见线程池当中只有5个线程,这5个线程在不断从队列当中取出任务然后执行,因为我们可以看到同一个线程的名字输出了多次。

总结

在本篇文章当中主要介绍了线程池的原理,以及我们应该去如何设计一个线程池,同时也介绍了在阻塞队列当中一个非常重要的数据结构ArrayBlockingQueue的使用方法。

线程池当中有一个阻塞队列去存放所有被提交到线程池当中的任务。

所有的Worker会不断的从任务队列当中取出任务然后执行。

线程池的shutDown方法其实比较难思考该怎么实现的,首先在我们真正关闭线程池之前需要将任务队列当中所有的任务执行完成,然后将所有的线程停下来。

在所有的任务执行完成之后,可能有的线程就会阻塞在take方法上(从队列当中取数据的方法,如果队列为空线程会阻塞),好在ArrayBlockingQueue在实现的时候就考虑到了这个问题,只需要其他线程调用了这个被阻塞线程的interrupt方法的话,线程就可以通过捕获异常恢复执行,然后判断isStopped,如果需要停止了就跳出while循环,这样的话我们就可以完成所有线程的停止操作了。

到此这篇关于Java实现手写线程池的示例代码的文章就介绍到这了,更多相关Java线程池内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • java线程组构造方法源码解析

    目录 线程组构造方法 为啥开启线程,就能统计到呢? 线程组下面可以有线程组吗? 线程组构造方法 我们看这个线程组,线程组名字是system,设置优先级,然后指定父线程是空,可以看出这个是根线程组,这个方法是私有的,不是给我们调用的. 把当前线程加入线程组中 我们试试上述代码逻辑,对叭,没指定线程组就加入当前创建的main线程的线程组,如果指定线程组就是当前线程组. 我们看源码也是一样: 活跃线程和活跃线程组数量都是0: 我们这是没开启线程,所以我们需要开启线程,这里就看到活跃线程数量了: 为啥开

  • Java创建线程的五种写法总结

    目录 通过继承Thread类并实现run方法创建一个线程 通过实现Runnable接口,并实现run方法的方法创建一个线程 通过Thread匿名内部类创建一个线程 通过Runnable匿名内部类创建一个线程 通过Lambda表达式的方式创建一个线程 通过继承Thread类并实现run方法创建一个线程 // 定义一个Thread类,相当于一个线程的模板 class MyThread01 extends Thread { // 重写run方法// run方法描述的是线程要执行的具体任务@Overri

  • Java查看线程运行状态的方法详解

    目录 一.查看线程的运行状态 二.解题思路 三.代码详解 一.查看线程的运行状态 题目 线程有以下6种状态:新建.运行.阻塞.等待.计时等待和终止. new新线程时,线程处于新建 状态. 调用start()方法时,线程处于运行状态. 当线程需要获得对象的内置锁,而该锁正被其他线程拥有,线程处于阻塞状态. 线程等待其他线程通知调度表可以运行时,该线程处于等待状态. 对于一些含有时间参数的方法,如 Thread 类的 sleep()方法,可以使线程处于计时等待状态. 当run()方法运行完毕或出现异

  • Java查看和修改线程优先级操作详解

    目录 查看和修改线程优先级 1.题目 2.解题思路 3.代码详解 查看和修改线程优先级 1.题目 JAVA中每个线程都有优化级属性,默认情况下,新建的线程和创建该线程的线程优先级是一样的.当线程调度器选择要运行的线程时,会选择优先级高的线程. 实现:查看和修改线程的优先级 2.解题思路 创建一个类:ThreadPriorityTest,继承了JFrame.用来界面显示当前线程组中运行的线程. 定义2个方法: do_this_windowActivated():用来监听窗体激活事件 do_butt

  • Java线程池并发执行多个任务方式

    目录 Java线程池并发执行多个任务 Java线程池的正确使用 1.Executors存在什么问题 2.Executors为什么存在缺陷 3. 线程池参数详解 4. 线程池正确打开方式 Java线程池并发执行多个任务 Java在语言层面提供了多线程的支持,线程池能够避免频繁的线程创建和销毁的开销,因此很多时候在项目当中我们是使用的线程池去完成多线程的任务. Java提供了Executors 框架提供了一些基础的组件能够轻松的完成多线程异步的操作,Executors提供了一系列的静态工厂方法能够获

  • Java多线程实现第三方数据同步

    本文实例为大家分享了Java多线程实现第三方数据同步的具体代码,供大家参考,具体内容如下 一.场景 最近的一项开发任务是同步第三方数据,而第三方数据一般有存量数据和增量数据,存量数据有100w+.在得知此需求时,进行了一定的信息检索和工具学习,提前获取存量数据到目标库,再使用kettle进行存量数据转换:增量数据则根据业务方规定的请求时间,通过定时任务去获取增量数据并进行数据转换.在数据获取和转换时,我们应该要记录每一次的请求信息,便于溯源和数据对账!!! 二.获取数据的方式 2.1 递归方式

  • Java多线程工具CompletableFuture的使用教程

    目录 前言 Future的问题 CompletableFuture应运而生 使用方式 基本使用-提交异步任务 处理上个异步任务结果 对两个结果进行选用-acceptEither 对两个结果进行合并-thenCombine, thenAcceptBoth 异常处理 案例 大量用户发送短信|消息 并发获取商品不同信息 问题 thenRun和thenRunAsync有什么区别 handle和exceptional有什么区别 最后 前言 Future的问题 写多线程程序的时候,可以使用Future从一个

  • Java多线程ThreadPoolExecutor详解

    目录 1 newFixedThreadPool 2 newCachedThreadPool 3 newSingleThreadExecutor 4 提交任务 5 关闭线程池 前言: 根据ThreadPoolExecutor的构造方法,JDK提供了很多工厂方法来创建各种用途的线程池. 1 newFixedThreadPool public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolE

  • Java线程的五种状态介绍

    目录 1. 线程的5种状态 2. Java线程的6种状态 3. Java线程状态的转换 1. 线程的5种状态 从操作系统层面上,任何线程一般都具有五种状态,即创建.就绪.运行.阻塞.终止. (1) 新建状态(NEW) 在程序中用构造方法创建一个新线程时,如new Thread(),该线程就是创建状态,此时它已经有了相应的内存空间和其它资源,但是还没有开始执行. (2) 就绪状态(READ) 新建线程对象后,调用该线程的start()方法就可以启动线程.当线程启动时,线程就进入就绪状态(runna

  • Java实现手写线程池的示例代码

    目录 前言 线程池给我们提供的功能 工具介绍 Worker设计 线程池设计 总结 前言 在我们的日常的编程当中,并发是始终离不开的主题,而在并发多线程当中,线程池又是一个不可规避的问题.多线程可以提高我们并发程序的效率,可以让我们不去频繁的申请和释放线程,这是一个很大的花销,而在线程池当中就不需要去频繁的申请线程,他的主要原理是申请完线程之后并不中断,而是不断的去队列当中领取任务,然后执行,反复这样的操作.在本篇文章当中我们主要是介绍线程池的原理,因此我们会自己写一个非常非常简单的线程池,主要帮

  • Java实现手写线程池实例并测试详解

    前言 在之前的文章中介绍过线程池的核心原理,在一次面试中面试官让手写线程池,这块知识忘记的差不多了,因此本篇文章做一个回顾. 希望能够加深自己的印象以及帮助到其他的小伙伴儿们 在线程池核心原理篇介绍过线程池的核心原理,今天来模拟线程池和工作队列的流程,以及编写代码和测试类进行测试.下面附下之前线程池的核心流程: 在线程池核心原理的源码中,涉及到了一系列的流程,包括线程池队列数量是否已满,运用什么样的拒绝策略等.在我们手写线程池的代码中,不需要考虑那么多因素,只需要模拟简单的情景和过程,因此整体来

  • Java实现手写自旋锁的示例代码

    目录 前言 自旋锁 原子性 自己动手写自旋锁 自己动手写可重入自旋锁 总结 前言 我们在写并发程序的时候,一个非常常见的需求就是保证在某一个时刻只有一个线程执行某段代码,像这种代码叫做临界区,而通常保证一个时刻只有一个线程执行临界区的代码的方法就是锁.在本篇文章当中我们将会仔细分析和学习自旋锁,所谓自旋锁就是通过while循环实现的,让拿到锁的线程进入临界区执行代码,让没有拿到锁的线程一直进行while死循环,这其实就是线程自己“旋”在while循环了,因而这种锁就叫做自旋锁. 自旋锁 原子性

  • Java手写线程池的实现方法

    本文实例为大家分享了Java手写线程池的实现代码,供大家参考,具体内容如下 1.线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务.线程池线程都是后台线程. 2.线程池简易架构 3.简易线程池代码(自行优化) import java.util.List; /** * 线程接口 * * @Author yjian * @Date 14:49 2017/10/14 **/ public interface IThreadPool { //加入任务 void ex

  • Java实现手写一个线程池的示例代码

    目录 概述 线程池框架设计 代码实现 阻塞队列的实现 线程池消费端实现 获取任务超时设计 拒绝策略设计 概述 线程池技术想必大家都不陌生把,相信在平时的工作中没有少用,而且这也是面试频率非常高的一个知识点,那么大家知道它的实现原理和细节吗?如果直接去看jdk源码的话,可能有一定的难度,那么我们可以先通过手写一个简单的线程池框架,去掌握线程池的基本原理后,再去看jdk的线程池源码就会相对容易,而且不容易忘记. 线程池框架设计 我们都知道,线程资源的创建和销毁并不是没有代价的,甚至开销是非常高的.同

  • Java实现手写乞丐版线程池的示例代码

    目录 前言 线程池的具体实现 线程池实现思路 线程池实现代码 线程池测试代码 杂谈 总结 前言 在上篇文章线程池的前世今生当中我们介绍了实现线程池的原理,在这篇文章当中我们主要介绍实现一个非常简易版的线程池,深入的去理解其中的原理,麻雀虽小,五脏俱全. 线程池的具体实现 线程池实现思路 任务保存到哪里? 在上篇文章线程池的前世今生当中我们具体去介绍了线程池当中的原理.在线程池当中我们有很多个线程不断的从任务池(用户在使用线程池的时候不断的使用execute方法将任务添加到线程池当中)里面去拿任务

  • Java手写线程池之向JDK线程池进发

    目录 前言 JDK线程池一瞥 自己动手实现线程池 线程池参数介绍 实现Runnable 实现Callable 拒绝策略的实现 线程池关闭实现 工作线程的工作实现 线程池实现的BUG 完整代码 线程池测试 总结 前言 在前面的文章自己动手写乞丐版线程池中,我们写了一个非常简单的线程池实现,这个只是一个非常简单的实现,在本篇文章当中我们将要实现一个和JDK内部实现的线程池非常相似的线程池. JDK线程池一瞥 我们首先看一个JDK给我们提供的线程池ThreadPoolExecutor的构造函数的参数:

  • 利用Java手写阻塞队列的示例代码

    目录 前言 需求分析 阻塞队列实现原理 线程阻塞和唤醒 数组循环使用 代码实现 成员变量定义 构造函数 put函数 offer函数 add函数 take函数 重写toString函数 完整代码 总结 前言 在我们平时编程的时候一个很重要的工具就是容器,在本篇文章当中主要给大家介绍阻塞队列的原理,并且在了解原理之后自己动手实现一个低配版的阻塞队列. 需求分析 在前面的两篇文章ArrayDeque(JDK双端队列)源码深度剖析和深入剖析(JDK)ArrayQueue源码当中我们仔细介绍了队列的原理,

  • java 百度手写文字识别接口配置代码

    代码如下所示: package org.fh.util; import org.json.JSONObject; import java.io.BufferedReader; import java.io.InputStreamReader; import java.net.HttpURLConnection; import java.net.URL; import java.util.List; import java.util.Map; /** * 说明:获取文字识别token类 * 作者:

  • Python快速实现一个线程池的示例代码

    目录 楔子 Future 对象 提交函数自动创建 Future 对象 future.set_result 到底干了什么事情 提交多个函数 使用 map 来提交多个函数 按照顺序等待执行 取消一个函数的执行 函数执行时出现异常 等待所有函数执行完毕 小结 楔子 当有多个 IO 密集型的任务要被处理时,我们自然而然会想到多线程.但如果任务非常多,我们不可能每一个任务都启动一个线程去处理,这个时候最好的办法就是实现一个线程池,至于池子里面的线程数量可以根据业务场景进行设置. 比如我们实现一个有 10

随机推荐