Spring Boot使用线程池处理上万条数据插入功能

目录
  • # 前言
  • # 使用步骤

# 前言

前两天做项目的时候,想提高一下插入表的性能优化,因为是两张表,先插旧的表,紧接着插新的表,一万多条数据就有点慢了

后面就想到了线程池ThreadPoolExecutor,而用的是Spring Boot项目,可以用Spring提供的对ThreadPoolExecutor封装的线程池ThreadPoolTaskExecutor,直接使用注解启用

# 使用步骤

先创建一个线程池的配置,让Spring Boot加载,用来定义如何创建一个ThreadPoolTaskExecutor,要使用@Configuration和@EnableAsync这两个注解,表示这是个配置类,并且是线程池的配置类

@Configuration
@EnableAsync
public class ExecutorConfig {
    private static final Logger logger = LoggerFactory.getLogger(ExecutorConfig.class);
    @Value("${async.executor.thread.core_pool_size}")
    private int corePoolSize;   

    @Value("${async.executor.thread.max_pool_size}")
    private int maxPoolSize;   

    @Value("${async.executor.thread.queue_capacity}")
    private int queueCapacity;   

    @Value("${async.executor.thread.name.prefix}")
    private String namePrefix;
    @Bean(name = "asyncServiceExecutor")
    public Executor asyncServiceExecutor() {
        logger.info("start asyncServiceExecutor");
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); 

        //配置核心线程数
        executor.setCorePoolSize(corePoolSize);   

        //配置最大线程数
        executor.setMaxPoolSize(maxPoolSize);   

        //配置队列大小
        executor.setQueueCapacity(queueCapacity);    

        //配置线程池中的线程的名称前缀
        executor.setThreadNamePrefix(namePrefix);
        // rejection-policy:当pool已经达到max size的时候,如何处理新任务
        // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行  

         executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //执行初始化
        executor.initialize();
        return executor;
     }
}

@Value是我配置在application.properties,可以参考配置,自由定义

# 异步线程配置
# 配置核心线程数
async.executor.thread.core_pool_size = 5
# 配置最大线程数
async.executor.thread.max_pool_size = 5
# 配置队列大小
async.executor.thread.queue_capacity = 99999
# 配置线程池中的线程的名称前缀
async.executor.thread.name.prefix = async-service-

创建一个Service接口,是异步线程的接口

public interface AsyncService {
    /**
      * 执行异步任务
      * 可以根据需求,自己加参数拟定,我这里就做个测试演示
      */
    void executeAsync();
}

实现类

@Service
public class AsyncServiceImpl implements AsyncService {
    private static final Logger logger = LoggerFactory.getLogger(AsyncServiceImpl.class);
    @Override
    @Async("asyncServiceExecutor")
    public void executeAsync() {
        logger.info("start executeAsync");
        System.out.println("异步线程要做的事情");
        System.out.println("可以在这里执行批量插入等耗时的事情");
        logger.info("end executeAsync");
    }
}

在executeAsync()方法上增加注解@Async("asyncServiceExecutor"),asyncServiceExecutor方法是前面ExecutorConfig.java中的方法名,表明executeAsync方法进入的线程池是asyncServiceExecutor方法创建的

接下来就是在Controller里或者是哪里通过注解@Autowired注入这个Service

@Autowiredprivate
AsyncService asyncService;

@GetMapping("/async")
public void async(){
    asyncService.executeAsync();
}

日志打印

2022-07-16 22:15:47.655  INFO 10516 --- [async-service-5] c.u.d.e.executor.impl.AsyncServiceImpl   : start executeAsync
 异步线程要做的事情
 可以在这里执行批量插入等耗时的事情
 2022-07-16 22:15:47.655  INFO 10516 --- [async-service-5] c.u.d.e.executor.impl.AsyncServiceImpl   : end executeAsync
 2022-07-16 22:15:47.770  INFO 10516 --- [async-service-1] c.u.d.e.executor.impl.AsyncServiceImpl   : start executeAsync
 异步线程要做的事情
 可以在这里执行批量插入等耗时的事情
 2022-07-16 22:15:47.770  INFO 10516 --- [async-service-1] c.u.d.e.executor.impl.AsyncServiceImpl   : end executeAsync
 2022-07-16 22:15:47.816  INFO 10516 --- [async-service-2] c.u.d.e.executor.impl.AsyncServiceImpl   : start executeAsync
 异步线程要做的事情
 可以在这里执行批量插入等耗时的事情
 2022-07-16 22:15:47.816  INFO 10516 --- [async-service-2] c.u.d.e.executor.impl.AsyncServiceImpl   : end executeAsync
 2022-07-16 22:15:48.833  INFO 10516 --- [async-service-3] c.u.d.e.executor.impl.AsyncServiceImpl   : start executeAsync
 异步线程要做的事情
 可以在这里执行批量插入等耗时的事情
 2022-07-16 22:15:48.834  INFO 10516 --- [async-service-3] c.u.d.e.executor.impl.AsyncServiceImpl   : end executeAsync
 2022-07-16 22:15:48.986  INFO 10516 --- [async-service-4] c.u.d.e.executor.impl.AsyncServiceImpl   : start executeAsync
 异步线程要做的事情
 可以在这里执行批量插入等耗时的事情
 2022-07-16 22:15:48.987  INFO 10516 --- [async-service-4] c.u.d.e.executor.impl.AsyncServiceImpl   : end executeAsync

通过以上日志可以发现,[async-service-]是有多个线程的,显然已经在我们配置的线程池中执行了,并且每次请求中,controller的起始和结束日志都是连续打印的,表明每次请求都快速响应了,而耗时的操作都留给线程池中的线程去异步执行;

虽然我们已经用上了线程池,但是还不清楚线程池当时的情况,有多少线程在执行,多少在队列中等待呢?这里我创建了一个ThreadPoolTaskExecutor的子类,在每次提交线程的时候都会将当前线程池的运行状况打印出来

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.concurrent.ListenableFuture;
import java.util.concurrent.Callable;import java.util.concurrent.Future;import java.util.concurrent.ThreadPoolExecutor;
/**
* @Author: 腾腾
* @Date: 2022/7/16/0016 22:19
*/
public class VisiableThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
    private static final Logger logger = LoggerFactory.getLogger(VisiableThreadPoolTaskExecutor.class);
    private void showThreadPoolInfo(String prefix) {
        ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
        if (null == threadPoolExecutor) {
            return;
        }
        logger.info("{}, {},taskCount [{}], completedTaskCount [{}], activeCount [{}], queueSize [{}]",
        this.getThreadNamePrefix(),
        prefix,
        threadPoolExecutor.getTaskCount(),
        threadPoolExecutor.getCompletedTaskCount(),
        threadPoolExecutor.getActiveCount(),
        threadPoolExecutor.getQueue().size());
     }
    @Override
    public void execute(Runnable task) {
        showThreadPoolInfo("1. do execute");
        super.execute(task);
    }
    @Override
    public void execute(Runnable task, long startTimeout) {
        showThreadPoolInfo("2. do execute");
        super.execute(task, startTimeout);
    }
    @Override
    public Future<?> submit(Runnable task) {
        showThreadPoolInfo("1. do submit");
        return super.submit(task);
    }
    @Override
    public <T> Future<T> submit(Callable<T> task) {
        showThreadPoolInfo("2. do submit");
        return super.submit(task);
    }
    @Override
    public ListenableFuture<?> submitListenable(Runnable task) {
        showThreadPoolInfo("1. do submitListenable");
        return super.submitListenable(task);
    }
    @Override
    public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
        showThreadPoolInfo("2. do submitListenable");
        return super.submitListenable(task);
    }
}

如上所示,showThreadPoolInfo方法中将任务总数、已完成数、活跃线程数,队列大小都打印出来了,然后Override了父类的execute、submit等方法,在里面调用showThreadPoolInfo方法,这样每次有任务被提交到线程池的时候,都会将当前线程池的基本情况打印到日志中;

修改ExecutorConfig.java的asyncServiceExecutor方法,将ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor()改为ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor()

@Bean(name = "asyncServiceExecutor")
public Executor asyncServiceExecutor() {
    logger.info("start asyncServiceExecutor");
    //在这里修改
    ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor();
    //配置核心线程数
    executor.setCorePoolSize(corePoolSize);
    //配置最大线程数
    executor.setMaxPoolSize(maxPoolSize);
    //配置队列大小
    executor.setQueueCapacity(queueCapacity);
    //配置线程池中的线程的名称前缀
    executor.setThreadNamePrefix(namePrefix);

    // rejection-policy:当pool已经达到max size的时候,如何处理新任务
    // CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    //执行初始化
    executor.initialize();
    return executor;
}

再次启动该工程测试

2022-07-16 22:23:30.951  INFO 14088 --- [nio-8087-exec-2] u.d.e.e.i.VisiableThreadPoolTaskExecutor : async-service-, 2. do submit,taskCount [0], completedTaskCount [0], activeCount [0], queueSize [0]
2022-07-16 22:23:30.952  INFO 14088 --- [async-service-1] c.u.d.e.executor.impl.AsyncServiceImpl   : start executeAsync
异步线程要做的事情
可以在这里执行批量插入等耗时的事情
2022-07-16 22:23:30.953  INFO 14088 --- [async-service-1] c.u.d.e.executor.impl.AsyncServiceImpl   : end executeAsync
2022-07-16 22:23:31.351  INFO 14088 --- [nio-8087-exec-3] u.d.e.e.i.VisiableThreadPoolTaskExecutor : async-service-, 2. do submit,taskCount [1], completedTaskCount [1], activeCount [0], queueSize [0]
2022-07-16 22:23:31.353  INFO 14088 --- [async-service-2] c.u.d.e.executor.impl.AsyncServiceImpl   : start executeAsync
异步线程要做的事情
可以在这里执行批量插入等耗时的事情
2022-07-16 22:23:31.353  INFO 14088 --- [async-service-2] c.u.d.e.executor.impl.AsyncServiceImpl   : end executeAsync
2022-07-16 22:23:31.927  INFO 14088 --- [nio-8087-exec-5] u.d.e.e.i.VisiableThreadPoolTaskExecutor : async-service-, 2. do submit,taskCount [2], completedTaskCount [2], activeCount [0], queueSize [0]
2022-07-16 22:23:31.929  INFO 14088 --- [async-service-3] c.u.d.e.executor.impl.AsyncServiceImpl   : start executeAsync
异步线程要做的事情
可以在这里执行批量插入等耗时的事情
2022-07-16 22:23:31.930  INFO 14088 --- [async-service-3] c.u.d.e.executor.impl.AsyncServiceImpl   : end executeAsync
2022-07-16 22:23:32.496  INFO 14088 --- [nio-8087-exec-7] u.d.e.e.i.VisiableThreadPoolTaskExecutor : async-service-, 2. do submit,taskCount [3], completedTaskCount [3], activeCount [0], queueSize [0]
2022-07-16 22:23:32.498  INFO 14088 --- [async-service-4] c.u.d.e.executor.impl.AsyncServiceImpl   : start executeAsync
异步线程要做的事情
可以在这里执行批量插入等耗时的事情
2022-07-16 22:23:32.499  INFO 14088 --- [async-service-4] c.u.d.e.executor.impl.AsyncServiceImpl   : end executeAsync

注意这一行日志:

2022-07-16 22:23:32.496  INFO 14088 --- [nio-8087-exec-7] u.d.e.e.i.VisiableThreadPoolTaskExecutor : async-service-, 2. do submit,taskCount [3], completedTaskCount [3], activeCount [0], queueSize [0]

这说明提交任务到线程池的时候,调用的是submit(Callable task)这个方法,当前已经提交了3个任务,完成了3个,当前有0个线程在处理任务,还剩0个任务在队列中等待,线程池的基本情况一路了然;

到此这篇关于Spring Boot使用线程池处理上万条数据插入的文章就介绍到这了,更多相关Spring Boot线程池处理上万条数据插入内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Springboot线程池并发处理数据优化方式

    目录 第一步:首先配置线程基本参数 第二步:让Spring Boot加载 第三步:创建一个service接口 第四步:编写现实类 第五步:测试结果如下 第一步:首先配置线程基本参数 可以放在application.propertes文件种也可以放在自己新建的config/文件目录下,注意:但是需要使用@PropertySource把配置文件进行加载. # 异步线程配置 # 配置核心线程数 async.executor.thread.core_pool_size = 8 # 配置最大线程数 asy

  • SpringBoot使用异步线程池实现生产环境批量数据推送

    目录 前言 编写线程池配置类 编写异步服务 异步批量上报数据 总结 前言 SpringBoot使用异步线程池: 1.编写线程池配置类,自定义一个线程池: 2.定义一个异步服务: 3.使用@Async注解指向定义的线程池: 这里以我工作中使用过的一个案例来做描述,我所在公司是医疗行业,敏感数据需要上报到某监管平台,所以有一个定时任务在流量较小时(一般是凌晨后)执行上报行为.但特殊时期会存在一定要在工作时间大批量上报数据的情况,且要求短时间内就要完成,此时就考虑写一个专门的异步上报接口手动执行,利用

  • Spring Boot使用线程池处理上万条数据插入功能

    目录 # 前言 # 使用步骤 # 前言 前两天做项目的时候,想提高一下插入表的性能优化,因为是两张表,先插旧的表,紧接着插新的表,一万多条数据就有点慢了 后面就想到了线程池ThreadPoolExecutor,而用的是Spring Boot项目,可以用Spring提供的对ThreadPoolExecutor封装的线程池ThreadPoolTaskExecutor,直接使用注解启用 # 使用步骤 先创建一个线程池的配置,让Spring Boot加载,用来定义如何创建一个ThreadPoolTask

  • Spring Boot配置线程池拒绝策略的场景分析(妥善处理好溢出的任务)

    目录 场景重现 配置拒绝策略 代码示例 通过之前三篇关于Spring Boot异步任务实现的博文,我们分别学会了用@Async创建异步任务.为异步任务配置线程池.使用多个线程池隔离不同的异步任务.今天这篇,我们继续对上面的知识进行完善和优化! 如果你已经看过上面几篇内容并已经掌握之后,一起来思考下面这个问题: 假设,线程池配置为核心线程数2.最大线程数2.缓冲队列长度2.此时,有5个异步任务同时开始,会发生什么? 场景重现 我们先来把上面的假设用代码实现一下: 第一步:创建Spring Boot

  • 基于Spring Boot的线程池监控问题及解决方案

    目录 前言 为什么需要对线程池进行监控 如何做线程池的监控 数据采集 数据存储以及大盘的展示 进一步扩展以及思考 如何合理配置线程池参数 如何动态调整线程池参数 如何给不同的服务之间做线程池的隔离 实现方案 前言 这篇是推动大家异步编程的思想的线程池的准备篇,要做好监控,让大家使用无后顾之忧,敬畏生产. 为什么需要对线程池进行监控 Java线程池作为最常使用到的并发工具,相信大家都不陌生,但是你真的确定使用对了吗?大名鼎鼎的阿里Java代码规范要求我们不使用 Executors来快速创建线程池,

  • Spring Boot使用Spring的异步线程池的实现

    前言 线程池,从名字上来看,就是一个保存线程的"池子",凡事都有其道理,那线程池的好处在哪里呢? 我们要让计算机为我们干一些活,其实都是在使用线程,使用方法就是new一个Runnable接口或者新建一个子类,继承于Thread类,这就会涉及到线程对象的创建与销毁,这两个操作无疑是耗费我们系统处理器资源的,那如何解决这个问题呢? 线程池其实就是为了解决这个问题而生的. 线程池提供了处理系统性能和大用户量请求之间的矛盾的方法,通过对多个任务重用已经存在的线程对象,降低了对线程对象创建和销毁

  • Spring Boot异步线程间数据传递的四种方式

    目录 Spring Boot 自定义线程池实现异步开发 1. 手动设置 2. 线程池设置TaskDecorator 3. InheritableThreadLocal 4. TransmittableThreadLocal TransmittableThreadLocal原理 总结 Spring Boot 自定义线程池实现异步开发 Spring Boot 自定义线程池实现异步开发相信看过的都了解,但是在实际开发中需要在父子线程之间传递一些数据,比如用户信息,链路信息等等 比如用户登录信息使用Th

  • Spring 与 JDK 线程池的简单使用示例详解

    1.配置自定义共享线程池(Spring线程池) @Configuration @EnableAsync public class ThreadPoolConfig{ //主要任务的调度,计划执行 @Bean("taskScheduler") public Executor createScheduler(){ // 创建一个线程池对象 ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler(); // 定义一个线程

  • Spring Boot数据库链接池配置方法

    配置方法 基于当前的1.5.2.RELEASE的Spring Boot. 依照官方文档,如果增加了如下依赖的配置,或者类路径中存在spring-boot-starter-jdbc的jar,那么已默认启用了数据库链接池. <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jdbc</artifactId> </dep

  • 基于Spring中的线程池和定时任务功能解析

    1.功能介绍 Spring框架提供了线程池和定时任务执行的抽象接口:TaskExecutor和TaskScheduler来支持异步执行任务和定时执行任务功能.同时使用框架自己定义的抽象接口来屏蔽掉底层JDK版本间以及Java EE中的线程池和定时任务处理的差异. 另外Spring还支持集成JDK内部的定时器Timer和Quartz Scheduler框架. 2.线程池的抽象:TaskExecutor TaskExecutor涉及到的相关类图如下: TaskExecutor接口源代码如下所示: p

  • 浅谈Spring @Async异步线程池用法总结

    本文介绍了Spring @Async异步线程池用法总结,分享给大家,希望对大家有帮助 1. TaskExecutor spring异步线程池的接口类,其实质是Java.util.concurrent.Executor Spring 已经实现的异常线程池: 1. SimpleAsyncTaskExecutor:不是真的线程池,这个类不重用线程,每次调用都会创建一个新的线程. 2. SyncTaskExecutor:这个类没有实现异步调用,只是一个同步操作.只适用于不需要多线程的地方 3. Conc

  • 对spring task和线程池的深入研究

    目录 spring task和线程池的研究 1.如何实现spring task定时任务的配置 2.task里面的一个job方法如何使用多线程,配置线程池 spring 线程池配置 默认线程池ThreadPoolTaskExecutor配置 自定义线程池ThreadPoolTaskExecutor配置 spring task和线程池的研究 最近因工作需求,研究了一下spring task定时任务,和线程池,有了一定收获,记录一下 涉及如下内容 1.如何实现spring task定时任务的配置 2.

随机推荐