Java动态线程池插件dynamic-tp集成过程浅析

目录
  • 前言
  • 快速开始
  • 监控数据和第三方平台对接
    • logging方式
    • MicroMeter方式
  • 总结

前言

dynamic-tp是一个轻量级的动态线程池插件,它是一个基于配置中心的动态线程池,线程池的参数可以通过配置中心配置进行动态的修改,目前支持的配置中心有Apollo,NacosZookeeper,同时dynamic-tp支持线程池的监控和报警,具体特性如下:

  • 基于Spring框架,现只支持SpringBoot项目使用,轻量级,引入 starter 即可使用
  • 基于配置中心实现线程池参数动态调整,实时生效;集成主流配置中心,已支持 Nacos、Apollo,Zookeeper, 同时也提供 SPI 接口可自定义扩展实现
  • 内置通知报警功能,提供多种报警维度(配置变更通知、活性报警、容量阈值报警、拒绝策略触发报警), 默认支持企业微信、钉钉报警,同时提供 SPI 接口可自定义扩展实现
  • 内置线程池指标采集功能,支持通过 MicroMeter、JsonLog 日志输出、Endpoint 三种方式,可通过 SPI 接口自定义扩展实现
  • 集成管理常用第三方组件的线程池,已集成 SpringBoot 内置 WebServer(tomcat、undertow、jetty)的线程池管理
  • Builder 提供TTL包装功能,生成的线程池支持上下文信息传递

具体介绍及使用方式

为了方便用户快速接入,笔者这里对Zookeeper配置中心接入做一个详细的介绍,其他配置中心的demo示例可以参考项目的example,该模块也是笔者贡献的。这里我会详细的介绍Zookeeper配置中心的接入和对接监控平台PrometheusGrafana

快速开始

pom依赖

dynamic-tp-spring-boot-starter-zookeeper是集成dynamic-tp的starter,这里引入了micrometer-registry-prometheusspring-boot-starter-actuator用于对接Prometheus

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-logging</artifactId>
        </dependency>
        <dependency>
            <groupId>io.github.lyh200</groupId>
            <artifactId>dynamic-tp-spring-boot-starter-zookeeper</artifactId>
            <version>1.0.2</version>
        </dependency>
        <dependency>
            <groupId>io.micrometer</groupId>
            <artifactId>micrometer-registry-prometheus</artifactId>
            <version>1.6.4</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
    </dependencies>

YML配置

server:
  port: 8888

spring:
  application:
    name: dynamic-tp-zookeeper-demo
  # 下面是接入zk配置中心的配置  
  dynamic:
    tp:
      config-type: properties # zookeeper只支持properties配置
      zookeeper:
        config-version: 1.0.0 # 配置版本号
        zk-connect-str: 127.0.0.1:2181 # zk配置中心,如果是集群用逗号分开
        root-node: /configserver/userproject # 项目节点
        node: dynamic-tp-zookeeper-demo # 配置文件节点
# 对接prometheus
management:
  metrics:
    tags:
      application: ${spring.application.name}
  endpoints:
    web:
      exposure:
        include: '*'

配置中心dynamic-tp-zookeeper-demo配置文件配置

注:Zookeeper配置中心只支持properties类型配置,配置示例如下:

spring.dynamic.tp.executors部分是对线程池的配置,该配置是数组类型,可以定义多个线程池

spring.dynamic.tp.executors[1].notifyItems部分配置是对线程池报警平台的配置,可以配置多个报警平台

# 开启动态线程池
spring.dynamic.tp.enabled=true
# 打印动态线程池banner
spring.dynamic.tp.enabledBanner=true
# 开启线程池监控指标收集
spring.dynamic.tp.enabledCollect=true
# 线程池监控指标收集类型 logging-日志文件 micrometer-采集到第三方平台 
spring.dynamic.tp.collectorType=logging
# 采集监控数据时间间隔 5s
spring.dynamic.tp.monitorInterval=5
spring.dynamic.tp.executors[0].threadPoolName=dynamic-tp-test-1
spring.dynamic.tp.executors[0].corePoolSize=50
spring.dynamic.tp.executors[0].maximumPoolSize=50
spring.dynamic.tp.executors[0].queueCapacity=3000
spring.dynamic.tp.executors[0].queueType=VariableLinkedBlockingQueue
spring.dynamic.tp.executors[0].rejectedHandlerType=CallerRunsPolicy
spring.dynamic.tp.executors[0].keepAliveTime=50
spring.dynamic.tp.executors[0].allowCoreThreadTimeOut=false
spring.dynamic.tp.executors[0].threadNamePrefix=test1
spring.dynamic.tp.executors[0].notifyItems[0].type=capacity
spring.dynamic.tp.executors[0].notifyItems[0].enabled=false
spring.dynamic.tp.executors[0].notifyItems[0].threshold=80
spring.dynamic.tp.executors[0].notifyItems[0].platforms[0]=ding
spring.dynamic.tp.executors[0].notifyItems[0].platforms[1]=wechat
spring.dynamic.tp.executors[0].notifyItems[0].interval=120
spring.dynamic.tp.executors[0].notifyItems[1].type=change
spring.dynamic.tp.executors[0].notifyItems[1].enabled=false
spring.dynamic.tp.executors[0].notifyItems[2].type=liveness
spring.dynamic.tp.executors[0].notifyItems[2].enabled=false
spring.dynamic.tp.executors[0].notifyItems[2].threshold=80
spring.dynamic.tp.executors[0].notifyItems[3].type=reject
spring.dynamic.tp.executors[0].notifyItems[3].enabled=false
spring.dynamic.tp.executors[0].notifyItems[3].threshold=1
spring.dynamic.tp.executors[1].threadPoolName=dynamic-tp-test-2
spring.dynamic.tp.executors[1].corePoolSize=20
spring.dynamic.tp.executors[1].maximumPoolSize=30
spring.dynamic.tp.executors[1].queueCapacity=1000
spring.dynamic.tp.executors[1].queueType=VariableLinkedBlockingQueue
spring.dynamic.tp.executors[1].rejectedHandlerType=CallerRunsPolicy
spring.dynamic.tp.executors[1].keepAliveTime=50
spring.dynamic.tp.executors[1].allowCoreThreadTimeOut=false
spring.dynamic.tp.executors[1].threadNamePrefix=test2
spring.dynamic.tp.executors[1].notifyItems[0].type=capacity
spring.dynamic.tp.executors[1].notifyItems[0].enabled=false
spring.dynamic.tp.executors[1].notifyItems[0].threshold=80
spring.dynamic.tp.executors[1].notifyItems[0].platforms[0]=ding
spring.dynamic.tp.executors[1].notifyItems[0].platforms[1]=wechat
spring.dynamic.tp.executors[1].notifyItems[0].interval=120
spring.dynamic.tp.executors[1].notifyItems[1].type=change
spring.dynamic.tp.executors[1].notifyItems[1].enabled=false
spring.dynamic.tp.executors[1].notifyItems[2].type=liveness
spring.dynamic.tp.executors[1].notifyItems[2].enabled=false
spring.dynamic.tp.executors[1].notifyItems[2].threshold=80
spring.dynamic.tp.executors[1].notifyItems[3].type=reject
spring.dynamic.tp.executors[1].notifyItems[3].enabled=false
spring.dynamic.tp.executors[1].notifyItems[3].threshold=1

创建线程池

创建线程池,会注册到dynamic-tp

@Configuration
public class ThreadPoolConfiguration {
    @Bean
    public DtpExecutor dtpExecutor() {

        return ThreadPoolCreator.createDynamicFast("dynamic-tp-test-1");
    }
    @Bean
    public ThreadPoolExecutor threadPoolExecutor() {
        return ThreadPoolBuilder.newBuilder()
                .threadPoolName("dynamic-tp-test-2")
                .corePoolSize(10)
                .maximumPoolSize(15)
                .keepAliveTime(15000)
                .timeUnit(TimeUnit.MILLISECONDS)
                .workQueue(QueueTypeEnum.SYNCHRONOUS_QUEUE.getName(), null, false)
                .buildDynamic();
    }
}

使用

初次使用不注意官网文档的话可能会通过@Autowired的方式使用,目前版本这么使用是不正确的,因为当读取配置文件后,配置文件生成的线程池对象就会把@Bean创建的线程池覆盖了,所以目前版本只能根据名字从DtpRegistry获取线程池对象了,笔者也和项目作者沟通过,项目作者也觉得通过@Autowired方式更好一些,后面版本也会优化。

@Slf4j
@RestController
@SuppressWarnings("all")
public class TestController {
    @GetMapping("/dtp-zookeeper-example/test")
    public String test() {
        new Thread(() -> {
            try {
                task();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
        return "success";
    }
    public void task() throws InterruptedException {
        DtpExecutor dtpExecutor1 = DtpRegistry.getExecutor("dynamic-tp-test-1");
        DtpExecutor dtpExecutor2 = DtpRegistry.getExecutor("dynamic-tp-test-2");
        for (int i = 0; i < 100; i++) {
            Thread.sleep(100);
            dtpExecutor1.execute(() -> {
                log.info("i am dynamic-tp-test-1 task");
            });
            dtpExecutor2.execute(() -> {
                log.info("i am dynamic-tp-test-2 task");
            });
        }
    }
}

监控数据和第三方平台对接

logging方式

spring.dynamic.tp.collectorType=logging,logging方式的监控数据,采集日志文件中,文件的位置可以通过spring.dynamic.tp.logPath配置,默认 ${user.home}/logs,日志内容如下:

2022-03-07 13:48:31.585 INFO [dtp-monitor1:D.M.LOG] {"activeCount":0,"queueSize":0,"largestPoolSize":0,"poolSize":0,"rejectHandlerName":"CallerRunsPolicy","queueCapacity":0,"fair":false,"rejectCount":0,"waitTaskCount":0,"taskCount":0,"queueRemainingCapacity":0,"corePoolSize":40,"queueType":"SynchronousQueue","completedTaskCount":0,"dtpName":"dynamic-tp-test-2","maximumPoolSize":55}
2022-03-07 13:48:31.585 INFO [dtp-monitor1:D.M.LOG] {"activeCount":0,"queueSize":0,"largestPoolSize":0,"poolSize":0,"queueCapacity":2147483647,"fair":false,"rejectCount":0,"waitTaskCount":0,"taskCount":0,"queueRemainingCapacity":2147483647,"corePoolSize":10,"queueType":"TaskQueue","completedTaskCount":0,"dtpName":"tomcatWebServerTp","maximumPoolSize":200}
2022-03-07 13:48:36.590 INFO [dtp-monitor1:D.M.LOG] {"activeCount":0,"queueSize":0,"largestPoolSize":0,"poolSize":0,"rejectHandlerName":"CallerRunsPolicy","queueCapacity":1024,"fair":false,"rejectCount":0,"waitTaskCount":0,"taskCount":0,"queueRemainingCapacity":1024,"corePoolSize":50,"queueType":"VariableLinkedBlockingQueue","completedTaskCount":0,"dtpName":"dynamic-tp-test-1","maximumPoolSize":50}
2022-03-07 13:48:36.590 INFO [dtp-monitor1:D.M.LOG] {"activeCount":0,"queueSize":0,"largestPoolSize":0,"poolSize":0,"rejectHandlerName":"CallerRunsPolicy","queueCapacity":0,"fair":false,"rejectCount":0,"waitTaskCount":0,"taskCount":0,"queueRemainingCapacity":0,"corePoolSize":40,"queueType":"SynchronousQueue","completedTaskCount":0,"dtpName":"dynamic-tp-test-2","maximumPoolSize":55}
2022-03-07 13:48:36.590 INFO [dtp-monitor1:D.M.LOG] {"activeCount":0,"queueSize":0,"largestPoolSize":0,"poolSize":0,"queueCapacity":2147483647,"fair":false,"rejectCount":0,"waitTaskCount":0,"taskCount":0,"queueRemainingCapacity":2147483647,"corePoolSize":10,"queueType":"TaskQueue","completedTaskCount":0,"dtpName":"tomcatWebServerTp","maximumPoolSize":200}

MicroMeter方式

spring.dynamic.tp.collectorType=micrometer 采集数据到第三方平台,这里使用的是Prometheus。可视化展示需要在Grafana平台配置,数据源选择Prometheus,然后创建Dashboard,先要在Dashboard配置动态的参数Variables,如下图:

配置完参数后,我们创建Panel,可以创建多个Panel,然后配置Panel的数据查询,报警显示等等,如下图:

最终的Dashboard如下图:

总结

促使我使用dynamic-tp的一个最主要的点就是它轻量级,提供了线程池监控、报警、基于配置中心修改线程池参数功能,完全满足了我在业务中的需要,同时监控数据能够对接公司现有的监控平台(Prometheus+Grafana),并且在使用上也十分方便,我在刚接触这个插件的时候它还不支持Zookeeper,恰好我司使用的配置中心就是Zookeeper,因为我在此基础上把Zookeeper配置中心集成到了dynamic-tp并贡献给社区,希望可以帮到更多的人,下一篇简单介绍下dynamic-tp支持Zookeeper的实现细节。

到此这篇关于Java动态线程池插件dynamic-tp集成过程浅析的文章就介绍到这了,更多相关Java dynamic-tp内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Java动态线程池插件dynamic-tp集成zookeeper

    目录 前言 配置刷新 Zookeeper配置中心接入扩展实现 总结 前言 dynamic-tp是一个轻量级的动态线程池插件,它是一个基于配置中心的动态线程池,线程池的参数可以通过配置中心配置进行动态的修改,在配置中心的支持上最开始的时候支持Nacos和Apollo,由于笔者公司用的配置中心是Zookeeper,所以就想着扩展支持Zookeeper,在了解源码支持发现dynamic-tp的扩展能力做的很好,提供了扩展接口,只要我开发对应的配置中心模块即可,最终笔者实现了Zookeeper的支持并贡

  • 深入理解Java编程线程池的实现原理

    在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题: 如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间. 那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务? 在Java中可以通过线程池来达到这样的效果.今天我们就来详细讲解一下Java的线程池,首先我们从最核心的ThreadPoolExecutor类中的方法讲起,

  • Java自定义线程池的实现示例

    目录 一.Java语言本身也是多线程,回顾Java创建线程方式如下: 二.JDK线程池工具类. 三.业界知名自定义线程池扩展使用. 一.Java语言本身也是多线程,回顾Java创建线程方式如下: 1.继承Thread类,(Thread类实现Runnable接口),来个类图加深印象. 2.实现Runnable接口实现无返回值.实现run()方法,啥时候run,黑话了. 3.实现Callable接口重写call()+FutureTask获取. public class CustomThread {

  • java基于线程池和反射机制实现定时任务完整实例

    本文实例讲述了java基于线程池和反射机制实现定时任务的方法.分享给大家供大家参考,具体如下: 主要包括如下实现类: 1. Main类: 任务执行的入口: 调用main方法,开始加载任务配置并执行任务 package com.yanek.task; import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import ja

  • java的线程池框架及线程池的原理

    java 线程池详解 什么是线程池? 提供一组线程资源用来复用线程资源的一个池子 为什么要用线程池? 线程的资源是有限的,当处理一组业务的时候,我们需要不断的创建和销毁线程,大多数情况下,我们需要反复的进行大量的创建和销毁工作,这个动作对于服务器而言,也是很浪费的一种情况,这时候我们可以利用线程池来复用这一部分已经创建过的线程资源,避免不断的创建和销毁的动作. 线程池的原理 创建好固定数量的线程,吧线程先存下来,有任务提交的时候,把资源放到等待队列中,等待线程池中的任务队列不断的去消费处理这个队

  • java 定时器线程池(ScheduledThreadPoolExecutor)的实现

    前言 定时器线程池提供了定时执行任务的能力,即可以延迟执行,可以周期性执行.但定时器线程池也还是线程池,最底层实现还是ThreadPoolExecutor,可以参考我的另外一篇文章多线程–精通ThreadPoolExecutor. 特点说明 1.构造函数 public ScheduledThreadPoolExecutor(int corePoolSize) { // 对于其他几个参数在ThreadPoolExecutor中都已经详细分析过了,所以这里,将不再展开 // 这里我们可以看到调用基类

  • JAVA 创建线程池的注意事项

    1.创建线程或线程池时请指定有意义的线程名称,方便出错时回溯.创建线程池的时候请使用带ThreadFactory的构造函数,并且提供自定义ThreadFactory实现或者使用第三方实现. ThreadFactory namedThreadFactory = new ThreadFactoryBuilder() .setNameFormat("demo-pool-%d").build(); ExecutorService singleThreadPool = new ThreadPoo

  • JAVA 自定义线程池的最大线程数设置方法

    一:CPU密集型: 定义:CPU密集型也是指计算密集型,大部分时间用来做计算逻辑判断等CPU动作的程序称为CPU密集型任务.该类型的任务需要进行大量的计算,主要消耗CPU资源.  这种计算密集型任务虽然也可以用多任务完成,但是任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低,所以,要最高效地利用CPU,计算密集型任务同时进行的数量应当等于CPU的核心数. 特点:    01:CPU 使用率较高(也就是经常计算一些复杂的运算,逻辑处理等情况)非常多的情况下使用    02:针对单台机

  • Java常用线程池原理及使用方法解析

    一.简介 什么是线程池? 池的概念大家也许都有所听闻,池就是相当于一个容器,里面有许许多多的东西你可以即拿即用.java中有线程池.连接池等等.线程池就是在系统启动或者实例化池时创建一些空闲的线程,等待工作调度,执行完任务后,线程并不会立即被销毁,而是重新处于空闲状态,等待下一次调度. 线程池的工作机制? 在线程池的编程模式中,任务提交并不是直接提交给线程,而是提交给池.线程池在拿到任务之后,就会寻找有没有空闲的线程,有则分配给空闲线程执行,暂时没有则会进入等待队列,继续等待空闲线程.如果超出最

  • Java 判断线程池所有任务是否执行完毕的操作

    我就废话不多说了,大家还是直接看代码吧~ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class Test { public static void main(String args[]) throws InterruptedException { ExecutorService exe = Executors.newFixedThreadPool(3); f

随机推荐