解决线程池中ThreadGroup的坑

目录
  • 线程池中ThreadGroup的坑
    • ThreadGroup是否可行
    • Executors内部类DefaultThreadFactory
  • ThreadGroup的使用及手写线程池
    • 监听线程异常关闭
    • 如何拿到Thread线程中异常
    • ThreadGroup
    • 线程池使用

线程池中ThreadGroup的坑

在Java中每一个线程都归属于某个线程组管理的一员,例如在主函数main()主工作流程中产生一个线程,则产生的线程属于main这个线程组管理的一员。简单地说,线程组(ThreadGroup)就是由线程组成的管理线程的类,这个类是java.lang.ThreadGroup类。

定义一个线程组,通过以下代码可以实现。

ThreadGroup group=new ThreadGroup(“groupName”);
Thread thread=new Thread(group,”the first thread of group”);

ThreadGroup类中的某些方法,可以对线程组中的线程产生作用。例如,setMaxPriority()方法可以设定线程组中的所有线程拥有最大的优先权。

所有线程都隶属于一个线程组。那可以是一个默认线程组(不指定group),亦可是一个创建线程时明确指定的组。在创建之初,线程被限制到一个组里,而且不能改变到一个不同的组。每个应用都至少有一个线程从属于系统线程组。若创建多个线程而不指定一个组,它们就会自动归属于系统线程组。

线程组也必须从属于其他线程组。必须在构建器里指定新线程组从属于哪个线程组。若在创建一个线程组的时候没有指定它的归属,则同样会自动成为系统线程组的一名属下。因此,一个应用程序中的所有线程组最终都会将系统线程组作为自己的“父”。

那么假如我们需要在线程池中实现一个带自定义ThreadGroup的线程分组,该怎么实现呢?

我们在给线程池(ThreadPoolExecutor)提交任务的时候可以通过execute(Runnable command)来将一个线程任务加入到该线程池,那么我们是否可以通过new一个指定了ThreadGroup的Thread实例来加入线程池来达到前面说到的目的呢?

ThreadGroup是否可行

通过new Thread(threadGroup,runnable)实现线程池中任务分组

public static void main(String[] args) {
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        final ThreadGroup group = new ThreadGroup("Main_Test_Group");
        for (int i = 0; i < 5; i++) {
            Thread thread = new Thread(group, new Runnable() {
                @Override
                public void run() {
                    int sleep = (int)(Math.random() * 10);
                    try {
                        Thread.sleep(1000 * 3);
                        System.out.println(Thread.currentThread().getName()+"执行完毕");
                        System.out.println("当前线程组中的运行线程数"+group.activeCount());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, group.getName()+" #"+i+"");
            pool.execute(thread);
        }
    }

运行结果

pool-1-thread-3执行完毕
pool-1-thread-1执行完毕
当前线程组中的运行线程数0
pool-1-thread-2执行完毕
当前线程组中的运行线程数0
当前线程组中的运行线程数0
pool-1-thread-4执行完毕
pool-1-thread-5执行完毕
当前线程组中的运行线程数0
当前线程组中的运行线程数0

运行结果中可以看到group中的线程并没有因为线程池启动了这个线程任务而运行起来.因此通过线程组来对线程池中的线层任务分组不可行.

从java.util.concurrent.ThreadPoolExecutor源码中可以看到如下构造函数:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

如果我们在实例化ThreadPoolExecutor时不指定ThreadFactory,那么将以默认的ThreadFactory来创建Thread.

Executors内部类DefaultThreadFactory

下面的源码即是默认的Thread工厂

static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;
        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

从唯一的构造函数可以看到DefaultThreadFactory以SecurityManager 实例中的ThreadGroup来指定线程的group,如果SecurityManager 获取到的ThreadGroup为null才默认以当前线程的group来指定.public Thread newThread(Runnable r) 则以group来new 一个Thead.这样我们可以在实例化ThreadPoolExecutor对象的时候在其构造函数内传入自定义的ThreadFactory实例即可达到目的.

public class MyTheadFactory implements ThreadFactory {
    private static final AtomicInteger poolNumber = new AtomicInteger(1);
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;
    private ThreadGroup defaultGroup;
    public MyTheadFactory() {
        SecurityManager s = System.getSecurityManager();
        defaultGroup = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
        namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";
    }
    public MyTheadFactory(ThreadGroup group) {
       this.defaultGroup = group;
        namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-";
    }
    public Thread newThread(Runnable r) {
        Thread t = new Thread(defaultGroup, null, namePrefix + threadNumber.getAndIncrement(), 0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}

ThreadGroup的使用及手写线程池

监听线程异常关闭

以下代码在window下不方便测试,需在linux 上 测试

// 以下线程如果强制关闭的话,是无法打印`线程被杀掉了`
// 模拟关闭 kill PID
public static void main(String[] args)  {
        Runtime.getRuntime().addShutdownHook(new Thread( () -> {
            System.out.println("线程被杀掉了");
        }));
        while(true){
            System.out.println("i am working ...");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

如何拿到Thread线程中异常

public static void main(String[] args) {
        Thread thread = new Thread(() -> {
            try {
                Thread.sleep(1000);
                int i = 10/0;
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        thread.setUncaughtExceptionHandler((t,e)->{
            System.out.println("线程的名字"+ t.getName());
            System.out.println(e);
        });  // 通过注入接口的方式
        thread.start();
    }

ThreadGroup

注意: threadGroup 设置为isDaemon 后,会随最后一个线程结束而销毁,如果没有设置isDaemon ,则需要手动调用 destory()

线程池使用

自己搭建的简单线程池实现

其中ThreadGroup 的应用没有写,但是我们可以观察线程关闭后,检查ThreadGroup 中是否还有活跃的线程等,具体参考ThreadGroup API

import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.stream.IntStream;
/**
 * @Author: shengjm
 * @Date: 2020/2/10 9:52
 * @Description:
 */
public class SimpleThreadPool extends Thread{
    /**
     * 线程数量
     */
    private int size;
    private final int queueSize;
    /**
     * 默认线程队列数量
     */
    private final static int DEFAULR_TASK_QUEUE_SIZE = 2000;
    private static volatile int seq = 0;
    private final static String THREAD_PREFIX = "SIMPLE_THREAD_POLL_";
    private final static ThreadGroup GROUP = new ThreadGroup("Pool_Group");
    private final static LinkedList<Runnable> TASK_QUEUE = new LinkedList<>();
    private final static List<WorkerTask> THREAD_QUEUE = new ArrayList<>();
    private final DiscardPolicy discardPolicy;
    private volatile boolean destory = false;
    private int min;
    private int max;
    private int active;
    /**
     * 定义异常策略的实现
     */
    private final static DiscardPolicy DEFAULT_DISCARD_POLICY = () -> {
        throw new DiscardException("线程池已经被撑爆了,后继多余的人将丢失");
    };
    /**
     *
     */
    public SimpleThreadPool(){
        this(4,8,12,DEFAULR_TASK_QUEUE_SIZE,DEFAULT_DISCARD_POLICY);
    }
    /**
     *
     */
    public SimpleThreadPool(int min , int active , int max , int queueSize,DiscardPolicy discardPolicy) {
        this.min = min;
        this.active = active;
        this.max = max;
        this.queueSize = queueSize;
        this.discardPolicy = discardPolicy;
        init();
    }
 /**
  * 初始化
  */
    private void init() {
        for(int i = 0; i < min; i++){
            createWorkTask();
        }
        this.size = min;
        this.start();
    }
    private void createWorkTask(){
        WorkerTask task = new WorkerTask(GROUP,THREAD_PREFIX+(seq++));
        task.start();
        THREAD_QUEUE.add(task);
    }
 /**
  * 线程池自动扩充
  */
    @Override
    public void run() {
        while(!destory){
            System.out.println(this.min +" --- "+this.active+" --- "+this.max + " --- "+ this.size + " --- "+  TASK_QUEUE.size());
            try {
                Thread.sleep(1000);
                if(TASK_QUEUE.size() > active && size < active){
                    for (int i = size; i < active;i++){
                        createWorkTask();
                    }
                    size = active;
                }else if(TASK_QUEUE.size() > max && size < max){
                    for (int i = size; i < max;i++){
                        createWorkTask();
                    }
                    size = max;
                }
                synchronized (THREAD_QUEUE){
                    if(TASK_QUEUE.isEmpty() && size > active){
                        int release = size - active;
                        for (Iterator<WorkerTask> it = THREAD_QUEUE.iterator();it.hasNext();){
                            if(release <=0){
                                break;
                            }
                            WorkerTask task = it.next();
                            task.close();
                            task.interrupt();
                            it.remove();
                            release--;
                        }
                        size = active;
                    }
                }
            } catch (InterruptedException e) {
                break;
            }
        }
    }
    public void submit(Runnable runnable){
        synchronized (TASK_QUEUE){
            if(destory){
                throw new DiscardException("线程池已经被摧毁了...");
            }
            if(TASK_QUEUE.size() > queueSize){
                discardPolicy.discard();
            }
            TASK_QUEUE.addLast(runnable);
            TASK_QUEUE.notifyAll();
        }
    }
 /**
  * 关闭
  */
    public void shutdown(){
        while(!TASK_QUEUE.isEmpty()){
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        synchronized (THREAD_QUEUE) {
            int initVal = THREAD_QUEUE.size();
            while (initVal > 0) {
                for (WorkerTask workerTask : THREAD_QUEUE) {
                    if (workerTask.getTaskState() == TaskState.BLOCKED) {
                        workerTask.interrupt();
                        workerTask.close();
                        initVal--;
                    } else {
                        try {
                            Thread.sleep(10);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
            this.destory = true;
        }
    }
    public int getSize() {
        return size;
    }
    public int getMin() {
        return min;
    }
    public int getMax() {
        return max;
    }
    public int getActive() {
        return active;
    }
    /**
     * 线程状态
     */
    private enum TaskState{
        FREE , RUNNING , BLOCKED , DEAD
    }
    /**
     * 自定义异常类
     */
    public static class DiscardException extends RuntimeException{
        public DiscardException(String message){
            super(message);
        }
    }
    /**
     * 定义异常策略
     */
    @FunctionalInterface
    public interface DiscardPolicy{
        void discard() throws DiscardException;
    }
    private static class WorkerTask extends Thread{
        private volatile TaskState taskState = TaskState.FREE;
        public TaskState getTaskState(){
            return this.taskState;
        }
        public WorkerTask(ThreadGroup group , String name){
            super(group , name);
        }
        @Override
        public void run(){
            OUTER:
            while(this.taskState != TaskState.DEAD){
                Runnable runnable;
                synchronized (TASK_QUEUE){
                    while(TASK_QUEUE.isEmpty()){
                        try {
                            taskState = TaskState.BLOCKED;
                            TASK_QUEUE.wait();
                        } catch (InterruptedException e) {
                            break OUTER;
                        }
                    }
                    runnable = TASK_QUEUE.removeFirst();
                }
                if(runnable != null){
                    taskState = TaskState.RUNNING;
                    runnable.run();
                    taskState = TaskState.FREE;
                }
            }
        }
        public void close(){
            this.taskState = TaskState.DEAD;
        }
    }
    /**
     * 测试
     * @param args
     */
    public static void main(String[] args) {
        SimpleThreadPool simpleThreadPool = new SimpleThreadPool();
//        SimpleThreadPool simpleThreadPool = new SimpleThreadPool(6,15,SimpleThreadPool.DEFAULT_DISCARD_POLICY);
        IntStream.rangeClosed(0,40).forEach(i -> {
            simpleThreadPool.submit(() -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("the runnable " + i + "be servered by " + Thread.currentThread());
            });
        });
//        try {
//            Thread.sleep(15000);
//        } catch (InterruptedException e) {
//            e.printStackTrace();
//        }
        simpleThreadPool.shutdown();
    }
}

以上为个人经验,希望能给大家一个参考,也希望大家多多支持我们。

(0)

相关推荐

  • 解决python ThreadPoolExecutor 线程池中的异常捕获问题

    问题 最近写了涉及线程池及线程的 python 脚本,运行过程中发现一个有趣的现象,线程池中的工作线程出现问题,引发了异常,但是主线程没有捕获异常,还在发现 BUG 之前一度以为线程池代码正常返回. 先说重点 这里主要想介绍 python concurrent.futuresthread.ThreadPoolExecutor 线程池中的 worker 引发异常的时候,并不会直接向上抛起异常,而是需要主线程通过调用concurrent.futures.Future.exception(timeou

  • Java多线程之线程池七个参数详解

    ThreadPoolExecutor是JDK中的线程池实现,这个类实现了一个线程池需要的各个方法,它提供了任务提交.线程管理.监控等方法. 下面是ThreadPoolExecutor类的构造方法源码,其他创建线程池的方法最终都会导向这个构造方法,共有7个参数:corePoolSize.maximumPoolSize.keepAliveTime.unit.workQueue.threadFactory.handler. public ThreadPoolExecutor(int corePoolS

  • 一篇文章带你了解Java中ThreadPool线程池

    目录 ThreadPool 线程池的优势 线程池的特点 1 线程池的方法 (1) newFixedThreadPool (2) newSingleThreadExecutor (3) newScheduledThreadPool (4) newCachedThreadPool 2 线程池底层原理 3 线程池策略及分析 拒绝策略 如何设置maximumPoolSize大小 ThreadPool 线程池的优势 线程池做的工作主要是控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建后启动这些

  • Java线程池的几种实现方法及常见问题解答

    工作中,经常会涉及到线程.比如有些任务,经常会交与线程去异步执行.抑或服务端程序为每个请求单独建立一个线程处理任务.线程之外的,比如我们用的数据库连接.这些创建销毁或者打开关闭的操作,非常影响系统性能.所以,"池"的用处就凸显出来了. 1. 为什么要使用线程池 在3.6.1节介绍的实现方式中,对每个客户都分配一个新的工作线程.当工作线程与客户通信结束,这个线程就被销毁.这种实现方式有以下不足之处: •服务器创建和销毁工作的开销( 包括所花费的时间和系统资源 )很大.这一项不用解释,可以

  • 解决线程池中ThreadGroup的坑

    目录 线程池中ThreadGroup的坑 ThreadGroup是否可行 Executors内部类DefaultThreadFactory ThreadGroup的使用及手写线程池 监听线程异常关闭 如何拿到Thread线程中异常 ThreadGroup 线程池使用 线程池中ThreadGroup的坑 在Java中每一个线程都归属于某个线程组管理的一员,例如在主函数main()主工作流程中产生一个线程,则产生的线程属于main这个线程组管理的一员.简单地说,线程组(ThreadGroup)就是由

  • C#线程处理系列之线程池中的I/O线程

    一.I/O线程实现对文件的异步  1.1  I/O线程介绍: 对于线程所执行的任务来说,可以把线程分为两种类型:工作者线程和I/O线程. 工作者线程用来完成一些计算的任务,在任务执行的过程中,需要CPU不间断地处理,所以,在工作者线程的执行过程中,CPU和线程的资源是充分利用的. I/O线程主要用来完成输入和输出的工作的,在这种情况下, 计算机需要I/O设备完成输入和输出的任务,在处理过程中,CPU是不需要参与处理过程的,此时正在运行的线程将处于等待状态,只有等任务完成后才会有事可做, 这样就造

  • 线程池中使用spring aop事务增强

    这篇文章主要介绍了线程池中使用spring aop事务增强,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 问题描述: 在项目里使用了线程池运行同一个类的实例方法,代码大致如下,运行时发现job方法的事务不生效 @Transactional public void doJob() { EXECOTOR.execute(() ->job()); } @Transactional public void job(){ //db operation }

  • Java线程池中的各个参数如何合理设置

    一.前言 在开发过程中,好多场景要用到线程池.每次都是自己根据业务场景来设置线程池中的各个参数. 这两天又有需求碰到了,索性总结一下方便以后再遇到可以直接看着用. 虽说根据业务场景来设置各个参数的值,但有些万变不离其宗,掌握它的原理对如何用好线程池起了至关重要的作用. 那我们接下来就来进行线程池的分析. 二.ThreadPoolExecutor的重要参数 我们先来看下ThreadPoolExecutor的带的那些重要参数的构造器. public ThreadPoolExecutor(int co

  • java ThreadPoolExecutor线程池拒绝策略避坑

    目录 1.场景 2. 原因分析 3.总结 4.思考 1.场景 线程池使用DiscardOldestPolicy拒绝策略,阻塞队列使用ArrayBlockingQueue,发现在某些情形下对于得到的Future,调用get()方法当前线程会一直阻塞. 为了便于理解,将实际情景抽象为下面的代码: ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 1, 1, 1, TimeUnit.SECONDS, new ArrayBlo

  • Java线程池中多余的线程是如何回收的

    最近阅读了JDK线程池ThreadPoolExecutor的源码,对线程池执行任务的流程有了大体了解,实际上这个流程也十分通俗易懂,就不再赘述了,别人写的比我好多了. 不过,我倒是对线程池是如何回收工作线程比较感兴趣,所以简单分析了一下,加深对线程池的理解吧. 那么,就以JDK1.8为例分析吧. 1.runWorker(Worker w) 工作线程启动后,就进入runWorker(Worker w)方法. 里面是一个while循环,循环判断任务是否为空,若不为空,执行任务:若取不到任务,或发生异

  • 线程池中execute与submit的区别说明

    目录 线程池execute与submit区别 execute submit 例1 例2 线程池submit和execute方法原理 线程池的作用 线程池的原理 线程池execute与submit区别 在使用线程池的时候,看到execute()与submit()方法.都可以使用线程池执行一个任务,但是两者有什么区别呢? execute void execute(Runnable command); submit <T> Future<T> submit(Callable<T&g

  • Java如何自定义线程池中队列

    目录 背景 问题分析 问题解决 总结 两个队列的UML关系图 SynchronousQueue的定义 ArrayBlockingQueue的定义 分析 jdk源码中关于线程池队列的说明 背景 业务交互的过程中涉及到了很多关于SFTP下载的问题,因此在代码中定义了一些线程池,使用中发现了一些问题, 代码类似如下所示: public class ExecutorTest { private static ExecutorService es = new ThreadPoolExecutor(2, 1

  • java线程池中线程数量到底是几

    目录 线程池配置 线程池里的业务线程数量小于最小数量(5) 第一个请求 第二个请求 第三个请求 第五个请求 小于阻塞队列容量(10) 第六个请求 第七个请求 第15个请求 小于最大数量(20) 第16个请求 第35个请求 拒绝策略 第36个请求 复用线程 线程池配置 线程池配置,假设是: 1.最小数量是5 2.阻塞队列容量是10 3.最大数量是20 线程池里的业务线程数量小于最小数量(5) 第一个请求 第一个请求进来的时候,这个时候,线程池没有线程,就创建新的工作线程(即Worker线程). 然

随机推荐