springboot线程池监控的简单实现

目录
  • 背景
  • 代码
    • 代码类结构
    • 线程池扩展类
    • 线程工具类
    • 线程bean类
    • 线程池实现类
    • 线程池监控接口类
    • 运行结果

背景

  • 在我们实际项目开发中,常常会为不同的优先级的任务设置相对应的线程池。
  • 一般我们只关注相关池的相关参数如核心线程数据,最大线程数据等等参数,容易忽略了对线程池中实际运行情况的监控。
  • 综上所述:线程池如果相当于黑盒一样在运行的话,对系统的不利的。本文提供了一种简单获取线程池运行状态的方式,可以将详情打印到日志或者对接到Prometheus上进行展示。
  • 详细有不少博主给出了动态修改线程的方式,但是由于生产环境是禁止,因此本文只提供了监控的功能。
  • 本代码应用项目架构为springboot。

代码

代码类结构

  • ThreadPoolMonitor:线程池扩展类
  • ThreadPoolUtil:线程池工具类
  • ThreadPoolDetailInfo:bean类
  • ExecutorThreadPoolManager:线程池实现类
  • ThreadPoolController:线程池测试方法

线程池扩展类

  • 从类主要重写了ThreadPoolExecutor类中的shutdown/shutdownNow/beforeExecute/afterExecute,用于对每个任务进行执行前后的拦截,计算出每个任务的运行时间。
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Date;
import java.util.List;
import java.util.concurrent.*;
/**
 * @ClassName ThreadPoolMonitor
 * @authors kantlin
 * @Date 2021/12/16 17:45
 **/
public class ThreadPoolMonitor extends ThreadPoolExecutor {
    private static final Logger LOGGER = LoggerFactory.getLogger(ThreadPoolMonitor.class);
    private final ConcurrentHashMap<String, Date> startTimes;
    private final String poolName;
    private long totalDiff;

    public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, String poolName) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
        this.startTimes = new ConcurrentHashMap();
        this.poolName = poolName;
    }

    @Override
    public void shutdown() {
        LOGGER.info("{} Going to shutdown. Executed tasks: {}, Running tasks: {}, Pending tasks: {}", new Object[]{this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size()});
        super.shutdown();
    }
    @Override
    public List<Runnable> shutdownNow() {
        LOGGER.info("{} Going to immediately shutdown. Executed tasks: {}, Running tasks: {}, Pending tasks: {}", new Object[]{this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size()});
        return super.shutdownNow();
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        this.startTimes.put(String.valueOf(r.hashCode()), new Date());
    }
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        Date startDate = this.startTimes.remove(String.valueOf(r.hashCode()));
        Date finishDate = new Date();
        long diff = finishDate.getTime() - startDate.getTime();
        this.totalDiff += diff;
    }

    public long getTotalDiff() {
        return this.totalDiff;
    }
}

线程工具类

import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

/**
 * @ClassName ThreadPoolUtil
 * @authors kantlin
 * @Date 2021/12/16 17:45
 **/

@Component
public class ThreadPoolUtil {
    private final HashMap<String, ThreadPoolMonitor> threadPoolExecutorHashMap = new HashMap();

    public ThreadPoolUtil() {
    }

    public ThreadPoolMonitor creatThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,String poolName) {
        ThreadPoolMonitor threadPoolExecutor = new ThreadPoolMonitor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,threadFactory, poolName);
        this.threadPoolExecutorHashMap.put(poolName, threadPoolExecutor);
        return threadPoolExecutor;
    }

    public HashMap<String, ThreadPoolMonitor> getThreadPoolExecutorHashMap() {
        return this.threadPoolExecutorHashMap;
    }

线程bean类

import lombok.Data;

@Data
public class ThreadPoolDetailInfo {
    //线程池名字
    private String threadPoolName;
    //当前线程池大小
    private Integer poolSize;
    //线程池核心线程数量
    private Integer corePoolSize;
    //线程池生命周期中最大线程数量
    private Integer largestPoolSize;
    //线程池中允许的最大线程数
    private Integer maximumPoolSize;
    //线程池完成的任务数目
    private long completedTaskCount;
    //线程池中当前活跃个数
    private Integer active;
    //线程池完成的任务个数
    private long task;
    //线程最大空闲时间
    private long keepAliveTime;
    //当前活跃线程的占比
    private int activePercent;
    //任务队列容量(阻塞队列)
    private Integer queueCapacity;
    //当前队列中任务的数量
    private Integer queueSize;
    //线程池中任务平均执行时长
    private long avgExecuteTime;

    public ThreadPoolDetailInfo(String threadPoolName, Integer poolSize, Integer corePoolSize, Integer largestPoolSize, Integer maximumPoolSize, long completedTaskCount, Integer active, long task, long keepAliveTime, int activePercent, Integer queueCapacity, Integer queueSize, long avgExecuteTime) {
        this.threadPoolName = threadPoolName;
        this.poolSize = poolSize;
        this.corePoolSize = corePoolSize;
        this.largestPoolSize = largestPoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.completedTaskCount = completedTaskCount;
        this.active = active;
        this.task = task;
        this.keepAliveTime = keepAliveTime;
        this.activePercent = activePercent;
        this.queueCapacity = queueCapacity;
        this.queueSize = queueSize;
        this.avgExecuteTime = avgExecuteTime;
    }
}

线程池实现类

  • 在我的项目中,将线程池依次划分为high、normal、low、single四种线程池类型。不同优先级的任务将会被submit到不同的线程池中执行。
  • 在业务中有判断线程池中queue的长度来决定是否投递任务,由于没有相应的拒绝策略,所以队列不设置长度。
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.*.newThread.ThreadPoolUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@Component
public class ExecutorThreadPoolManager {

    @Autowired
    private ThreadPoolUtil threadPoolUtil;

    @Value("${thread_pool_normal_level_thread_max_num}")
    private Integer normalLevelThreadPoolThreadMaxNum;
    @Value("${thread_pool_normal_level_core_thread_num}")
    private Integer normalLevelThreadPoolCoreThreadNum;
    @Value("${thread_pool_low_level_thread_max_num}")
    private Integer lowLevelThreadPoolThreadMaxNum;
    @Value("${thread_pool_low_level_core_thread_num}")
    private Integer lowLevelThreadPoolCoreThreadNum;

    private ThreadPoolExecutor normalThreadPoolExecutor;

    private ThreadPoolExecutor highPriorityExecutor;

    private ThreadPoolExecutor lowPriorityExecutor;

    private ThreadPoolExecutor singleThreadPoolExecutor;

    @PostConstruct
    public void initExecutor() {
        ThreadFactory normalThreadFactory = new ThreadFactoryBuilder().setNameFormat("normal_task_thread_%d").build();
        normalThreadPoolExecutor = threadPoolUtil.creatThreadPool(normalLevelThreadPoolCoreThreadNum, normalLevelThreadPoolThreadMaxNum, 0L,
                TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), normalThreadFactory,"normal_level_thread_pool");

        ThreadFactory highPriorityThreadFactory = new ThreadFactoryBuilder().setNameFormat("high_priority_level_task_thread_%d").build();
        highPriorityExecutor = threadPoolUtil.creatThreadPool(normalLevelThreadPoolCoreThreadNum, normalLevelThreadPoolThreadMaxNum, 0L,
                TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), highPriorityThreadFactory,"high_level_thread_pool");

        ThreadFactory lowPriorityThreadFactory = new ThreadFactoryBuilder().setNameFormat("low_priority_level_task_thread_%d").build();
        lowPriorityExecutor = threadPoolUtil.creatThreadPool(lowLevelThreadPoolCoreThreadNum, lowLevelThreadPoolThreadMaxNum, 0L,
                TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), lowPriorityThreadFactory,"low_level_thread_pool");

        ThreadFactory singleFactory = new ThreadFactoryBuilder().setNameFormat("single_task_thread_%d").build();
        singleThreadPoolExecutor =threadPoolUtil.creatThreadPool(1, 1,
                0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), singleFactory,"single_level_thread_pool");
    }

    /**
     * @author kantlin
     * @date 2021/9/9
     * @describe 定义三种线程池, 一般采集类的用低优, 正常业务的用中优, 用户手动请求API的用高优线程池
     **/
    public ThreadPoolExecutor getNormalThreadPoolExecutor() {
        return normalThreadPoolExecutor;
    }

    public ThreadPoolExecutor getHighPriorityExecutor() {
        return highPriorityExecutor;
    }

    public ThreadPoolExecutor getLowPriorityExecutor() {
        return lowPriorityExecutor;
    }

    public ThreadPoolExecutor getSingleThreadPoolExecutor() {
        return singleThreadPoolExecutor;
    }

}

线程池监控接口类

import com.alibaba.fastjson.JSONObject;
import com.*.newThread.ThreadPoolDetailInfo;
import com.*.newThread.ThreadPoolMonitor;
import com.*.newThread.ThreadPoolUtil;
import com.*.thread.ExecutorThreadPoolManager;
import io.swagger.annotations.Api;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.web.bind.annotation.*;
import java.math.BigDecimal;
import java.text.NumberFormat;
import java.util.*;
import java.util.concurrent.TimeUnit;

/**
 * @ClassName ThreadPoolController
 * @authors kantlin
 * @Date 2021/12/17 14:53
 **/
@Api(description = "线程池监控接口")
@RestController
@RequestMapping(value = "api/threadpool")
public class ThreadPoolController {
    private static final Logger LOGGER = LoggerFactory.getLogger(ThreadPoolController.class);

    @Autowired
    private ExecutorThreadPoolManager threadPool;

    @Autowired
    private ThreadPoolUtil threadPoolUtil;

    @GetMapping(value = "/getThreadPools")
    private List<String> getThreadPools() {
        List<String> threadPools = new ArrayList();
        if (!this.threadPoolUtil.getThreadPoolExecutorHashMap().isEmpty()) {
            Iterator var2 = this.threadPoolUtil.getThreadPoolExecutorHashMap().entrySet().iterator();

            while (var2.hasNext()) {
                Map.Entry<String, ThreadPoolMonitor> entry = (Map.Entry) var2.next();
                threadPools.add(entry.getKey());
            }
        }

        return threadPools;
    }

    @GetMapping(value = "/getThreadPoolListInfo")
    @Scheduled(cron = "${thread.poll.status.cron}")
    private List<ThreadPoolDetailInfo> getThreadPoolListInfo() {
        List<ThreadPoolDetailInfo> detailInfoList = new ArrayList();
        if (!this.threadPoolUtil.getThreadPoolExecutorHashMap().isEmpty()) {
            Iterator var2 = this.threadPoolUtil.getThreadPoolExecutorHashMap().entrySet().iterator();
            while (var2.hasNext()) {
                Map.Entry<String, ThreadPoolMonitor> entry = (Map.Entry) var2.next();
                ThreadPoolDetailInfo threadPoolDetailInfo = this.threadPoolInfo(entry.getValue(), (String) entry.getKey());
                detailInfoList.add(threadPoolDetailInfo);
            }
        }
        LOGGER.info("Execute details of cuurent thread poll:{}", JSONObject.toJSONString(detailInfoList));
        return detailInfoList;
    }

    private ThreadPoolDetailInfo threadPoolInfo(ThreadPoolMonitor threadPool, String threadPoolName) {
        BigDecimal activeCount = new BigDecimal(threadPool.getActiveCount());
        BigDecimal maximumPoolSize = new BigDecimal(threadPool.getMaximumPoolSize());
        BigDecimal result = activeCount.divide(maximumPoolSize, 2, 4);
        NumberFormat numberFormat = NumberFormat.getPercentInstance();
        numberFormat.setMaximumFractionDigits(2);
        int queueCapacity = 0;
        return new ThreadPoolDetailInfo(threadPoolName, threadPool.getPoolSize(), threadPool.getCorePoolSize(), threadPool.getLargestPoolSize(), threadPool.getMaximumPoolSize(), threadPool.getCompletedTaskCount(), threadPool.getActiveCount(), threadPool.getTaskCount(), threadPool.getKeepAliveTime(TimeUnit.MILLISECONDS), new Double(result.doubleValue() * 100).intValue(), queueCapacity, threadPool.getQueue().size(), threadPool.getTaskCount() == 0L ? 0L : threadPool.getTotalDiff() / threadPool.getTaskCount());
    }
}

运行结果

  • 上面controller中的方法除了可以通过接口进行暴露外,还设置了定时任务定期的打印到日志中。方便对系统状态进行排查。
[
  {
    "active": 0,
    "activePercent": 0,
    "avgExecuteTime": 0,
    "completedTaskCount": 0,
    "corePoolSize": 20,
    "keepAliveTime": 0,
    "largestPoolSize": 0,
    "maximumPoolSize": 20,
    "poolSize": 0,
    "queueCapacity": 0,
    "queueSize": 0,
    "task": 0,
    "threadPoolName": "high_level_thread_pool"
  },
  {
    "active": 0,
    "activePercent": 0,
    "avgExecuteTime": 0,
    "completedTaskCount": 0,
    "corePoolSize": 33,
    "keepAliveTime": 0,
    "largestPoolSize": 0,
    "maximumPoolSize": 33,
    "poolSize": 0,
    "queueCapacity": 0,
    "queueSize": 0,
    "task": 0,
    "threadPoolName": "low_level_thread_pool"
  },
  {
    "active": 0,
    "activePercent": 0,
    "avgExecuteTime": 371,
    "completedTaskCount": 14,
    "corePoolSize": 20,
    "keepAliveTime": 0,
    "largestPoolSize": 14,
    "maximumPoolSize": 20,
    "poolSize": 14,
    "queueCapacity": 0,
    "queueSize": 0,
    "task": 14,
    "threadPoolName": "normal_level_thread_pool"
  },
  {
    "active": 0,
    "activePercent": 0,
    "avgExecuteTime": 0,
    "completedTaskCount": 0,
    "corePoolSize": 1,
    "keepAliveTime": 0,
    "largestPoolSize": 0,
    "maximumPoolSize": 1,
    "poolSize": 0,
    "queueCapacity": 0,
    "queueSize": 0,
    "task": 0,
    "threadPoolName": "single_level_thread_pool"
  }
]

到此这篇关于springboot线程池监控的简单实现的文章就介绍到这了,更多相关springboot线程池监控内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Spring boot注解@Async线程池实例详解

    这篇文章主要介绍了Spring boot注解@Async线程池实例详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 从Spring3开始提供了@Async注解,该注解可以被标注在方法上,以便异步地调用该方法.调用者将在调用时立即返回,方法的实际执行将提交给Spring TaskExecutor的任务中,由指定的线程池中的线程执行. 1. TaskExecutor Spring异步线程池的接口类,其实质是java.util.concurrent

  • 深入学习springboot线程池的使用和扩展

    前言 我们常用ThreadPoolExecutor提供的线程池服务,springboot框架提供了@Async注解,帮助我们更方便的将业务逻辑提交到线程池中异步执行,今天我们就来实战体验这个线程池服务: 实战环境 windowns10: jdk1.8: springboot 1.5.9.RELEASE: 开发工具:IntelliJ IDEA: 实战源码 本次实战的源码可以在我的GitHub下载,地址:git@github.com:zq2599/blog_demos.git,项目主页: 这里面有多

  • SpringBoot2线程池定义使用方法解析

    我们都知道spring只是为我们简单的处理线程池,每次用到线程总会new 一个新的线程,效率不高,所以我们需要自定义一个线程池. 定义线程池 @Slf4j @EnableAsync @Configuration public class AsyncExecutorConfig implements AsyncConfigurer { @Bean public ThreadPoolTaskExecutor asyncServiceExecutor() { //返回可用处理器的虚拟机的最大数量不小于

  • SpringBoot实现线程池

    现在由于系统越来越复杂,导致很多接口速度变慢,这时候就会想到可以利用线程池来处理一些耗时并不影响系统的操作. 新建Spring Boot项目 1. ExecutorConfig.xml 新建线程池配置文件. @Configuration @EnableAsync public class ExecutorConfig { private static final Logger logger = LoggerFactory.getLogger(ExecutorConfig.class); @Val

  • Spring Boot 配置和使用多线程池的实现

    某些情况下,我们需要在项目中对多种任务分配不同的线程池进行执行.从而通过监控不同的线程池来控制不同的任务.为了达到这个目的,需要在项目中配置多线程池. spring boot 提供了简单高效的线程池配置和使用方案. 配置 首先是配置线程池的bean交给spring 管理: @Configuration public class TaskExecutePool { @Bean(name ="threadPoolA") public ThreadPoolTaskExecutormyTask

  • spring boot使用自定义的线程池执行Async任务

    在前面的博客中,//www.jb51.net/article/134866.htm 我们使用了spring boot的异步操作,当时,我们使用的是默认的线程池,但是,如果我们想根据项目来定制自己的线程池了,下面就来说说,如何定制线程池! 一.增加配置属性类 package com.chhliu.springboot.async.configuration; import org.springframework.boot.context.properties.ConfigurationProper

  • Spring Boot线程池使用的一些实用心得

    目录 前言 使用步骤 用postmain或者其他工具来多次测试请求一下 总结 前言 前两天做项目的时候,想提高一下插入表的性能优化,因为是两张表,先插旧的表,紧接着插新的表,一万多条数据就有点慢了 后面就想到了线程池ThreadPoolExecutor,而用的是Spring Boot项目,可以用Spring提供的对ThreadPoolExecutor封装的线程池ThreadPoolTaskExecutor,直接使用注解启用 使用步骤 先创建一个线程池的配置,让Spring Boot加载,用来定义

  • SpringBoot 多任务并行+线程池处理的实现

    前言 前几篇文章着重介绍了后端服务数据库和多线程并行处理优化,并示例了改造前后的伪代码逻辑.当然了,优化是无止境的,前人栽树后人乘凉.作为我们开发者来说,既然站在了巨人的肩膀上,就要写出更加优化的程序. SpringBoot开发案例之JdbcTemplate批量操作 SpringBoot开发案例之CountDownLatch多任务并行处理 改造 理论上讲,线程越多程序可能更快,但是在实际使用中我们需要考虑到线程本身的创建以及销毁的资源消耗,以及保护操作系统本身的目的.我们通常需要将线程限制在一定

  • springboot创建线程池的两种方式小结

    目录 springboot创建线程池两种方式 1.使用static代码块创建 2.使用@Configuration @bean注解,程序启动时创建 springboot开启线程池 定义线程池 使用 springboot创建线程池两种方式 1.使用static代码块创建 这样的方式创建的好处是当代码用到线程池的时候才会初始化核心线程数 具体代码如下: public class HttpApiThreadPool { /** 获取当前系统的CPU 数目*/ static int cpuNums =

  • Springboot应用中线程池配置详细教程(最新2021版)

    前言:日常开发中我们常用ThreadPoolExecutor提供的线程池服务帮我们管理线程,在Springboot中更是提供了@Async注解来简化业务逻辑提交到线程池中执行的过程.由于Springboot中默认设置的corePoolSize=1和queyeCapacity=Integer.MAX_VALUE,相当于采用单线程处理所有任务,这就与多线程的目的背道而驰,所以这就要求我们在使用@Async注解时要配置线程池.本文就讲述下Springboot应用下的线程池配置. 背景知识:Spring

随机推荐