SpringBoot 多任务并行+线程池处理的实现

前言

前几篇文章着重介绍了后端服务数据库和多线程并行处理优化,并示例了改造前后的伪代码逻辑。当然了,优化是无止境的,前人栽树后人乘凉。作为我们开发者来说,既然站在了巨人的肩膀上,就要写出更加优化的程序。

SpringBoot开发案例之JdbcTemplate批量操作
SpringBoot开发案例之CountDownLatch多任务并行处理

改造

理论上讲,线程越多程序可能更快,但是在实际使用中我们需要考虑到线程本身的创建以及销毁的资源消耗,以及保护操作系统本身的目的。我们通常需要将线程限制在一定的范围之类,线程池就起到了这样的作用。

程序逻辑

多任务并行+线程池处理.png

一张图能解决的问题,就应该尽可能的少BB,当然底层原理性的东西还是需要大家去记忆并理解的。

Java 线程池

Java通过Executors提供四种线程池,分别为:

  1. newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
  2. newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
  3. newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
  4. newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

优点

  1. 重用存在的线程,减少对象创建、消亡的开销,性能佳。
  2. 可有效控制最大并发线程数,提高系统资源的使用率,同时避免过多资源竞争,避免堵塞。
  3. 提供定时执行、定期执行、单线程、并发数控制等功能。

代码实现

方式一(CountDownLatch)

/**
 * 多任务并行+线程池统计
 * 创建时间  2018年4月17日
 */
public class StatsDemo {
  final static SimpleDateFormat sdf = new SimpleDateFormat(
      "yyyy-MM-dd HH:mm:ss");

  final static String startTime = sdf.format(new Date());

  /**
   * IO密集型任务 = 一般为2*CPU核心数(常出现于线程中:数据库数据交互、文件上传下载、网络数据传输等等)
   * CPU密集型任务 = 一般为CPU核心数+1(常出现于线程中:复杂算法)
   * 混合型任务 = 视机器配置和复杂度自测而定
   */
  private static int corePoolSize = Runtime.getRuntime().availableProcessors();
  /**
   * public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,
   *              TimeUnit unit,BlockingQueue<Runnable> workQueue)
   * corePoolSize用于指定核心线程数量
   * maximumPoolSize指定最大线程数
   * keepAliveTime和TimeUnit指定线程空闲后的最大存活时间
   * workQueue则是线程池的缓冲队列,还未执行的线程会在队列中等待
   * 监控队列长度,确保队列有界
   * 不当的线程池大小会使得处理速度变慢,稳定性下降,并且导致内存泄露。如果配置的线程过少,则队列会持续变大,消耗过多内存。
   * 而过多的线程又会 由于频繁的上下文切换导致整个系统的速度变缓——殊途而同归。队列的长度至关重要,它必须得是有界的,这样如果线程池不堪重负了它可以暂时拒绝掉新的请求。
   * ExecutorService 默认的实现是一个无界的 LinkedBlockingQueue。
   */
  private static ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, corePoolSize+1, 10l, TimeUnit.SECONDS,
      new LinkedBlockingQueue<Runnable>(1000));

  public static void main(String[] args) throws InterruptedException {
    CountDownLatch latch = new CountDownLatch(5);
    //使用execute方法
     executor.execute(new Stats("任务A", 1000, latch));
     executor.execute(new Stats("任务B", 1000, latch));
     executor.execute(new Stats("任务C", 1000, latch));
     executor.execute(new Stats("任务D", 1000, latch));
     executor.execute(new Stats("任务E", 1000, latch));
    latch.await();// 等待所有人任务结束
    System.out.println("所有的统计任务执行完成:" + sdf.format(new Date()));
  }

  static class Stats implements Runnable {
    String statsName;
    int runTime;
    CountDownLatch latch;

    public Stats(String statsName, int runTime, CountDownLatch latch) {
      this.statsName = statsName;
      this.runTime = runTime;
      this.latch = latch;
    }

    public void run() {
      try {
        System.out.println(statsName+ " do stats begin at "+ startTime);
        //模拟任务执行时间
        Thread.sleep(runTime);
        System.out.println(statsName + " do stats complete at "+ sdf.format(new Date()));
        latch.countDown();//单次任务结束,计数器减一
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
  }
}

方式二(Future)

/**
 * 多任务并行+线程池统计
 * 创建时间  2018年4月17日
 */
public class StatsDemo {
  final static SimpleDateFormat sdf = new SimpleDateFormat(
      "yyyy-MM-dd HH:mm:ss");

  final static String startTime = sdf.format(new Date());

  /**
   * IO密集型任务 = 一般为2*CPU核心数(常出现于线程中:数据库数据交互、文件上传下载、网络数据传输等等)
   * CPU密集型任务 = 一般为CPU核心数+1(常出现于线程中:复杂算法)
   * 混合型任务 = 视机器配置和复杂度自测而定
   */
  private static int corePoolSize = Runtime.getRuntime().availableProcessors();
  /**
   * public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,
   *              TimeUnit unit,BlockingQueue<Runnable> workQueue)
   * corePoolSize用于指定核心线程数量
   * maximumPoolSize指定最大线程数
   * keepAliveTime和TimeUnit指定线程空闲后的最大存活时间
   * workQueue则是线程池的缓冲队列,还未执行的线程会在队列中等待
   * 监控队列长度,确保队列有界
   * 不当的线程池大小会使得处理速度变慢,稳定性下降,并且导致内存泄露。如果配置的线程过少,则队列会持续变大,消耗过多内存。
   * 而过多的线程又会 由于频繁的上下文切换导致整个系统的速度变缓——殊途而同归。队列的长度至关重要,它必须得是有界的,这样如果线程池不堪重负了它可以暂时拒绝掉新的请求。
   * ExecutorService 默认的实现是一个无界的 LinkedBlockingQueue。
   */
  private static ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, corePoolSize+1, 10l, TimeUnit.SECONDS,
      new LinkedBlockingQueue<Runnable>(1000));

  public static void main(String[] args) throws InterruptedException {
    List<Future<String>> resultList = new ArrayList<Future<String>>();
    //使用submit提交异步任务,并且获取返回值为future
    resultList.add(executor.submit(new Stats("任务A", 1000)));
    resultList.add(executor.submit(new Stats("任务B", 1000)));
    resultList.add(executor.submit(new Stats("任务C", 1000)));
    resultList.add(executor.submit(new Stats("任务D", 1000)));
    resultList.add(executor.submit(new Stats("任务E", 1000)));
     //遍历任务的结果
    for (Future<String> fs : resultList) {
      try {
        System.out.println(fs.get());//打印各个线任务执行的结果,调用future.get() 阻塞主线程,获取异步任务的返回结果
      } catch (InterruptedException e) {
        e.printStackTrace();
      } catch (ExecutionException e) {
        e.printStackTrace();
      } finally {
        //启动一次顺序关闭,执行以前提交的任务,但不接受新任务。如果已经关闭,则调用没有其他作用。
        executor.shutdown();
      }
    }
    System.out.println("所有的统计任务执行完成:" + sdf.format(new Date()));
  }

  static class Stats implements Callable<String> {
    String statsName;
    int runTime;

    public Stats(String statsName, int runTime) {
      this.statsName = statsName;
      this.runTime = runTime;
    }

    public String call() {
      try {
        System.out.println(statsName+ " do stats begin at "+ startTime);
        //模拟任务执行时间
        Thread.sleep(runTime);
        System.out.println(statsName + " do stats complete at "+ sdf.format(new Date()));
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      return call();
    }
  }
}

执行时间

以上代码,均是伪代码,下面是2000+个学生的真实测试记录。

2018-04-17 17:42:29.284 INFO   测试记录81e51ab031eb4ada92743ddf66528d82-单线程顺序执行,花费时间:3797
2018-04-17 17:42:31.452 INFO   测试记录81e51ab031eb4ada92743ddf66528d82-多线程并行任务,花费时间:2167
2018-04-17 17:42:33.170 INFO   测试记录81e51ab031eb4ada92743ddf66528d82-多线程并行任务+线程池,花费时间:1717

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

您可能感兴趣的文章:

  • spring boot使用自定义的线程池执行Async任务
  • spring boot使用自定义配置的线程池执行Async异步任务
(0)

相关推荐

  • spring boot使用自定义的线程池执行Async任务

    在前面的博客中,//www.jb51.net/article/134866.htm 我们使用了spring boot的异步操作,当时,我们使用的是默认的线程池,但是,如果我们想根据项目来定制自己的线程池了,下面就来说说,如何定制线程池! 一.增加配置属性类 package com.chhliu.springboot.async.configuration; import org.springframework.boot.context.properties.ConfigurationProper

  • spring boot使用自定义配置的线程池执行Async异步任务

    在前面的博客中,http://www.jb51.net/article/106718.htm 我们使用了spring boot的异步操作,当时,我们使用的是默认的线程池,但是,如果我们想根据项目来定制自己的线程池了,下面就来说说,如何定制线程池! 一.增加配置属性类 package com.chhliu.springboot.async.configuration; import org.springframework.boot.context.properties.ConfigurationP

  • SpringBoot 多任务并行+线程池处理的实现

    前言 前几篇文章着重介绍了后端服务数据库和多线程并行处理优化,并示例了改造前后的伪代码逻辑.当然了,优化是无止境的,前人栽树后人乘凉.作为我们开发者来说,既然站在了巨人的肩膀上,就要写出更加优化的程序. SpringBoot开发案例之JdbcTemplate批量操作 SpringBoot开发案例之CountDownLatch多任务并行处理 改造 理论上讲,线程越多程序可能更快,但是在实际使用中我们需要考虑到线程本身的创建以及销毁的资源消耗,以及保护操作系统本身的目的.我们通常需要将线程限制在一定

  • Springboot应用中线程池配置详细教程(最新2021版)

    前言:日常开发中我们常用ThreadPoolExecutor提供的线程池服务帮我们管理线程,在Springboot中更是提供了@Async注解来简化业务逻辑提交到线程池中执行的过程.由于Springboot中默认设置的corePoolSize=1和queyeCapacity=Integer.MAX_VALUE,相当于采用单线程处理所有任务,这就与多线程的目的背道而驰,所以这就要求我们在使用@Async注解时要配置线程池.本文就讲述下Springboot应用下的线程池配置. 背景知识:Spring

  • SpringBoot使用异步线程池实现生产环境批量数据推送

    目录 前言 编写线程池配置类 编写异步服务 异步批量上报数据 总结 前言 SpringBoot使用异步线程池: 1.编写线程池配置类,自定义一个线程池: 2.定义一个异步服务: 3.使用@Async注解指向定义的线程池: 这里以我工作中使用过的一个案例来做描述,我所在公司是医疗行业,敏感数据需要上报到某监管平台,所以有一个定时任务在流量较小时(一般是凌晨后)执行上报行为.但特殊时期会存在一定要在工作时间大批量上报数据的情况,且要求短时间内就要完成,此时就考虑写一个专门的异步上报接口手动执行,利用

  • Springboot 如何使用@Async整合线程池

    Springboot @Async整合线程池 开篇咱们先来聊聊线程池这个概念,或者说为什么要使用线程池:简言之,充分利用cpu资源,提高程序执行时间,但是相反,线程池异常提示.主线程和子线程事务问题也是显而易见的. 那么@Async这个注解又是什么做用呢?其实就是标识方法为异步任务的一个注解,默认会自己维护一个线程池(存在弊端),利用子线程去执行任务:那么如果把这两者结合的话,线程池+Async又会有什么效果呢! 循序渐进 提到线程池,可以采用Executors提供四种线程池下,使用某些特性的场

  • springboot为异步任务规划自定义线程池的实现

    目录 一.Spring Boot任务线程池 二.自定义线程池 三.优雅地关闭线程池 一.Spring Boot任务线程池 线程池的作用 防止资源占用无限的扩张 调用过程省去资源的创建和销毁所占用的时间 在高并发环境下,不断的分配新资源,可能导致系统资源耗尽.所以为了避免这个问题,我们为异步任务规划一个线程池.当然,如果没有配置线程池的话,springboot会自动配置一个ThreadPoolTaskExecutor 线程池到bean当中. # 核心线程数 spring.task.executio

  • 浅谈Java线程池的7大核心参数

    前言 java中经常需要用到多线程来处理一些业务,我不建议单纯使用继承Thread或者实现Runnable接口的方式来创建线程,那样势必有创建及销毁线程耗费资源.线程上下文切换问题. 同时创建过多的线程也可能引发资源耗尽的风险,这个时候引入线程池比较合理,方便线程任务的管理. java中涉及到线程池的相关类均在jdk1.5开始的java.util.concurrent包中,涉及到的几个核心类及接口包括: Executor.Executors.ExecutorService.ThreadPoolE

  • Java8并行流中自定义线程池操作示例

    本文实例讲述了Java8并行流中自定义线程池操作.分享给大家供大家参考,具体如下: 1.概览 java8引入了流的概念,流是作为一种对数据执行大量操作的有效方式.并行流可以被包含于支持并发的环境中.这些流可以提高执行性能-以牺牲多线程的开销为代价 在这篇短文中,我们将看一下 Stream API的最大限制,同时看一下如何让并行流和线程池实例(ThreadPool instance)一起工作. 2.并行流Parallel Stream 我们先以一个简单的例子来开始-在任一个Collection类型

  • 深入学习springboot线程池的使用和扩展

    前言 我们常用ThreadPoolExecutor提供的线程池服务,springboot框架提供了@Async注解,帮助我们更方便的将业务逻辑提交到线程池中异步执行,今天我们就来实战体验这个线程池服务: 实战环境 windowns10: jdk1.8: springboot 1.5.9.RELEASE: 开发工具:IntelliJ IDEA: 实战源码 本次实战的源码可以在我的GitHub下载,地址:git@github.com:zq2599/blog_demos.git,项目主页: 这里面有多

  • springboot中@Async默认线程池导致OOM问题

    前言: 1.最近项目上在测试人员压测过程中发现了OOM问题,项目使用springboot搭建项目工程,通过查看日志中包含信息:unable to create new native thread 内存溢出的三种类型: 1.第一种OutOfMemoryError: PermGen space,发生这种问题的原意是程序中使用了大量的jar或class 2.第二种OutOfMemoryError: Java heap space,发生这种问题的原因是java虚拟机创建的对象太多 3.第三种OutOfM

  • SpringBoot+slf4j线程池全链路调用日志跟踪问题及解决思路(二)

    本项目源码已在多个项目中实践 接着上一篇文章,项目中使用了线程池,那么子线程中日志就会丢失traceId,下面讲解如何实现子线程中的traceId日志跟踪. 解决思路 子线程在打印日志的过程中traceId将丢失,解决方式为重写线程池,将主线程的traceId继续传递到子线程中.当然,对于直接new创建线程的情况不考略[实际应用中应该避免这种用法]. 继承ThreadPoolExecutor,重写执行任务的方法 public final class OverrideThreadPoolExecu

随机推荐