新手也能看懂的SpringBoot异步编程指南(简单易懂)

通过本文你可以了解到下面这些知识点:

  • Future 模式介绍以及核心思想
  • 核心线程数、最大线程数的区别,队列容量代表什么;
  • ThreadPoolTaskExecutor 饱和策略;
  • SpringBoot 异步编程实战,搞懂代码的执行逻辑。

Future 模式

异步编程在处理耗时操作以及多任务处理的场景下非常有用,我们可以更好的让我们的系统利用好机器的 CPU 和 内存,提高它们的利用率。多线程设计模式有很多种,Future模式是多线程开发中非常常见的一种设计模式,本文也是基于这种模式来说明 SpringBoot 对于异步编程的知识。

实战之前我先简单介绍一下 Future 模式的核心思想 吧!。

Future 模式的核心思想是 异步调用 。当我们执行一个方法时,假如这个方法中有多个耗时的任务需要同时去做,而且又不着急等待这个结果时可以让客户端立即返回然后,后台慢慢去计算任务。当然你也可以选择等这些任务都执行完了,再返回给客户端。这个在 Java 中都有很好的支持,我在后面的示例程序中会详细对比这两种方式的区别。

SpringBoot 异步编程实战

如果我们需要在 SpringBoot 实现异步编程的话,通过 Spring 提供的两个注解会让这件事情变的非常简单。

  1. @EnableAsync:通过在配置类或者Main类上加@EnableAsync开启对异步方法的支持。
  2. @Async 可以作用在类上或者方法上,作用在类上代表这个类的所有方法都是异步方法。

1. 自定义 TaskExecutor

很多人对于 TaskExecutor 不是太了解,所以我们花一点篇幅先介绍一下这个东西。从名字就能看出它是任务的执行者,它领导执行着线程来处理任务,就像司令官一样,而我们的线程就好比一只只军队一样,这些军队可以异步对敌人进行打击👊。
Spring 提供了TaskExecutor接口作为任务执行者的抽象,它和java.util.concurrent包下的Executor接口很像。稍微不同的 TaskExecutor接口用到了 Java 8 的语法@FunctionalInterface声明这个接口是一个函数式接口。

org.springframework.core.task.TaskExecutor
@FunctionalInterface
public interface TaskExecutor extends Executor {
  void execute(Runnable var1);
}

如果没有自定义Executor, Spring 将创建一个 SimpleAsyncTaskExecutor 并使用它。

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;

/** @author shuang.kou */
@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {

 private static final int CORE_POOL_SIZE = 6;
 private static final int MAX_POOL_SIZE = 10;
 private static final int QUEUE_CAPACITY = 100;

 @Bean
 public Executor taskExecutor() {
  // Spring 默认配置是核心线程数大小为1,最大线程容量大小不受限制,队列容量也不受限制。
  ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
  // 核心线程数
  executor.setCorePoolSize(CORE_POOL_SIZE);
  // 最大线程数
  executor.setMaxPoolSize(MAX_POOL_SIZE);
  // 队列大小
  executor.setQueueCapacity(QUEUE_CAPACITY);
  // 当最大池已满时,此策略保证不会丢失任务请求,但是可能会影响应用程序整体性能。
  executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
  executor.setThreadNamePrefix("My ThreadPoolTaskExecutor-");
  executor.initialize();
  return executor;
 }
}

ThreadPoolTaskExecutor 常见概念:

  • Core Pool Size : 核心线程数线程数定义了最小可以同时运行的线程数量。
  • Queue Capacity : 当新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,信任就会被存放在队列中。
  • Maximum Pool Size : 当队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。

一般情况下不会将队列大小设为:Integer.MAX_VALUE,也不会将核心线程数和最大线程数设为同样的大小,这样的话最大线程数的设置都没什么意义了,你也无法确定当前 CPU 和内存利用率具体情况如何。

如果队列已满并且当前同时运行的线程数达到最大线程数的时候,如果再有新任务过来会发生什么呢?

Spring 默认使用的是 ThreadPoolExecutor.AbortPolicy。在Spring的默认情况下,ThreadPoolExecutor  将抛出 RejectedExecutionException 来拒绝新来的任务 ,这代表你将丢失对这个任务的处理。 对于可伸缩的应用程序,建议使用 ThreadPoolExecutor.CallerRunsPolicy。当最大池被填满时,此策略为我们提供可伸缩队列。

ThreadPoolTaskExecutor 饱和策略定义:

如果当前同时运行的线程数量达到最大线程数量时,ThreadPoolTaskExecutor 定义一些策略:

  • ThreadPoolExecutor.AbortPolicy:抛出 RejectedExecutionException来拒绝新任务的处理。
  • ThreadPoolExecutor.CallerRunsPolicy:调用执行自己的线程运行任务。您不会任务请求。但是这种策略会降低对于新任务提交速度,影响程序的整体性能。另外,这个策略喜欢增加队列容量。如果您的应用程序可以承受此延迟并且你不能任务丢弃任何一个任务请求的话,你可以选择这个策略。
  • ThreadPoolExecutor.DiscardPolicy: 不处理新任务,直接丢弃掉。
  • ThreadPoolExecutor.DiscardOldestPolicy: 此策略将丢弃最早的未处理的任务请求。

2. 编写一个异步的方法

下面模拟一个查找对应字符开头电影的方法,我们给这个方法加上了@Async注解来告诉 Spring 它是一个异步的方法。另外,这个方法的返回值 CompletableFuture.completedFuture(results)这代表我们需要返回结果,也就是说程序必须把任务执行完成之后再返回给用户。

请留意completableFutureTask方法中的第一行打印日志这句代码,后面分析程序中会用到,很重要!

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

/** @author shuang.kou */
@Service
public class AsyncService {

 private static final Logger logger = LoggerFactory.getLogger(AsyncService.class);

 private List<String> movies =
   new ArrayList<>(
     Arrays.asList(
       "Forrest Gump",
       "Titanic",
       "Spirited Away",
       "The Shawshank Redemption",
       "Zootopia",
       "Farewell ",
       "Joker",
       "Crawl"));

 /** 示范使用:找到特定字符/字符串开头的电影 */
 @Async
 public CompletableFuture<List<String>> completableFutureTask(String start) {
  // 打印日志
  logger.warn(Thread.currentThread().getName() + "start this task!");
  // 找到特定字符/字符串开头的电影
  List<String> results =
    movies.stream().filter(movie -> movie.startsWith(start)).collect(Collectors.toList());
  // 模拟这是一个耗时的任务
  try {
   Thread.sleep(1000L);
  } catch (InterruptedException e) {
   e.printStackTrace();
  }
  //返回一个已经用给定值完成的新的CompletableFuture。
  return CompletableFuture.completedFuture(results);
 }
}

3. 测试编写的异步方法

/** @author shuang.kou */
@RestController
@RequestMapping("/async")
public class AsyncController {
 @Autowired
 AsyncService asyncService;

 @GetMapping("/movies")
 public String completableFutureTask() throws ExecutionException, InterruptedException {
  //开始时间
  long start = System.currentTimeMillis();
  // 开始执行大量的异步任务
  List<String> words = Arrays.asList("F", "T", "S", "Z", "J", "C");
  List<CompletableFuture<List<String>>> completableFutureList =
    words.stream()
      .map(word -> asyncService.completableFutureTask(word))
      .collect(Collectors.toList());
  // CompletableFuture.join()方法可以获取他们的结果并将结果连接起来
  List<List<String>> results = completableFutureList.stream().map(CompletableFuture::join).collect(Collectors.toList());
  // 打印结果以及运行程序运行花费时间
  System.out.println("Elapsed time: " + (System.currentTimeMillis() - start));
  return results.toString();
 }
}

请求这个接口,控制台打印出下面的内容:

2019-10-01 13:50:17.007  WARN 18793 --- [lTaskExecutor-1] g.j.a.service.AsyncService               : My ThreadPoolTaskExecutor-1start this task!
2019-10-01 13:50:17.007  WARN 18793 --- [lTaskExecutor-6] g.j.a.service.AsyncService               : My ThreadPoolTaskExecutor-6start this task!
2019-10-01 13:50:17.007  WARN 18793 --- [lTaskExecutor-5] g.j.a.service.AsyncService               : My ThreadPoolTaskExecutor-5start this task!
2019-10-01 13:50:17.007  WARN 18793 --- [lTaskExecutor-4] g.j.a.service.AsyncService               : My ThreadPoolTaskExecutor-4start this task!
2019-10-01 13:50:17.007  WARN 18793 --- [lTaskExecutor-3] g.j.a.service.AsyncService               : My ThreadPoolTaskExecutor-3start this task!
2019-10-01 13:50:17.007  WARN 18793 --- [lTaskExecutor-2] g.j.a.service.AsyncService               : My ThreadPoolTaskExecutor-2start this task!
Elapsed time: 1010

首先我们可以看到处理所有任务花费的时间大概是 1 s。这与我们自定义的 ThreadPoolTaskExecutor 有关,我们配置的核心线程数是 6 ,然后通过通过下面的代码模拟分配了 6 个任务给系统执行。这样每个线程都会被分配到一个任务,每个任务执行花费时间是 1 s ,所以处理 6 个任务的总花费时间是 1 s。

List<String> words = Arrays.asList("F", "T", "S", "Z", "J", "C");
List<CompletableFuture<List<String>>> completableFutureList =
    words.stream()
      .map(word -> asyncService.completableFutureTask(word))
      .collect(Collectors.toList());

你可以自己验证一下,试着去把核心线程数的数量改为 3 ,再次请求这个接口你会发现处理所有任务花费的时间大概是 2 s。

另外,从上面的运行结果可以看出,当所有任务执行完成之后才返回结果。这种情况对应于我们需要返回结果给客户端请求的情况下,假如我们不需要返回任务执行结果给客户端的话呢? 就比如我们上传一个大文件到系统,上传之后只要大文件格式符合要求我们就上传成功。普通情况下我们需要等待文件上传完毕再返回给用户消息,但是这样会很慢。采用异步的话,当用户上传之后就立马返回给用户消息,然后系统再默默去处理上传任务。这样也会增加一点麻烦,因为文件可能会上传失败,所以系统也需要一点机制来补偿这个问题,比如当上传遇到问题的时候,发消息通知用户。

下面会演示一下客户端不需要返回结果的情况:

将completableFutureTask方法变为 void 类型

@Async
public void completableFutureTask(String start) {
 ......
 //这里可能是系统对任务执行结果的处理,比如存入到数据库等等......
 //doSomeThingWithResults(results);
}

Controller 代码修改如下:

 @GetMapping("/movies")
 public String completableFutureTask() throws ExecutionException, InterruptedException {
  // Start the clock
  long start = System.currentTimeMillis();
  // Kick of multiple, asynchronous lookups
  List<String> words = Arrays.asList("F", "T", "S", "Z", "J", "C");
    words.stream()
      .forEach(word -> asyncService.completableFutureTask(word));
  // Wait until they are all done
  // Print results, including elapsed time
  System.out.println("Elapsed time: " + (System.currentTimeMillis() - start));
  return "Done";
 }

请求这个接口,控制台打印出下面的内容:

Elapsed time: 0
2019-10-01 14:02:44.052  WARN 19051 --- [lTaskExecutor-4] g.j.a.service.AsyncService               : My ThreadPoolTaskExecutor-4start this task!
2019-10-01 14:02:44.052  WARN 19051 --- [lTaskExecutor-3] g.j.a.service.AsyncService               : My ThreadPoolTaskExecutor-3start this task!
2019-10-01 14:02:44.052  WARN 19051 --- [lTaskExecutor-2] g.j.a.service.AsyncService               : My ThreadPoolTaskExecutor-2start this task!
2019-10-01 14:02:44.052  WARN 19051 --- [lTaskExecutor-1] g.j.a.service.AsyncService               : My ThreadPoolTaskExecutor-1start this task!
2019-10-01 14:02:44.052  WARN 19051 --- [lTaskExecutor-6] g.j.a.service.AsyncService               : My ThreadPoolTaskExecutor-6start this task!
2019-10-01 14:02:44.052  WARN 19051 --- [lTaskExecutor-5] g.j.a.service.AsyncService               : My ThreadPoolTaskExecutor-5start this task!

可以看到系统会直接返回给用户结果,然后系统才真正开始执行任务。

待办

Reference

https://spring.io/guides/gs/async-method/
https://medium.com/trendyol-tech/spring-boot-async-executor-management-with-threadpooltaskexecutor-f493903617d

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • SpringBoot开启异步调用方法

    异步调用无需等待,方法相当于子线程,后台执行,主线程执行完成,子线程开始执行. SpringBoot 开启异步执行仅需两步: 方法上加 @Async @Override @Async @Transactional(rollbackFor = Exception.class) public Integer init(DatePojo datePojo){ //xxxxxxxxxxx 业务略 xxxxxxx log.info(" 起止日期为 : {} , {} ", start, end)

  • SpringBoot异步调用方法并接收返回值

    项目中肯定会遇到异步调用其他方法的场景,比如有个计算过程,需要计算很多个指标的值,但是每个指标计算的效率快慢不同,如果采用同步执行的方式,运行这一个过程的时间是计算所有指标的时间之和.比如: 方法A:计算指标x,指标y,指标z的值,其中计算指标x需要1s,计算指标y需要2s,指标z需要3s.最终执行完方法A就是5s. 现在用异步的方式优化一下 方法A异步调用方法B,方法C,方法D,方法B,方法C,方法D分别计算指标x,指标y,指标z的值,那么最终执行完方法A的时间则是3s. 步骤1:配置线程池,

  • SpringBoot实现定时任务和异步调用

    本文实例为大家分享了SpringBoot实现定时任务和异步调用的具体代码,供大家参考,具体内容如下 环境: jdk1.8:spring boot2.0.2:Maven3.3 摘要说明: 定时任务:定时任务是业务场景中经常出现的一种情况如:定时发送邮件,短信.定时统计监控数据.定时对账等 异步调用:一个都买流程可能包括下单.发货通知.短信推送.消息推送等,其实除了下单这个主要程序是主程序,其他子程序可以同时进行且不影响主程序的运行,这个时候就可以使用异步调用来调用这些子程序: 步骤: 1.定时任务

  • springboot 异步调用的实现方法

    说异步调用前,我们说说它对应的同步调用.通常开发过程中,一般上我们都是同步调用,即:程序按定义的顺序依次执行的过程,每一行代码执行过程必须等待上一行代码执行完毕后才执行.而异步调用指:程序在执行时,无需等待执行的返回值可继续执行后面的代码.显而易见,同步有依赖相关性,而异步没有,所以异步可并发执行,可提高执行效率,在相同的时间做更多的事情. 同步 程序按照定义顺序依次执行,每一行程序都必须等待上一行程序执行完成之后才能执行,就是在发出一个功能调用时,在没有得到结果之前,该调用就不返回. 异步 程

  • SpringBoot异步方法捕捉异常详解

    本文实例为大家分享了SpringBoot异步方法捕捉异常的具体代码,供大家参考,具体内容如下 由于项目中定时器都采用异步执行方式 需要定时监控异步方法执行进度,异常情况 1 执行进度 可以设置是否在执行,内存中添加执行标识即可. 防止多次执行可以通过拦截器对此,标识来判断,防止多次执行定时器 2 异常捕捉 监控异步方法执行是否异常. 1 无返回值 配置AsyncExceptionConfig类,统一处理. 定义异常捕获配置类AsyncExceptionConfig,配置类里面定义SpringAs

  • 简述Springboot @Async 异步方法

    1.异步调用 异步调用就是在不阻塞主线程的情况下执行高耗时方法 2.常规异步 通过开启新线程实现 3.在Springboot中启用异步方法 需要4个注解 1.@EnableAsync 开启异步 2.@Component 注册异步组件 3.@Async 标注异步方法 4.@Autowired 注入异步组件 4.进行一次异步调用 1.首先在一个Config类上标注开启异步 2.然后创建一个异步的组件类,就跟Service,Controller 一样一样的,用Component标注,Service也行

  • 新手也能看懂的SpringBoot异步编程指南(简单易懂)

    通过本文你可以了解到下面这些知识点: Future 模式介绍以及核心思想 核心线程数.最大线程数的区别,队列容量代表什么: ThreadPoolTaskExecutor 饱和策略: SpringBoot 异步编程实战,搞懂代码的执行逻辑. Future 模式 异步编程在处理耗时操作以及多任务处理的场景下非常有用,我们可以更好的让我们的系统利用好机器的 CPU 和 内存,提高它们的利用率.多线程设计模式有很多种,Future模式是多线程开发中非常常见的一种设计模式,本文也是基于这种模式来说明 Sp

  • SpringBoot 如何实现异步编程

    目录 为什么要用异步框架,它解决什么问题? SpringBoot如何实现异步调用? 实现异步调用 为什么要给@Async自定义线程池? 多个线程池处理 配置默认线程池 小结 首先我们来看看在Spring中为什么要使用异步编程,它能解决什么问题? 为什么要用异步框架,它解决什么问题? 在SpringBoot的日常开发中,一般都是同步调用的.但实际中有很多场景非常适合使用异步来处理,如:注册新用户,送100个积分:或下单成功,发送push消息等等. 就拿注册新用户这个用例来说,为什么要异步处理? 第

  • .NET中的异步编程-EAP/APM使用方法及案例介绍

    从.NET 4.5开始,支持的三种异步编程模式: •基于事件的异步编程设计模式 (EAP,Event-based Asynchronous Pattern) •异步编程模型(APM,Asynchronous Programming Model) •基于任务的编程模型(TAP,Task-based Asynchronous Pattern) 基于任务的异步模式 (TAP) 是基于 System.Threading.Tasks 命名空间的 Task 和 Task<TResult>,用于表示任意异步

  • 浅谈JavaScript异步编程

    在一年前初学js的时候,看过很多关于异步编程的讲解.但是由于实践经验少,没有办法理解的太多,太理论的东西也往往是看完就忘. 经过公司的三两个项目的锻炼,终于对js异步编程有了比较具体的理解.但始终入门较浅,在这里就当是给自己一个阶段性的总结. 在异步编程中,一条语句的执行不能依赖上一条语句执行完毕的结果,因为无法预测一条语句什么时候执行完毕,它与代码顺序无关,语句是并发执行的. 例如以下代码: $.get($C.apiPath+'ucenter/padCharge/findMember',{id

  • 浅谈Async和Await如何简化异步编程(几个实例让你彻底明白)

    引言 C#5.0中async和await两个关键字,这两个关键字简化了异步编程,之所以简化了,还是因为编译器给我们做了更多的工作,下面就具体看看编译器到底在背后帮我们做了哪些复杂的工作的. 同步代码存在的问题 对于同步的代码,大家肯定都不陌生,因为我们平常写的代码大部分都是同步的,然而同步代码却存在一个很严重的问题,例如我们向一个Web服务器发出一个请求时,如果我们发出请求的代码是同步实现的话,这时候我们的应用程序就会处于等待状态,直到收回一个响应信息为止,然而在这个等待的状态,对于用户不能操作

  • .net4.5使用async和await异步编程实例

    关于异步编程的简单理解: 在.NET4.5中新增了异步编程的新特性async和await,使得异步编程更为简单.通过特性可以将这项复杂的工作交给编译器来完成了.之前传统的方式来实现异步编程较为复杂,这样对于程序猿来说处理起来比较困难,调试也没那么方便,后续的维护工作也比较痛苦. Async和Await关键字是C#异步编程的核心.通过使用这两个关键字,你可以使用.NET Framework 或 Windows Runtime的资源创建一个异步方法如同创建一个同步方法一样容易. 接下来通过VS201

  • c# 异步编程基础讲解

    现代应用程序广泛使用文件和网络 I/O.I/O 相关 API 传统上默认是阻塞的,导致用户体验和硬件利用率不佳,此类问题的学习和编码的难度也较大.而今基于 Task 的异步 API 和语言级异步编程模式颠覆了传统模式,使得异步编程非常简单,几乎没有新的概念需要学习. 异步代码有如下特点: 在等待 I/O 请求返回的过程中,通过让出线程来处理更多的服务器请求. 通过在等待 I/O 请求时让出线程进行 UI 交互,并将长期运行的工作过渡到其他 CPU,使用户界面的响应性更强. 许多较新的 .NET

  • C#5.0中的异步编程关键字async和await

    一.Asynchronous methods 异步方法 .NET 4.5 的推出,对于C#又有了新特性的增加——就是C#5.0中async和await两个关键字,这两个关键字简化了异步编程. 使用async修饰的方法被称为异步方法,这个方法调用时应该在前面加上await. 异步方法命名应该以Async结尾,这样大家知道调用的时候使用await. async和await关键字只是编译器的功能,编译器最终会用Task类创建代码. 1.创建返回任务的异步方法 建立一个同步方法Greeting,该方法在

  • 教你如何看懂SQL Server查询计划

    对于SQL Server的优化来说,优化查询可能是很常见的事情.由于数据库的优化,本身也是一个涉及面比较的广的话题,因此本文只谈优化查询时如何看懂SQL Server查询计划.毕竟我对SQL Server的认识有限,如有错误,也恳请您在发现后及时批评指正. 首先,打开[SQL Server Management Studio],输入一个查询语句看看SQL Server是如何显示查询计划的吧. 说明:本文所演示的数据库,是我为一个演示程序专用准备的数据库,可以在此网页中下载. select v.O

  • 一篇文章让你看懂IOS中的block为何再也不需要WeakSelf弱引用

    前言: 最近都在折腾Sagit架框的内存释放的问题,所以对这一块有些心得. 对于新手,学到的文章都在教你用:typeof(self) __weak weakSelf = self. 对于老手,可能早习惯了到处了WeakSelf了. 这次,就来学学,如何不用WeakSelf. 1:从引用计数器开始: 这里先设计一个TableBlock类: @interface BlockTable : NSObject typedef void (^AddCellBlock)(); @property (nona

随机推荐