DUCC配置平台实现一个动态化线程池示例代码

目录
  • 1.背景
  • 2.代码实现
  • 3.动态线程池应用
  • 4.小结

作者:京东零售 张宾

1.背景

在后台开发中,会经常用到线程池技术,对于线程池核心参数的配置很大程度上依靠经验。然而,由于系统运行过程中存在的不确定性,我们很难一劳永逸地规划一个合理的线程池参数。在对线程池配置参数进行调整时,一般需要对服务进行重启,这样修改的成本就会偏高。一种解决办法就是,将线程池的配置放到配置平台侧,系统运行期间开发人员根据系统运行情况对核心参数进行动态配置。

本文以公司DUCC配置平台作为服务配置中心,以修改线程池核心线程数、最大线程数为例,实现一个简单的动态化线程池。

2.代码实现

当前项目中使用的是Spring 框架提供的线程池类ThreadPoolTaskExecutor,而ThreadPoolTaskExecutor底层又使用里了JDK中线程池类ThreadPoolExecutor,线程池类ThreadPoolExecutor有两个成员方法setCorePoolSize、setMaximumPoolSize可以在运行时设置核心线程数和最大线程数。

setCorePoolSize方法执行流程是:首先会覆盖之前构造函数设置的corePoolSize,然后,如果新的值比原始值要小,当多余的工作线程下次变成空闲状态的时候会被中断并销毁,如果新的值比原来的值要大且工作队列不为空,则会创建新的工作线程。流程图如下:

setMaximumPoolSize方法: 首先会覆盖之前构造函数设置的maximumPoolSize,然后,如果新的值比原来的值要小,当多余的工作线程下次变成空闲状态的时候会被中断并销毁。

Spring 框架提供的线程池类ThreadPoolTaskExecutor,此类封装了对ThreadPoolExecutor有两个成员方法setCorePoolSize、setMaximumPoolSize的调用。

基于以上源代码分析,要实现一个简单的动态线程池需要以下几步:

(1)定义一个动态线程池类,继承ThreadPoolTaskExecutor,目的跟非动态配置的线程池类ThreadPoolTaskExecutor区分开;

(2)定义和实现一个动态线程池配置定时刷的类,目的定时对比ducc配置的线程池数和本地应用中线程数是否一致,若不一致,则更新本地动态线程池线程池数;

(3)引入公司ducc配置平台相关jar包并创建一个动态线程池配置key;

(4)定义和实现一个应用启动后根据动态线程池Bean和从ducc配置平台拉取配置刷新应用中的线程数配置;

接下来代码一一实现:

(1)动态线程池类

/**
 * 动态线程池
 *
 */
public class DynamicThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
}

(2)动态线程池配置定时刷新类

@Slf4j
public class DynamicThreadPoolRefresh implements InitializingBean {
    /**
     * Maintain all automatically registered and manually registered DynamicThreadPoolTaskExecutor.
     */
    private static final ConcurrentMap<String, DynamicThreadPoolTaskExecutor> DTP_REGISTRY = new ConcurrentHashMap<>();
    /**
     * @param threadPoolBeanName
     * @param threadPoolTaskExecutor
     */
    public static void registerDynamicThreadPool(String threadPoolBeanName, DynamicThreadPoolTaskExecutor threadPoolTaskExecutor) {
        log.info("DynamicThreadPool register ThreadPoolTaskExecutor, threadPoolBeanName: {}, executor: {}", threadPoolBeanName, ExecutorConverter.convert(threadPoolBeanName, threadPoolTaskExecutor.getThreadPoolExecutor()));
        DTP_REGISTRY.putIfAbsent(threadPoolBeanName, threadPoolTaskExecutor);
    }
    @Override
    public void afterPropertiesSet() throws Exception {
        this.refresh();
        //创建定时任务线程池
        ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1, (new BasicThreadFactory.Builder()).namingPattern("DynamicThreadPoolRefresh-%d").daemon(true).build());
        //延迟1秒执行,每个1分钟check一次
        executorService.scheduleAtFixedRate(new RefreshThreadPoolConfig(), 1000L, 60000L, TimeUnit.MILLISECONDS);
    }
    private void refresh() {
        String dynamicThreadPool = "";
        try {
            if (DTP_REGISTRY.isEmpty()) {
                log.debug("DynamicThreadPool refresh DTP_REGISTRY is empty");
                return;
            }
            dynamicThreadPool = DuccConfigUtil.getValue(DuccConfigConstants.DYNAMIC_THREAD_POOL);
            if (StringUtils.isBlank(dynamicThreadPool)) {
                log.debug("DynamicThreadPool refresh dynamicThreadPool not config");
                return;
            }
            log.debug("DynamicThreadPool refresh dynamicThreadPool:{}", dynamicThreadPool);
            List<ThreadPoolProperties> threadPoolPropertiesList = JsonUtil.json2Object(dynamicThreadPool, new TypeReference<List<ThreadPoolProperties>>() {
            });
            if (CollectionUtils.isEmpty(threadPoolPropertiesList)) {
                log.error("DynamicThreadPool refresh dynamicThreadPool json2Object error!{}", dynamicThreadPool);
                return;
            }
            for (ThreadPoolProperties properties : threadPoolPropertiesList) {
                doRefresh(properties);
            }
        } catch (Exception e) {
            log.error("DynamicThreadPool refresh exception!dynamicThreadPool:{}", dynamicThreadPool, e);
        }
    }
    /**
     * @param properties
     */
    private void doRefresh(ThreadPoolProperties properties) {
        if (StringUtils.isBlank(properties.getThreadPoolBeanName())
                || properties.getCorePoolSize() < 1
                || properties.getMaxPoolSize() < 1
                || properties.getMaxPoolSize() < properties.getCorePoolSize()) {
            log.error("DynamicThreadPool refresh, invalid parameters exist, properties: {}", properties);
            return;
        }
        DynamicThreadPoolTaskExecutor threadPoolTaskExecutor = DTP_REGISTRY.get(properties.getThreadPoolBeanName());
        if (Objects.isNull(threadPoolTaskExecutor)) {
            log.warn("DynamicThreadPool refresh, DTP_REGISTRY not found {}", properties.getThreadPoolBeanName());
            return;
        }
        ThreadPoolProperties oldProp = ExecutorConverter.convert(properties.getThreadPoolBeanName(), threadPoolTaskExecutor.getThreadPoolExecutor());
        if (Objects.equals(oldProp.getCorePoolSize(), properties.getCorePoolSize())
                && Objects.equals(oldProp.getMaxPoolSize(), properties.getMaxPoolSize())) {
            log.warn("DynamicThreadPool refresh, properties of [{}] have not changed.", properties.getThreadPoolBeanName());
            return;
        }
        if (!Objects.equals(oldProp.getCorePoolSize(), properties.getCorePoolSize())) {
            threadPoolTaskExecutor.setCorePoolSize(properties.getCorePoolSize());
            log.info("DynamicThreadPool refresh, corePoolSize changed!{} {}", properties.getThreadPoolBeanName(), properties.getCorePoolSize());
        }
        if (!Objects.equals(oldProp.getMaxPoolSize(), properties.getMaxPoolSize())) {
            threadPoolTaskExecutor.setMaxPoolSize(properties.getMaxPoolSize());
            log.info("DynamicThreadPool refresh, maxPoolSize changed!{} {}", properties.getThreadPoolBeanName(), properties.getMaxPoolSize());
        }
        ThreadPoolProperties newProp = ExecutorConverter.convert(properties.getThreadPoolBeanName(), threadPoolTaskExecutor.getThreadPoolExecutor());
        log.info("DynamicThreadPool refresh result!{} oldProp:{},newProp:{}", properties.getThreadPoolBeanName(), oldProp, newProp);
    }
    private class RefreshThreadPoolConfig extends TimerTask {
        private RefreshThreadPoolConfig() {
        }
        @Override
        public void run() {
            DynamicThreadPoolRefresh.this.refresh();
        }
    }
}

线程池配置类

@Data
public class ThreadPoolProperties {
    /**
     * 线程池名称
     */
    private String threadPoolBeanName;
    /**
     * 线程池核心线程数量
     */
    private int corePoolSize;
    /**
     * 线程池最大线程池数量
     */
    private int maxPoolSize;
}

(3)引入公司ducc配置平台相关jar包并创建一个动态线程池配置key

ducc配置平台使用见:cf.jd.com/pages/viewp…

动态线程池配置key:dynamic.thread.pool

配置value:

[  {    "threadPoolBeanName": "submitOrderThreadPoolTaskExecutor",    "corePoolSize": 32,    "maxPoolSize": 128  }]

(4) 应用启动刷新应用本地动态线程池配置

@Slf4j
public class DynamicThreadPoolPostProcessor implements BeanPostProcessor {
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        if (bean instanceof DynamicThreadPoolTaskExecutor) {
            DynamicThreadPoolRefresh.registerDynamicThreadPool(beanName, (DynamicThreadPoolTaskExecutor) bean);
        }
        return bean;
    }
}

3.动态线程池应用

动态线程池Bean声明

    <!-- 普通线程池 -->
    <bean id="threadPoolTaskExecutor" class="com.jd.concurrent.ThreadPoolTaskExecutorWrapper">
        <!-- 核心线程数,默认为 -->
        <property name="corePoolSize" value="128"/>
        <!-- 最大线程数,默认为Integer.MAX_VALUE -->
        <property name="maxPoolSize" value="512"/>
        <!-- 队列最大长度,一般需要设置值>=notifyScheduledMainExecutor.maxNum;默认为Integer.MAX_VALUE -->
        <property name="queueCapacity" value="500"/>
        <!-- 线程池维护线程所允许的空闲时间,默认为60s -->
        <property name="keepAliveSeconds" value="60"/>
        <!-- 线程池对拒绝任务(无线程可用)的处理策略,目前只支持AbortPolicy、CallerRunsPolicy;默认为后者 -->
        <property name="rejectedExecutionHandler">
            <!-- AbortPolicy:直接抛出java.util.concurrent.RejectedExecutionException异常 -->
            <!-- CallerRunsPolicy:主线程直接执行该任务,执行完之后尝试添加下一个任务到线程池中,可以有效降低向线程池内添加任务的速度 -->
            <!-- DiscardOldestPolicy:抛弃旧的任务、暂不支持;会导致被丢弃的任务无法再次被执行 -->
            <!-- DiscardPolicy:抛弃当前任务、暂不支持;会导致被丢弃的任务无法再次被执行 -->
            <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy"/>
        </property>
    </bean>
    <!-- 动态线程池 -->
    <bean id="submitOrderThreadPoolTaskExecutor" class="com.jd.concurrent.DynamicThreadPoolTaskExecutor">
        <!-- 核心线程数,默认为 -->
        <property name="corePoolSize" value="32"/>
        <!-- 最大线程数,默认为Integer.MAX_VALUE -->
        <property name="maxPoolSize" value="128"/>
        <!-- 队列最大长度,一般需要设置值>=notifyScheduledMainExecutor.maxNum;默认为Integer.MAX_VALUE -->
        <property name="queueCapacity" value="500"/>
        <!-- 线程池维护线程所允许的空闲时间,默认为60s -->
        <property name="keepAliveSeconds" value="60"/>
        <!-- 线程池对拒绝任务(无线程可用)的处理策略,目前只支持AbortPolicy、CallerRunsPolicy;默认为后者 -->
        <property name="rejectedExecutionHandler">
            <!-- AbortPolicy:直接抛出java.util.concurrent.RejectedExecutionException异常 -->
            <!-- CallerRunsPolicy:主线程直接执行该任务,执行完之后尝试添加下一个任务到线程池中,可以有效降低向线程池内添加任务的速度 -->
            <!-- DiscardOldestPolicy:抛弃旧的任务、暂不支持;会导致被丢弃的任务无法再次被执行 -->
            <!-- DiscardPolicy:抛弃当前任务、暂不支持;会导致被丢弃的任务无法再次被执行 -->
            <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy"/>
        </property>
    </bean>
    <!-- 动态线程池刷新配置 -->
    <bean class="com.jd.concurrent.DynamicThreadPoolPostProcessor"/>
    <bean class="com.jd.concurrent.DynamicThreadPoolRefresh"/>

业务类注入Spring Bean后,直接使用即可

 @Resource
 private ThreadPoolTaskExecutor submitOrderThreadPoolTaskExecutor;
 Runnable asyncTask = ()-&gt;{...};
 CompletableFuture.runAsync(asyncTask, this.submitOrderThreadPoolTaskExecutor);

4.小结

本文从实际项目的业务痛点场景出发,并基于公司已有的ducc配置平台简单实现了线程池线程数量可配置。

以上就是DUCC配置平台实现一个动态化线程池示例代码的详细内容,更多关于DUCC配置平台动态化线程池的资料请关注我们其它相关文章!

(0)

相关推荐

  • Springboot 配置线程池创建线程及配置 @Async 异步操作线程池详解

    目录 前言 一.创建一个Springboot Web项目 二.新建ThreadPoolConfig 三.新建controller测试 四.演示结果 前言 众所周知,创建显示线程和直接使用未配置的线程池创建线程,都会被阿里的大佬给diss,所以我们要规范的创建线程. 至于 @Async 异步任务的用处是不想等待方法执行完就返回结果,提高软件前台响应速度,一个程序中会用到很多异步方法,所以需要使用线程池管理,防止影响性能. 一.创建一个Springboot Web项目 需要一个Springboot项

  • Springboot应用中线程池配置详细教程(最新2021版)

    前言:日常开发中我们常用ThreadPoolExecutor提供的线程池服务帮我们管理线程,在Springboot中更是提供了@Async注解来简化业务逻辑提交到线程池中执行的过程.由于Springboot中默认设置的corePoolSize=1和queyeCapacity=Integer.MAX_VALUE,相当于采用单线程处理所有任务,这就与多线程的目的背道而驰,所以这就要求我们在使用@Async注解时要配置线程池.本文就讲述下Springboot应用下的线程池配置. 背景知识:Spring

  • 浅析Tomcat使用线程池配置高并发连接

    目录 Tomcat使用线程池配置高并发连接 1:配置executor属性 2:配置Connector 一.Tomcat内存优化 1.JAVA_OPTS参数说明 二.Tomcat并发优化 1.Tomcat连接相关参数 1.参数说明 2.Tomcat中的配置示例 2.调整连接器connector的并发处理能力 1.参数说明 2.Tomcat中的配置示例 3.Tomcat缓存优化 1.参数说明 2.Tomcat中的配置示例 4.参考配置 1.旧有的配置 2.更改后的配置 Tomcat使用线程池配置高并

  • Java线程池配置的一些常见误区总结

    前言 由于线程的创建和销毁对操作系统来说都是比较重量级的操作,所以线程的池化在各种语言内都有实践,当然在 Java 语言中线程池是也非常重要的一部分,有 Doug Lea 大神对线程池的封装,我们使用的时候是非常方便,但也可能会因为不了解其具体实现,对线程池的配置参数存在误解. 我们经常在一些技术书籍或博客上看到,向线程池提交任务时,线程池的执行逻辑如下: 当一个任务被提交后,线程池首先检查正在运行的线程数是否达到核心线程数,如果未达到则创建一个线程. 如果线程池内正在运行的线程数已经达到了核心

  • SpringBoot异步使用@Async的原理以及线程池配置详解

    目录 前言 使用步骤 配置线程池类参数配置 自定义线程任务 总结 原理刨析 文章参考 前言 在实际项目开发中很多业务场景需要使用异步去完成,比如消息通知,日志记录,等非常常用的都可以通过异步去执行,提高效率,那么在Spring框架中应该如何去使用异步呢 使用步骤 完成异步操作一般有两种,消息队列MQ,和线程池处理ThreadPoolExecutor 而在Spring4中提供的对ThreadPoolExecutor封装的线程池ThreadPoolTaskExecutor,直接使用注解启用@Asyn

  • DUCC配置平台实现一个动态化线程池示例代码

    目录 1.背景 2.代码实现 3.动态线程池应用 4.小结 作者:京东零售 张宾 1.背景 在后台开发中,会经常用到线程池技术,对于线程池核心参数的配置很大程度上依靠经验.然而,由于系统运行过程中存在的不确定性,我们很难一劳永逸地规划一个合理的线程池参数.在对线程池配置参数进行调整时,一般需要对服务进行重启,这样修改的成本就会偏高.一种解决办法就是,将线程池的配置放到配置平台侧,系统运行期间开发人员根据系统运行情况对核心参数进行动态配置. 本文以公司DUCC配置平台作为服务配置中心,以修改线程池

  • 用python实现的线程池实例代码

    python3标准库里自带线程池ThreadPoolExecutor和进程池ProcessPoolExecutor. 如果你用的是python2,那可以下载一个模块,叫threadpool,这是线程池.对于进程池可以使用python自带的multiprocessing.Pool. 当然也可以自己写一个threadpool. # coding:utf-8 import Queue import threading import sys import time import math class W

  • Python 使用threading+Queue实现线程池示例

    一.线程池 1.为什么需要使用线程池 1.1 创建/销毁线程伴随着系统开销,过于频繁的创建/销毁线程,会很大程度上影响处理效率. 记创建线程消耗时间T1,执行任务消耗时间T2,销毁线程消耗时间T3,如果T1+T3>T2,那说明开启一个线程来执行这个任务太不划算了!在线程池缓存线程可用已有的闲置线程来执行新任务,避免了创建/销毁带来的系统开销. 1.2 线程并发数量过多,抢占系统资源从而导致阻塞. 线程能共享系统资源,如果同时执行的线程过多,就有可能导致系统资源不足而产生阻塞的情况. 1.3 对线

  • java中常见的6种线程池示例详解

    之前我们介绍了线程池的四种拒绝策略,了解了线程池参数的含义,那么今天我们来聊聊Java 中常见的几种线程池,以及在jdk7 加入的 ForkJoin 新型线程池 首先我们列出Java 中的六种线程池如下 线程池名称 描述 FixedThreadPool 核心线程数与最大线程数相同 SingleThreadExecutor 一个线程的线程池 CachedThreadPool 核心线程为0,最大线程数为Integer. MAX_VALUE ScheduledThreadPool 指定核心线程数的定时

  • Python异步爬虫多线程与线程池示例详解

    目录 背景 异步爬虫方式 多线程,多进程(不建议) 线程池,进程池(适当使用) 单线程+异步协程(推荐) 多线程 线程池 背景 当对多个url发送请求时,只有请求完第一个url才会接着请求第二个url(requests是一个阻塞的操作),存在等待的时间,这样效率是很低的.那我们能不能在发送请求等待的时候,为其单独开启进程或者线程,继续请求下一个url,执行并行请求 异步爬虫方式 多线程,多进程(不建议) 好处:可以为相关阻塞的操作单独开启线程或者进程,阻塞操作就可以异步会执行 弊端:不能无限制开

  • C++线程池实现代码

    前言 这段时间看了<C++并发编程实战>的基础内容,想着利用最近学的知识自己实现一个简单的线程池. 什么是线程池 线程池(thread pool)是一种线程使用模式.线程过多或者频繁创建和销毁线程会带来调度开销,进而影响缓存局部性和整体性能.而线程池维护着多个线程,等待着管理器分配可并发执行的任务.这避免了在处理短时间任务时创建与销毁线程的代价,以及保证了线程的可复用性.线程池不仅能够保证内核的充分利用,还能防止过分调度. 思路 个人对线程池的理解是:利用已经创建的固定数量的线程去执行指定的任

  • 使用Vue3实现一个Upload组件的示例代码

    通用上传组件开发 开发上传组件前我们需要了解: FormData上传文件所需API dragOver文件拖拽到区域时触发 dragLeave文件离开拖动区域 drop文件移动到有效目标时 首先实现一个最基本的上传流程: 基本上传流程,点击按钮选择,完成上传 代码如下: <template> <div class="app-container"> <!--使用change事件--> <input type="file" @ch

  • 基于Python编写一个点名器的示例代码

    目录 前言 主界面 添加姓名 查看花名册 使用指南 名字转动功能 完整代码 前言 想起小学的时候老师想点名找小伙伴回答问题的时候,老师竟斥巨资买了个点名器.今日无聊便敲了敲小时候老师斥巨资买的点名器. 本人姓白,就取名小白点名器啦,嘿嘿 代码包含:添加姓名.查看花名册.使用指南.随机抽取名字的功能(完整源码在最后) 主界面 定义主界面.使用“w+”模式创建test.txt文件(我添加了个背景图片,若不需要可省略) #打开时预加载储存在test.txt文件中的花名册 namelist = [] w

  • 用ES6的class模仿Vue写一个双向绑定的示例代码

    本文介绍了用ES6的class模仿Vue写一个双向绑定的示例代码,分享给大家,具体如下: 最终效果如下: 构造器(constructor) 构造一个TinyVue对象,包含基本的el,data,methods class TinyVue{ constructor({el, data, methods}){ this.$data = data this.$el = document.querySelector(el) this.$methods = methods // 初始化 this._com

  • QT实现制作一个ListView列表的示例代码

    目录 1.概述 2.代码示例 1.自定义QListWidget 2.自定义QListWidgetItem 3.使用 3.图片演示 1.概述 案例:使用Qt制作一个ListView.点击ListView的Item可以用于测试OpenCV的各种效果 自定义一个:MainListView继承QListWidget .MainListViewItem继承QListWidgetItem 2.代码示例 1.自定义QListWidget mainlistview.h class MainListView :

随机推荐