Java实现手写一个线程池的示例代码

目录
  • 概述
  • 线程池框架设计
  • 代码实现
    • 阻塞队列的实现
    • 线程池消费端实现
    • 获取任务超时设计
    • 拒绝策略设计

概述

线程池技术想必大家都不陌生把,相信在平时的工作中没有少用,而且这也是面试频率非常高的一个知识点,那么大家知道它的实现原理和细节吗?如果直接去看jdk源码的话,可能有一定的难度,那么我们可以先通过手写一个简单的线程池框架,去掌握线程池的基本原理后,再去看jdk的线程池源码就会相对容易,而且不容易忘记。

线程池框架设计

我们都知道,线程资源的创建和销毁并不是没有代价的,甚至开销是非常高的。同时,线程也不是任意多创建的,因为活跃的线程会消耗系统资源,特别是内存,在一定的范围内,增加线程可以提高系统的吞吐率,如果超过了这个范围,反而会降低程序的执行速度。

因此,设计一个容纳多个线程的容器,容器中的线程可以重复使用,省去了频繁创建和销毁线程对象的操作, 达到下面的目标:

  • 降低资源消耗,减少了创建和销毁线程的次数,每个工作线程都可以被重复利用,可执行多个任务
  • 提高响应速度,当任务到达时,如果有线程可以直接用,不会出现系统僵死
  • 提高线程的可管理性,如果无限制的创建线程,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控

线程池的核心思想: 线程复用,同一个线程可以被重复使用,来处理多个任务。

为了实现线程池功能,需要考虑下面几个设计要点:

  • 线程池可以接口外部提交的任务执行
  • 线程池有工作线程的数量,有任务执行,没有任务也空闲在那,等待任务过来,这样既避免线程频繁创建销毁带来的开销,同时也可以避免线程池无限制的创建线程
  • 如果线程池接受提交的任务超过工作线程的数量了,该怎么办?可以用一个队列把任务存下来,等工作线程完成任务后去队列中获取任务,执行
  • 那如果任务实在是太多太多了,达到了我们认为的队列最大值,怎么办,我们可以设计一种任务太多的策略,可以进行切换,比如直接丢弃任务、报错等等

看了上面的设计目标和要点,是不是能立刻想到一个非常经典的设计模型——生产者消费者模型。

  • 阻塞队列存储执行任务,比如外部main函数作为生产者向队列生产任务。
  • 线程池中的工作线程作为消费者获取任务执行。

现在我们将我们的设计思路转换为代码。

代码实现

阻塞队列的实现

  • 阻塞队列主要存放任务,有容量限制
  • 阻塞队列提供添加和删除任务的API, 如果超过容量,阻塞不能添加任务,如果没有任务,阻塞无法获取任务。
/**
 * <p>自定义任务队列, 用来存放任务 </p>
 *
 * @author: cxw (332059317@qq.com)
 * @date: 2022/10/18  10:15
 * @version: 1.0.0
 */
@Slf4j(topic = "c.BlockingQueue")
public class BlockingQueue<T> {
    // 容量
    private int capcity;
    // 双端任务队列容器
    private Deque<T> deque = new ArrayDeque<>();
    // 重入锁
    private ReentrantLock lock = new ReentrantLock();
    // 生产者条件变量
    private Condition fullWaitSet = lock.newCondition();
    // 生产者条件变量
    private Condition emptyWaitSet = lock.newCondition();

    public BlockingQueue(int capcity) {
        this.capcity = capcity;
    }

    // 阻塞的方式添加任务
    public void put(T task) {
        lock.lock();
        try {
            // 通过while的方式
            while (deque.size() >= capcity) {
                log.debug("wait to add queue");
                try {
                    fullWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            deque.offer(task);
            log.debug("task add successfully");
            emptyWaitSet.signal();
        }  finally {
            lock.unlock();
        }
    }

    // 阻塞获取任务
    public T take() {
        lock.lock();
        try {
            // 通过while的方式
            while (deque.isEmpty()) {
                try {
                    log.debug("wait to take task");
                    emptyWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            fullWaitSet.signal();
            T task = deque.poll();
            log.debug("take task successfully");
            // 从队列中获取元素
            return task;
        } finally {
            lock.unlock();
        }
    }
}
  • put()方法是向阻塞队列中添加任务
  • take()方法是向阻塞队列中获取任务

线程池消费端实现

1.定义执行器接口

/**
 * <p>定义一个执行器的接口:</p>
 *
 * @author: cxw (332059317@qq.com)
 * @date: 2022/10/18  12:31
 * @version: 1.0.0
 */
public interface Executor {

    /**
     * 提交任务执行
     * @param task 任务
     */
    void execute(Runnable task);
}

2.定义线程池类实现该接口

@Slf4j(topic = "c.ThreadPool")
public class ThreadPool implements Executor {

    /**
     * 任务队列
     */
    private BlockingQueue<Runnable> taskQueue;

    /**
     * 核心工作线程数
     */
    private int coreSize;

    /**
     * 工作线程集合
     */
    private Set<Worker> workers = new HashSet<>();

    /**
     *  创建线程池
     * @param coreSize 工作线程数量
     * @param capcity 阻塞队列容量
     */
    public ThreadPool(int coreSize, int capcity) {
        this.coreSize = coreSize;
        this.taskQueue = new BlockingQueue<>(capcity);
    }

    /**
     * 提交任务执行
     */
    @Override
    public void execute(Runnable task) {
        synchronized (workers) {
            // 如果工作线程数小于阈值,直接开始任务执行
            if(workers.size() < coreSize) {
                Worker worker = new Worker(task);
                workers.add(worker);
                worker.start();
            } else {
                // 如果超过了阈值,加入到队列中
                taskQueue.put(task);
            }
        }
    }

    /**
     * 工作线程,对执行的任务做了一层包装处理
     */
    class Worker extends Thread {
        private Runnable task;

        public Worker(Runnable task) {
            this.task = task;
        }

        @Override
        public void run() {
            // 如果任务不为空,或者可以从队列中获取任务
            while (task != null || (task = taskQueue.take()) != null) {
                try {
                    task.run();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    // 执行完后,设置任务为空
                    task = null;
                }
            }

              // 移除工作线程
            synchronized (workers){
                log.debug("remove worker successfully");
                workers.remove(this);
            }
        }
    }
}
  • Worker类是工作线程类,包装了执行任务,里面实现了从队列获取任务,然后执行任务。
  • execute方法的实现中,如果工作线程数量小于阈值的话,直接创建新的工作线程,否则将任务添加到队列中。

3.演示

@Test
    public void testThreadPool1() throws InterruptedException {
        Executor executor = new ThreadPool(2, 4);
        // 提交任务
        for (int i = 0; i < 6; i++) {
            final  int j = i;
            executor.execute(() -> {
                try {
                    Thread.sleep(10);
                    log.info("run task {}", j);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            Thread.sleep(10);
        }

        Thread.sleep(10000);
    }

运行结果:

获取任务超时设计

目前从队列中获取任务是永久阻塞等待的,可以改成阻塞一段时间没有获取任务,丢弃的策略。

@Slf4j(topic = "c.TimeoutBlockingQueue")
public class TimeoutBlockingQueue<T> {
    // 容量
    private int capcity;
    // 双端任务队列容器
    private Deque<T> deque = new ArrayDeque<>();
    // 重入锁
    private ReentrantLock lock = new ReentrantLock();
    // 生产者条件变量
    private Condition fullWaitSet = lock.newCondition();
    // 生产者条件变量
    private Condition emptyWaitSet = lock.newCondition();

    public TimeoutBlockingQueue(int capcity) {
        this.capcity = capcity;
    }

    // 带超时时间的获取
    public T poll(long timeout, TimeUnit unit){
        lock.lock();
        try{
            // 将 timeout 统一转换为 纳秒
            long nanos = unit.toNanos(timeout);
            while (deque.isEmpty()){
                try {
                    if (nanos<=0){
                        return null;
                    }
                    // 返回的是剩余的等待时间,更改navos的值,使虚假唤醒的时候可以继续等待
                    nanos = emptyWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            fullWaitSet.signal();
            return deque.getFirst();
        }finally {
            lock.unlock();
        }
    }

    // 带超时时间的增加
    public boolean offer(T task , long timeout , TimeUnit unit){
        lock.lock();
        try{
            // 将 timeout 统一转换为 纳秒
            long nanos = unit.toNanos(timeout);
            while (deque.size() == capcity){
                try {
                    if (nanos<=0){
                        return false;
                    }
                    // 更新剩余需要等待的时间
                    nanos = fullWaitSet.awaitNanos(nanos);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            log.debug("加入任务队列 {}", task);
            deque.addLast(task);
            emptyWaitSet.signal();
            return true;
        }finally {
            lock.unlock();
        }
    }
}

新加TimeoutBlockingQueue类,添加offer和poll待超时的添加和获取任务的方法。

拒绝策略设计

目前的实现还是有个漏洞,无法自定义任务超出阈值的一个拒绝策略,我们可以通过利用函数式编程+策略模式去实现。

1.定义策略模式的函数式接口

/**
 * <p>拒绝策略的函数式接口:</p>
 *
 * @author: cxw (332059317@qq.com)
 * @date: 2022/10/18  13:15
 * @version: 1.0.0
 */
@FunctionalInterface
public interface RejectPolicy<T> {

    /**
     * 拒绝策略的接口
     * @param queue
     * @param task
     */
    void reject(BlockingQueue<T> queue, T task);
}

2.添加函数式接口的调用入口

我们可以在阻塞队列添加任务新加一个api, 添加任务如果超过容量,调用函数式接口。

@Slf4j(topic = "c.BlockingQueue")
public class BlockingQueue<T> {
    ........

    /**
     * 尝试添加任务
     * @param rejectPolicy
     * @param task
     */
    public void tryPut(RejectPolicy<T> rejectPolicy, T task) {
        lock.lock();
        try{
            // 如果队列超过容量
            if (deque.size()> capcity){
                log.debug("task too much, do reject");
                rejectPolicy.reject(this, task);
            }else {
                deque.offer(task);
                emptyWaitSet.signal();
            }
        }finally {
            lock.unlock();
        }
    }
}

3.修改ThreadPool类

@Slf4j(topic = "c.ThreadPool")
public class ThreadPool implements Executor {
    .....

    /**
     * 拒绝策略
     */
    private RejectPolicy rejectPolicy;

    // 通过构造方法传入执行的拒绝策略
    public ThreadPool(int coreSize, int capcity, RejectPolicy rejectPolicy) {
        this.coreSize = coreSize;
        this.taskQueue = new BlockingQueue<>(capcity);
        this.rejectPolicy = rejectPolicy;
    }

    /**
     * 提交任务执行
     */
    @Override
    public void execute(Runnable task) {
        synchronized (workers) {
            // 如果工作线程数小于阈值,直接开始任务执行
            if(workers.size() < coreSize) {
                Worker worker = new Worker(task);
                workers.add(worker);
                worker.start();
            } else {
                // 如果超过了阈值,加入到队列中
                //taskQueue.put(task);

                // 调用tryPut的方式
                taskQueue.tryPut(rejectPolicy, task);
            }
        }
    }

   ....
}

通过构造方法的方式传入要执行的拒绝策略

调用tryPut方法添加任务

4.演示

以上就是Java实现手写一个线程池的示例代码的详细内容,更多关于Java线程池的资料请关注我们其它相关文章!

(0)

相关推荐

  • 学生视角手把手带你写Java 线程池

    目录 Java手写线程池(第一代) 手写线程池-定义参数 手写线程池-构造器 手写线程池-默认构造器 手写线程池-execute方法 手写线程池-处理任务 手写线程池-优雅关闭线程池 手写线程池-暴力关闭线程池 手写线程池-源代码 问题 Java手写线程池(第一代) 经常使用线程池,故今天突发奇想,手写一个线程池,会有很多不足,请多多宽容.因为这也是第一代的版本,后续会更完善. 手写线程池-定义参数 private final AtomicInteger taskcount=new Atomic

  • 学生视角手把手带你写Java 线程池改良版

    目录 Java手写线程池(第二代) 第二代线程池的优化 线程池构造器 线程池拒绝策略 execute方法 手写线程池源码 MyExecutorService MyRejectedExecutionException MyRejectedExecutionHandle 核心类MyThreadPoolExecutor 线程池测试类 Java手写线程池(第二代) 第二代线程池的优化 1:新增了4种拒绝策略.分别为:MyAbortPolicy.MyDiscardPolicy.MyDiscardOldes

  • Java实现手写乞丐版线程池的示例代码

    目录 前言 线程池的具体实现 线程池实现思路 线程池实现代码 线程池测试代码 杂谈 总结 前言 在上篇文章线程池的前世今生当中我们介绍了实现线程池的原理,在这篇文章当中我们主要介绍实现一个非常简易版的线程池,深入的去理解其中的原理,麻雀虽小,五脏俱全. 线程池的具体实现 线程池实现思路 任务保存到哪里? 在上篇文章线程池的前世今生当中我们具体去介绍了线程池当中的原理.在线程池当中我们有很多个线程不断的从任务池(用户在使用线程池的时候不断的使用execute方法将任务添加到线程池当中)里面去拿任务

  • Java手写线程池的实现方法

    本文实例为大家分享了Java手写线程池的实现代码,供大家参考,具体内容如下 1.线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务.线程池线程都是后台线程. 2.线程池简易架构 3.简易线程池代码(自行优化) import java.util.List; /** * 线程接口 * * @Author yjian * @Date 14:49 2017/10/14 **/ public interface IThreadPool { //加入任务 void ex

  • Java实现手写线程池的示例代码

    目录 前言 线程池给我们提供的功能 工具介绍 Worker设计 线程池设计 总结 前言 在我们的日常的编程当中,并发是始终离不开的主题,而在并发多线程当中,线程池又是一个不可规避的问题.多线程可以提高我们并发程序的效率,可以让我们不去频繁的申请和释放线程,这是一个很大的花销,而在线程池当中就不需要去频繁的申请线程,他的主要原理是申请完线程之后并不中断,而是不断的去队列当中领取任务,然后执行,反复这样的操作.在本篇文章当中我们主要是介绍线程池的原理,因此我们会自己写一个非常非常简单的线程池,主要帮

  • java编写属于自己的线程池

    什么是线程池 线程池就是以一个或多个线程[循环执行]多个应用逻辑的线程集合. 一般而言,线程池有以下几个部分: 完成主要任务的一个或多个线程. 用于调度管理的管理线程. 要求执行的任务队列. 线程池的作用: 线程池作用就是限制系统中执行线程的数量. 根据系统的环境情况,可以自动或手动设置线程数量,达到运行的最佳效果:少了浪费了系统资源,多了造成系统拥挤效率不高.用线程池控制线程数量,其他线程排队等候.一个任务执行完毕,再从队列的中取最前面的任务开始执行.若队列中没有等待进程,线程池的这一资源处于

  • java实现手写一个简单版的线程池

    有些人可能对线程池比较陌生,并且更不熟悉线程池的工作原理.所以他们在使用线程的时候,多数情况下都是new Thread来实现多线程.但是,往往良好的多线程设计大多都是使用线程池来实现的. 为什么要使用线程 降低资源的消耗.降低线程创建和销毁的资源消耗.提高响应速度:线程的创建时间为T1,执行时间T2,销毁时间T3,免去T1和T3的时间提高线程的可管理性 下图所示为线程池的实现原理:调用方不断向线程池中提交任务:线程池中有一组线程,不断地从队列中取任务,这是一个典型的生产者-消费者模型. 要实现一

  • Java实现手写一个线程池的示例代码

    目录 概述 线程池框架设计 代码实现 阻塞队列的实现 线程池消费端实现 获取任务超时设计 拒绝策略设计 概述 线程池技术想必大家都不陌生把,相信在平时的工作中没有少用,而且这也是面试频率非常高的一个知识点,那么大家知道它的实现原理和细节吗?如果直接去看jdk源码的话,可能有一定的难度,那么我们可以先通过手写一个简单的线程池框架,去掌握线程池的基本原理后,再去看jdk的线程池源码就会相对容易,而且不容易忘记. 线程池框架设计 我们都知道,线程资源的创建和销毁并不是没有代价的,甚至开销是非常高的.同

  • Python快速实现一个线程池的示例代码

    目录 楔子 Future 对象 提交函数自动创建 Future 对象 future.set_result 到底干了什么事情 提交多个函数 使用 map 来提交多个函数 按照顺序等待执行 取消一个函数的执行 函数执行时出现异常 等待所有函数执行完毕 小结 楔子 当有多个 IO 密集型的任务要被处理时,我们自然而然会想到多线程.但如果任务非常多,我们不可能每一个任务都启动一个线程去处理,这个时候最好的办法就是实现一个线程池,至于池子里面的线程数量可以根据业务场景进行设置. 比如我们实现一个有 10

  • C++实现一个简单的线程池的示例代码

    目录 一.设计 二.参数选择 三.类设计 一.设计 线程池应该包括 保存线程的容器,保存任务的容器. 为了能保证避免线程对任务的竞态获取,需要对任务队列进行加锁. 为了使得工作线程感知任务的到来,需要使用条件变量来唤醒工作线程. 任务容器中的任务管理. 任务的处理API. 二.参数选择 使用数组存放线程,链表存放任务. 三.类设计 线程池类 template<typename T> class threadpool { public: threadpool(int thread_num,int

  • C语言实现手写字符串处理工具的示例代码

    目录 头文件 实现文件 头文件 #ifndef STUDY_STR_UTIL_H #define STUDY_STR_UTIL_H #include "../structure/charhashmap.h" #include "../structure/charlist.h" #include "../structure/json.h" #include <malloc.h> #include <stdio.h> #inc

  • C++单例模式实现线程池的示例代码

    C语言单例模式实现线程池. 该代码中,使用了单例模式来创建线程池对象,保证了整个程序中只有一个线程池对象. 线程池中包含了任务队列.工作线程数组.互斥锁.条件变量等成员,通过这些成员来实现任务的提交和执行. 在主函数中,提交了10个任务,每个任务都是一个简单的打印数字的函数,最后等待所有任务执行完毕后销毁线程池. #include <stdio.h> #include <stdlib.h> #include <pthread.h> #define THREAD_POOL

  • 用ES6的class模仿Vue写一个双向绑定的示例代码

    本文介绍了用ES6的class模仿Vue写一个双向绑定的示例代码,分享给大家,具体如下: 最终效果如下: 构造器(constructor) 构造一个TinyVue对象,包含基本的el,data,methods class TinyVue{ constructor({el, data, methods}){ this.$data = data this.$el = document.querySelector(el) this.$methods = methods // 初始化 this._com

  • JS手写一个自定义Promise操作示例

    本文实例讲述了JS手写一个自定义Promise操作.分享给大家供大家参考,具体如下: 经常在面试题中会看到,让你实现一个Promsie,或者问你实现Promise的原理,所以今天就尝试利用class类的形式来实现一个Promise 为了不与原生的Promise命名冲突,这里就简单命名为MyPromise. class MyPromise { constructor(executor) { let _this = this this.state = 'pending' // 当前状态 this.v

随机推荐