SpringBoot使用异步线程池实现生产环境批量数据推送

目录
  • 前言
  • 编写线程池配置类
  • 编写异步服务
  • 异步批量上报数据
  • 总结

前言

SpringBoot使用异步线程池:

1、编写线程池配置类,自定义一个线程池;

2、定义一个异步服务;

3、使用@Async注解指向定义的线程池;

这里以我工作中使用过的一个案例来做描述,我所在公司是医疗行业,敏感数据需要上报到某监管平台,所以有一个定时任务在流量较小时(一般是凌晨后)执行上报行为。但特殊时期会存在一定要在工作时间大批量上报数据的情况,且要求短时间内就要完成,此时就考虑写一个专门的异步上报接口手动执行,利用线程池上报,极大提高了速度。

编写线程池配置类

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
 
/**
 * 类名称:ExecutorConfig
 * ********************************
 * <p>
 * 类描述:线程池配置
 *
 * @author guoj
 * @date 2021-09-07 09:00
 */
@Configuration
@EnableAsync
@Slf4j
public class ExecutorConfig {
    /**
     * 定义数据上报线程池
     * @return
     */
    @Bean("dataCollectionExecutor")
    public Executor dataCollectionExecutor() {
 
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
 
        // 核心线程数量:当前机器的核心数
        executor.setCorePoolSize(
                Runtime.getRuntime().availableProcessors());
 
        // 最大线程数
        executor.setMaxPoolSize(
                Runtime.getRuntime().availableProcessors() * 2);
 
        // 队列大小
        executor.setQueueCapacity(Integer.MAX_VALUE);
 
        // 线程池中的线程名前缀
        executor.setThreadNamePrefix("sjsb-");
 
        // 拒绝策略:直接拒绝
        executor.setRejectedExecutionHandler(
                new ThreadPoolExecutor.AbortPolicy());
 
        // 执行初始化
        executor.initialize();
 
        return executor;
    }
}

PS:

1)、需要注意,这里一定要自己定义ThreadPoolTaskExecutor线程池,否则springboot的异步注解会执行默认线程池,存在线程阻塞导致CPU飙高及内存溢出的风险。这一点可以参考阿里开发手册,线程池定义这块明确提到了这一点;

2)、在@Bean注解中定义线程池名称,后面异步注解会用到。

编写异步服务

/**
 * 异步方法的服务, 不影响主程序运行。
 */
@Service
public class AsyncService {
 
    private final Logger log = LoggerFactory.getLogger(AsyncService.class);
 
    /**
     * 发送短信
     */
    @Async("sendMsgExecutor")
    public void sendMsg(String access_token, Consult item, Map<String, String> configMap) {
        // 此处编写发送短信业务
        // 1、buildConsultData();
        // 2、sendMsg();
    }
 
    /**
     * 发送微信订阅消息
     */
    @Async
    public void sendSubscribeMsg(String access_token, Consult item, Map<String, String> configMap) {
        // 此处编写发送微信订阅消息业务
        // 1、buildConsultData();
        // 2、sendSubscribeMsg();
    }
 
    /**
     * 数据并上报
     */
    @Async("dataCollectionExecutor")
    public void buildAndPostData(String access_token, Consult item, Map<String, String> configMap) {
        // 此处编写上报业务,如拼接数据,然后执行上报。
        // 1、buildConsultData();
        // 2、postData();
    }
}

PS:
1)、以上是代码片段,个人经验认为专门定义一个异步service存放各个异步方法最佳,这样可以避免编码时一些误操作比如异步方法不是void或者是private修饰,导致@Async注解失效的情况,同时可以安排每个注解指向不同的自定义线程池更加灵活;
2)、@Async注解中的名称就是上面定义的自定义线程池名称,这样业务执行时就会从指定线程池中获取异步线程。

异步批量上报数据

@Autowired
private AsyncService asyncService;
 
/**
 * 手动上报问诊记录,线程池方式。
 */
public void manualUploadConsultRecordsAsync(String channel, Date startTime, Date endTime) {
 
    // 查询指定时间内的问诊记录
   List<Consult> consultList = consultService
       .findPaidListByChannelAndTime(channel, startTime, endTime, configMap.get("serviceId"));
 
   if (!CollectionUtils.isEmpty(consultList)) {
 
       log.debug("[SendWZDataService][manualUploadConsultRecordsAsync]>>>> 手动上报问诊记录, 一共[{}]条", consultList.size());
 
       consultList.forEach((item) -> {
           try {
               // 异步调用,使用线程池。
               asyncService.buildAndPostData(access_token, item, configMap);
           } catch (Exception ex) {
               log.error("[SendWZDataService][manualUploadConsultRecordsAsync]>>>> 手动上报问诊记录发生异常: ", ex);
           }
       });
   }
}

总结

以上方式已经在生产环境运行,在工作时间内执行过很多次,一次数万条记录基本是几分钟内就全部上报完毕,而正常循环遍历时一次大概需要半个小时左右。

线程池的使用方式往往来源于业务场景,如果类似的业务不存在紧急处理的情况,大体还是以任务调度执行为主,因为更安全。如果存在紧急处理的情况,那么使用SpringBoot+线程池的方式不仅能节省非常多的时间,且不占用主线程的执行空间。

到此这篇关于SpringBoot使用异步线程池实现生产环境批量数据推送的文章就介绍到这了,更多相关SpringBoot 生产环境批量数据推送内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • SpringBoot生产环境和测试环境配置分离的教程详解

    第一步:项目中资源配置文件夹(resources文件夹)下先新增测试环境application-dev.yml和application-prod.yml两个配置文件,分别代表测试环境配置和生产环境配置 第二步:在application.yml配置文件中设置如下配置(PS:active后定义的名字要和配置文件-后的名字一致,如下则系统执行application-dev.yml) spring: profiles: active: dev 第三步:启动项目 启动方式一:idea中 springboo

  • SpringBoot使用异步线程池实现生产环境批量数据推送

    目录 前言 编写线程池配置类 编写异步服务 异步批量上报数据 总结 前言 SpringBoot使用异步线程池: 1.编写线程池配置类,自定义一个线程池: 2.定义一个异步服务: 3.使用@Async注解指向定义的线程池: 这里以我工作中使用过的一个案例来做描述,我所在公司是医疗行业,敏感数据需要上报到某监管平台,所以有一个定时任务在流量较小时(一般是凌晨后)执行上报行为.但特殊时期会存在一定要在工作时间大批量上报数据的情况,且要求短时间内就要完成,此时就考虑写一个专门的异步上报接口手动执行,利用

  • Springboot应用中线程池配置详细教程(最新2021版)

    前言:日常开发中我们常用ThreadPoolExecutor提供的线程池服务帮我们管理线程,在Springboot中更是提供了@Async注解来简化业务逻辑提交到线程池中执行的过程.由于Springboot中默认设置的corePoolSize=1和queyeCapacity=Integer.MAX_VALUE,相当于采用单线程处理所有任务,这就与多线程的目的背道而驰,所以这就要求我们在使用@Async注解时要配置线程池.本文就讲述下Springboot应用下的线程池配置. 背景知识:Spring

  • 基于springcloud异步线程池、高并发请求feign的解决方案

    ScenTaskTestApplication.java package com.test; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.openfeign.EnableFeignClients; /** * @author scen *

  • 浅谈Spring @Async异步线程池用法总结

    本文介绍了Spring @Async异步线程池用法总结,分享给大家,希望对大家有帮助 1. TaskExecutor spring异步线程池的接口类,其实质是Java.util.concurrent.Executor Spring 已经实现的异常线程池: 1. SimpleAsyncTaskExecutor:不是真的线程池,这个类不重用线程,每次调用都会创建一个新的线程. 2. SyncTaskExecutor:这个类没有实现异步调用,只是一个同步操作.只适用于不需要多线程的地方 3. Conc

  • Spring Boot使用Spring的异步线程池的实现

    前言 线程池,从名字上来看,就是一个保存线程的"池子",凡事都有其道理,那线程池的好处在哪里呢? 我们要让计算机为我们干一些活,其实都是在使用线程,使用方法就是new一个Runnable接口或者新建一个子类,继承于Thread类,这就会涉及到线程对象的创建与销毁,这两个操作无疑是耗费我们系统处理器资源的,那如何解决这个问题呢? 线程池其实就是为了解决这个问题而生的. 线程池提供了处理系统性能和大用户量请求之间的矛盾的方法,通过对多个任务重用已经存在的线程对象,降低了对线程对象创建和销毁

  • Spring Boot之@Async异步线程池示例详解

    目录 前言 一. Spring异步线程池的接口类 :TaskExecutor 二.简单使用说明 三.定义通用线程池 1.定义线程池 2.异步方法使用线程池 3.通过xml配置定义线程池 四.异常处理 五.问题 前言 很多业务场景需要使用异步去完成,比如:发送短信通知.要完成异步操作一般有两种: 1.消息队列MQ 2.线程池处理. 我们来看看Spring框架中如何去使用线程池来完成异步操作,以及分析背后的原理. 一. Spring异步线程池的接口类 :TaskExecutor 在Spring4中,

  • python3线程池ThreadPoolExecutor处理csv文件数据

    目录 背景 知识点 拓展 库 流程 实现代码 解释 背景 由于不同乙方对服务商业务接口字段理解不一致,导致线上上千万数据量数据存在问题,为了修复数据,通过 Python 脚本进行修改 知识点 Python3.线程池.pymysql.CSV 文件操作.requests 拓展 当我们程序在使用到线程.进程或协程的时候,以下三个知识点可以先做个基本认知 CPU 密集型.IO 密集型.GIL 全局解释器锁 库 pip3 install requests pip3 install pymysql 流程 实

  • SpringBoot 多任务并行+线程池处理的实现

    前言 前几篇文章着重介绍了后端服务数据库和多线程并行处理优化,并示例了改造前后的伪代码逻辑.当然了,优化是无止境的,前人栽树后人乘凉.作为我们开发者来说,既然站在了巨人的肩膀上,就要写出更加优化的程序. SpringBoot开发案例之JdbcTemplate批量操作 SpringBoot开发案例之CountDownLatch多任务并行处理 改造 理论上讲,线程越多程序可能更快,但是在实际使用中我们需要考虑到线程本身的创建以及销毁的资源消耗,以及保护操作系统本身的目的.我们通常需要将线程限制在一定

  • @Async异步线程池以及线程的命名方式

    本文记录@Async的基本使用以及通过实现ThreadFactory来实现对线程的命名. @Async的基本使用 近日有一个道友提出到一个问题,大意如下: 业务场景需要进行批量更新,已有数据id主键.更新的状态.单条更新性能太慢,所以使用in进行批量更新.但是会导致锁表使得其他业务无法访问该表,in的量级太低又导致性能太慢. 龙道友提出了一个解决方案,把要处理的数据分成几个list之后使用多线程进行数据更新.提到多线程可直接使用@Async注解来进行异步操作. 好的,接下来上面的问题我们不予解答

  • SpringBoot异步使用@Async的原理以及线程池配置详解

    目录 前言 使用步骤 配置线程池类参数配置 自定义线程任务 总结 原理刨析 文章参考 前言 在实际项目开发中很多业务场景需要使用异步去完成,比如消息通知,日志记录,等非常常用的都可以通过异步去执行,提高效率,那么在Spring框架中应该如何去使用异步呢 使用步骤 完成异步操作一般有两种,消息队列MQ,和线程池处理ThreadPoolExecutor 而在Spring4中提供的对ThreadPoolExecutor封装的线程池ThreadPoolTaskExecutor,直接使用注解启用@Asyn

随机推荐