Spring5源码解析之Spring中的异步和计划任务

Java提供了许多创建线程池的方式,并得到一个Future实例来作为任务结果。对于Spring同样小菜一碟,通过其scheduling包就可以做到将任务线程中后台执行。

在本文的第一部分中,我们将讨论下Spring中执行计划任务的一些基础知识。之后,我们将解释这些类是如何一起协作来启动并执行计划任务的。下一部分将介绍计划和异步任务的配置。最后,我们来写个Demo,看看如何通过单元测试来编排计划任务。

什么是Spring中的异步任务?

在我们正式的进入话题之前,对于Spring,我们需要理解下它实现的两个不同的概念:异步任务和调度任务。显然,两者有一个很大的共同点:都在后台工作。但是,它们之间存在了很大差异。调度任务与异步不同,其作用与Linux中的CRON job完全相同(windows里面也有计划任务)。举个栗子,有一个任务必须每40分钟执行一次,那么,可以通过XML文件或者注解来进行此配置。简单的异步任务在后台执行就好,无需配置执行频率。

因为它们是两种不同的任务类型,它们两个的执行者自然也就不同。第一个看起来有点像Java的并发执行器(concurrency executor),这里会有专门去写一篇关于Java中的执行器来具体了解。根据Spring文档TaskExecutor所述,它提供了基于Spring的抽象来处理线程池,这点,也可以通过其类的注释去了解。另一个抽象接口是TaskScheduler,它用于在将来给定的时间点来安排任务,并执行一次或定期执行。

在分析源码的过程中,发现另一个比较有趣的点是触发器。它存在两种类型:CronTrigger或PeriodTrigger。第一个模拟CRON任务的行为。所以我们可以在将来确切时间点提交一个任务的执行。另一个触发器可用于定期执行任务。

Spring的异步任务类

让我们从org.springframework.core.task.TaskExecutor类的分析开始。你会发现,其简单的不行,它是一个扩展Java的Executor接口的接口。它的唯一方法也就是是执行,在参数中使用Runnable类型的任务。

package org.springframework.core.task;
import java.util.concurrent.Executor;
/**
 * Simple task executor interface that abstracts the execution
 * of a {@link Runnable}.
 *
 * <p>Implementations can use all sorts of different execution strategies,
 * such as: synchronous, asynchronous, using a thread pool, and more.
 *
 * <p>Equivalent to JDK 1.5's {@link java.util.concurrent.Executor}
 * interface; extending it now in Spring 3.0, so that clients may declare
 * a dependency on an Executor and receive any TaskExecutor implementation.
 * This interface remains separate from the standard Executor interface
 * mainly for backwards compatibility with JDK 1.4 in Spring 2.x.
 *
 * @author Juergen Hoeller
 * @since 2.0
 * @see java.util.concurrent.Executor
 */
@FunctionalInterface
public interface TaskExecutor extends Executor {
 /**
 * Execute the given {@code task}.
 * <p>The call might return immediately if the implementation uses
 * an asynchronous execution strategy, or might block in the case
 * of synchronous execution.
 * @param task the {@code Runnable} to execute (never {@code null})
 * @throws TaskRejectedException if the given task was not accepted
 */
 @Override
 void execute(Runnable task);
}

相对来说,org.springframework.scheduling.TaskScheduler接口就有点复杂了。它定义了一组以schedule开头的名称的方法允许我们定义将来要执行的任务。所有 schedule* 方法返回java.util.concurrent.ScheduledFuture实例。Spring5中对scheduleAtFixedRate方法做了进一步的充实,其实最终调用的还是ScheduledFuture<?> scheduleAtFixedRate(Runnable task, long period);

public interface TaskScheduler {
 /**
 * Schedule the given {@link Runnable}, invoking it whenever the trigger
 * indicates a next execution time.
 * <p>Execution will end once the scheduler shuts down or the returned
 * {@link ScheduledFuture} gets cancelled.
 * @param task the Runnable to execute whenever the trigger fires
 * @param trigger an implementation of the {@link Trigger} interface,
 * e.g. a {@link org.springframework.scheduling.support.CronTrigger} object
 * wrapping a cron expression
 * @return a {@link ScheduledFuture} representing pending completion of the task,
 * or {@code null} if the given Trigger object never fires (i.e. returns
 * {@code null} from {@link Trigger#nextExecutionTime})
 * @throws org.springframework.core.task.TaskRejectedException if the given task was not accepted
 * for internal reasons (e.g. a pool overload handling policy or a pool shutdown in progress)
 * @see org.springframework.scheduling.support.CronTrigger
 */
 @Nullable
 ScheduledFuture<?> schedule(Runnable task, Trigger trigger);

 /**
 * Schedule the given {@link Runnable}, invoking it at the specified execution time.
 * <p>Execution will end once the scheduler shuts down or the returned
 * {@link ScheduledFuture} gets cancelled.
 * @param task the Runnable to execute whenever the trigger fires
 * @param startTime the desired execution time for the task
 * (if this is in the past, the task will be executed immediately, i.e. as soon as possible)
 * @return a {@link ScheduledFuture} representing pending completion of the task
 * @throws org.springframework.core.task.TaskRejectedException if the given task was not accepted
 * for internal reasons (e.g. a pool overload handling policy or a pool shutdown in progress)
 * 使用了默认实现,值得我们学习使用的,Java9中同样可以有私有实现的,从这里我们可以做到我只通过  * 一个接口你来实现,我把其他相应的功能默认实现下,最后调用你自定义实现的接口即可,使接口功能更  * 加一目了然
 * @since 5.0
 * @see #schedule(Runnable, Date)
 */
 default ScheduledFuture<?> schedule(Runnable task, Instant startTime) {
 return schedule(task, Date.from(startTime));
 }
 /**
 * Schedule the given {@link Runnable}, invoking it at the specified execution time.
 * <p>Execution will end once the scheduler shuts down or the returned
 * {@link ScheduledFuture} gets cancelled.
 * @param task the Runnable to execute whenever the trigger fires
 * @param startTime the desired execution time for the task
 * (if this is in the past, the task will be executed immediately, i.e. as soon as possible)
 * @return a {@link ScheduledFuture} representing pending completion of the task
 * @throws org.springframework.core.task.TaskRejectedException if the given task was not accepted
 * for internal reasons (e.g. a pool overload handling policy or a pool shutdown in progress)
 */
 ScheduledFuture<?> schedule(Runnable task, Date startTime);
...
/**
 * Schedule the given {@link Runnable}, invoking it at the specified execution time
 * and subsequently with the given period.
 * <p>Execution will end once the scheduler shuts down or the returned
 * {@link ScheduledFuture} gets cancelled.
 * @param task the Runnable to execute whenever the trigger fires
 * @param startTime the desired first execution time for the task
 * (if this is in the past, the task will be executed immediately, i.e. as soon as possible)
 * @param period the interval between successive executions of the task
 * @return a {@link ScheduledFuture} representing pending completion of the task
 * @throws org.springframework.core.task.TaskRejectedException if the given task was not accepted
 * for internal reasons (e.g. a pool overload handling policy or a pool shutdown in progress)
 * @since 5.0
 * @see #scheduleAtFixedRate(Runnable, Date, long)
 */
 default ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Instant startTime, Duration period) {
 return scheduleAtFixedRate(task, Date.from(startTime), period.toMillis());
 }
 /**
 * Schedule the given {@link Runnable}, invoking it at the specified execution time
 * and subsequently with the given period.
 * <p>Execution will end once the scheduler shuts down or the returned
 * {@link ScheduledFuture} gets cancelled.
 * @param task the Runnable to execute whenever the trigger fires
 * @param startTime the desired first execution time for the task
 * (if this is in the past, the task will be executed immediately, i.e. as soon as possible)
 * @param period the interval between successive executions of the task (in milliseconds)
 * @return a {@link ScheduledFuture} representing pending completion of the task
 * @throws org.springframework.core.task.TaskRejectedException if the given task was not accepted
 * for internal reasons (e.g. a pool overload handling policy or a pool shutdown in progress)
 */
 ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Date startTime, long period);
...
}

之前提到两个触发器组件,都实现了org.springframework.scheduling.Trigger接口。这里,我们只需关注一个的方法nextExecutionTime ,其定义下一个触发任务的执行时间。它的两个实现,CronTrigger和PeriodicTrigger,由org.springframework.scheduling.TriggerContext来实现信息的存储,由此,我们可以很轻松获得一个任务的最后一个执行时间(lastScheduledExecutionTime),给定任务的最后完成时间(lastCompletionTime)或最后一个实际执行时间(lastActualExecutionTime)。接下来,我们通过阅读源代码来简单的了解下这些东西。org.springframework.scheduling.concurrent.ConcurrentTaskScheduler包含一个私有类EnterpriseConcurrentTriggerScheduler。在这个class里面,我们可以找到schedule方法:

public ScheduledFuture<?> schedule(Runnable task, final Trigger trigger) {
 ManagedScheduledExecutorService executor = (ManagedScheduledExecutorService) scheduledExecutor;
 return executor.schedule(task, new javax.enterprise.concurrent.Trigger() {
  @Override
  public Date getNextRunTime(LastExecution le, Date taskScheduledTime) {
   return trigger.nextExecutionTime(le != null ?
    new SimpleTriggerContext(le.getScheduledStart(), le.getRunStart(), le.getRunEnd()) :
    new SimpleTriggerContext());
  }
  @Override
  public boolean skipRun(LastExecution lastExecution, Date scheduledRunTime) {
   return false;
  }
 });
}

SimpleTriggerContext从其名字就可以看到很多东西了,因为它实现了TriggerContext接口。

/**
 * Simple data holder implementation of the {@link TriggerContext} interface.
 *
 * @author Juergen Hoeller
 * @since 3.0
 */
public class SimpleTriggerContext implements TriggerContext {
 @Nullable
 private volatile Date lastScheduledExecutionTime;
 @Nullable
 private volatile Date lastActualExecutionTime;
 @Nullable
 private volatile Date lastCompletionTime;
...
 /**
 * Create a SimpleTriggerContext with the given time values.
 * @param lastScheduledExecutionTime last <i>scheduled</i> execution time
 * @param lastActualExecutionTime last <i>actual</i> execution time
 * @param lastCompletionTime last completion time
 */
 public SimpleTriggerContext(Date lastScheduledExecutionTime, Date lastActualExecutionTime, Date lastCompletionTime) {
 this.lastScheduledExecutionTime = lastScheduledExecutionTime;
 this.lastActualExecutionTime = lastActualExecutionTime;
 this.lastCompletionTime = lastCompletionTime;
 }
 ...
}

也正如你看到的,在构造函数中设置的时间值来自javax.enterprise.concurrent.LastExecution的实现,其中:

  • getScheduledStart:返回上次开始执行任务的时间。它对应于TriggerContext的lastScheduledExecutionTime属性。
  • getRunStart:返回给定任务开始运行的时间。在TriggerContext中,它对应于lastActualExecutionTime。
  • getRunEnd:任务终止时返回。它用于在TriggerContext中设置lastCompletionTime。

Spring调度和异步执行中的另一个重要类是org.springframework.core.task.support.TaskExecutorAdapter。它是一个将java.util.concurrent.Executor作为Spring基本的执行器的适配器(描述的有点拗口,看下面代码就明了了),之前已经描述了TaskExecutor。实际上,它引用了Java的ExecutorService,它也是继承了Executor接口。此引用用于完成所有提交的任务。

/**
 * Adapter that takes a JDK {@code java.util.concurrent.Executor} and
 * exposes a Spring {@link org.springframework.core.task.TaskExecutor} for it.
 * Also detects an extended {@code java.util.concurrent.ExecutorService 从此解释上面的说明}, adapting
 * the {@link org.springframework.core.task.AsyncTaskExecutor} interface accordingly.
 *
 * @author Juergen Hoeller
 * @since 3.0
 * @see java.util.concurrent.Executor
 * @see java.util.concurrent.ExecutorService
 * @see java.util.concurrent.Executors
 */
public class TaskExecutorAdapter implements AsyncListenableTaskExecutor {
 private final Executor concurrentExecutor;
 @Nullable
 private TaskDecorator taskDecorator;
 ...
  /**
 * Create a new TaskExecutorAdapter,
 * using the given JDK concurrent executor.
 * @param concurrentExecutor the JDK concurrent executor to delegate to
 */
 public TaskExecutorAdapter(Executor concurrentExecutor) {
 Assert.notNull(concurrentExecutor, "Executor must not be null");
 this.concurrentExecutor = concurrentExecutor;
 }
  /**
 * Delegates to the specified JDK concurrent executor.
 * @see java.util.concurrent.Executor#execute(Runnable)
 */
 @Override
 public void execute(Runnable task) {
 try {
  doExecute(this.concurrentExecutor, this.taskDecorator, task);
 }
 catch (RejectedExecutionException ex) {
  throw new TaskRejectedException(
   "Executor [" + this.concurrentExecutor + "] did not accept task: " + task, ex);
 }
 }
 @Override
 public void execute(Runnable task, long startTimeout) {
 execute(task);
 }
 @Override
 public Future<?> submit(Runnable task) {
 try {
  if (this.taskDecorator == null && this.concurrentExecutor instanceof ExecutorService) {
  return ((ExecutorService) this.concurrentExecutor).submit(task);
  }
  else {
  FutureTask<Object> future = new FutureTask<>(task, null);
  doExecute(this.concurrentExecutor, this.taskDecorator, future);
  return future;
  }
 }
 catch (RejectedExecutionException ex) {
  throw new TaskRejectedException(
   "Executor [" + this.concurrentExecutor + "] did not accept task: " + task, ex);
 }
 }
 ...
}

在Spring中配置异步和计划任务

下面我们通过代码的方式来实现异步任务。首先,我们需要通过注解来启用配置。它的XML配置如下:

<task:scheduler id="taskScheduler"/>
<task:executor id="taskExecutor" pool-size="2" />
<task:annotation-driven executor="taskExecutor" scheduler="taskScheduler"/>
<context:component-scan base-package="com.migo.async"/>

可以通过将@EnableScheduling和@EnableAsync注解添加到配置类(用@Configuration注解)来激活两者。完事,我们就可以开始着手实现调度和异步任务。为了实现调度任务,我们可以使用@Scheduled注解。我们可以从org.springframework.scheduling.annotation包中找到这个注解。它包含了以下几个属性:

  • cron:使用CRON风格(Linux配置定时任务的风格)的配置来配置需要启动的带注解的任务。
  • zone:要解析CRON表达式的时区。
  • fixedDelay或fixedDelayString:在固定延迟时间后执行任务。即任务将在最后一次调用结束和下一次调用的开始之间的这个固定时间段后执行。
  • fixedRate或fixedRateString:使用fixedRate注解的方法的调用将以固定的时间段(例如:每10秒钟)进行,与执行生命周期(开始,结束)无关。
  • initialDelay或initialDelayString:延迟首次执行调度方法的时间。请注意,所有值(fixedDelay ,fixedRate ,initialDelay )必须以毫秒表示。 需要特别记住的是 ,用@Scheduled注解的方法不能接受任何参数,并且不返回任何内容(void),如果有返回值,返回值也会被忽略掉的,没什么卵用。定时任务方法由容器管理,而不是由调用者在运行时调用。它们由 org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor来解析,其中包含以下方法来拒绝执行所有不正确定义的函数:
protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
 try {
  Assert.isTrue(method.getParameterCount() == 0,
   "Only no-arg methods may be annotated with @Scheduled");
 /**
 *  之前的版本中直接把返回值非空的给拒掉了,在Spring 4.3 Spring5 的版本中就没那么严格了
   * Assert.isTrue(void.class.equals(method.getReturnType()),
   *        "Only void-returning methods may be annotated with @Scheduled");
   **/
// ...
/**
 * 注释很重要
 * An annotation that marks a method to be scheduled. Exactly one of
 * the {@link #cron()}, {@link #fixedDelay()}, or {@link #fixedRate()}
 * attributes must be specified.
 *
 * <p>The annotated method must expect no arguments. It will typically have
 * a {@code void} return type; if not, the returned value will be ignored
 * when called through the scheduler.
 *
 * <p>Processing of {@code @Scheduled} annotations is performed by
 * registering a {@link ScheduledAnnotationBeanPostProcessor}. This can be
 * done manually or, more conveniently, through the {@code <task:annotation-driven/>}
 * element or @{@link EnableScheduling} annotation.
 *
 * <p>This annotation may be used as a <em>meta-annotation</em> to create custom
 * <em>composed annotations</em> with attribute overrides.
 *
 * @author Mark Fisher
 * @author Dave Syer
 * @author Chris Beams
 * @since 3.0
 * @see EnableScheduling
 * @see ScheduledAnnotationBeanPostProcessor
 * @see Schedules
 */
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Repeatable(Schedules.class)
public @interface Scheduled {
...
}

使用@Async注解标记一个方法或一个类(通过标记一个类,我们自动将其所有方法标记为异步)。与@Scheduled不同,异步任务可以接受参数,并可能返回某些东西。

写一个在Spring中执行异步任务的Demo

有了上面这些知识,我们可以来编写异步和计划任务。我们将通过测试用例来展示。我们从不同的任务执行器(task executors)的测试开始:

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations={"classpath:applicationContext-test.xml"})
@WebAppConfiguration
public class TaskExecutorsTest {

 @Test
 public void simpeAsync() throws InterruptedException {
  /**
   * SimpleAsyncTaskExecutor creates new Thread for every task and executes it asynchronously. The threads aren't reused as in
   * native Java's thread pools.
   *
   * The number of concurrently executed threads can be specified through concurrencyLimit bean property
   * (concurrencyLimit XML attribute). Here it's more simple to invoke setConcurrencyLimit method.
   * Here the tasks will be executed by 2 simultaneous threads. Without specifying this value,
   * the number of executed threads will be indefinite.
   *
   * You can observe that only 2 tasks are executed at a given time - even if 3 are submitted to execution (lines 40-42).
   **/
  SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor("thread_name_prefix_____");
  executor.setConcurrencyLimit(2);
  executor.execute(new SimpleTask("SimpleAsyncTask-1", Counters.simpleAsyncTask, 1000));
  executor.execute(new SimpleTask("SimpleAsyncTask-2", Counters.simpleAsyncTask, 1000));

  Thread.sleep(1050);
  assertTrue("2 threads should be terminated, but "+Counters.simpleAsyncTask.getNb()+" were instead", Counters.simpleAsyncTask.getNb() == 2);

  executor.execute(new SimpleTask("SimpleAsyncTask-3", Counters.simpleAsyncTask, 1000));
  executor.execute(new SimpleTask("SimpleAsyncTask-4", Counters.simpleAsyncTask, 1000));
  executor.execute(new SimpleTask("SimpleAsyncTask-5", Counters.simpleAsyncTask, 2000));

  Thread.sleep(1050);
  assertTrue("4 threads should be terminated, but "+Counters.simpleAsyncTask.getNb()+" were instead", Counters.simpleAsyncTask.getNb() == 4);
  executor.execute(new SimpleTask("SimpleAsyncTask-6", Counters.simpleAsyncTask, 1000));

  Thread.sleep(1050);
  assertTrue("6 threads should be terminated, but "+Counters.simpleAsyncTask.getNb()+" were instead",
   Counters.simpleAsyncTask.getNb() == 6);
 }

 @Test
 public void syncTaskTest() {
  /**
   * SyncTask works almost as Java's CountDownLatch. In fact, this executor is synchronous with the calling thread. In our case,
   * SyncTaskExecutor tasks will be synchronous with JUnit thread. It means that the testing thread will sleep 5
   * seconds after executing the third task ('SyncTask-3'). To prove that, we check if the total execution time is ~5 seconds.
   **/
  long start = System.currentTimeMillis();
  SyncTaskExecutor executor = new SyncTaskExecutor();
  executor.execute(new SimpleTask("SyncTask-1", Counters.syncTask, 0));
  executor.execute(new SimpleTask("SyncTask-2", Counters.syncTask, 0));
  executor.execute(new SimpleTask("SyncTask-3", Counters.syncTask, 0));
  executor.execute(new SimpleTask("SyncTask-4", Counters.syncTask, 5000));
  executor.execute(new SimpleTask("SyncTask-5", Counters.syncTask, 0));
  long end = System.currentTimeMillis();
  int execTime = Math.round((end-start)/1000);
  assertTrue("Execution time should be 5 seconds but was "+execTime+" seconds", execTime == 5);
 }

 @Test
 public void threadPoolTest() throws InterruptedException {
  /**
   * This executor can be used to expose Java's native ThreadPoolExecutor as Spring bean, with the
   * possibility to set core pool size, max pool size and queue capacity through bean properties.
   *
   * It works exactly as ThreadPoolExecutor from java.util.concurrent package. It means that our pool starts
   * with 2 threads (core pool size) and can be growth until 3 (max pool size).
   * In additionally, 1 task can be stored in the queue. This task will be treated
   * as soon as one from 3 threads ends to execute provided task. In our case, we try to execute 5 tasks
   * in 3 places pool and 1 place queue. So the 5th task should be rejected and TaskRejectedException should be thrown.
   **/
  ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
  executor.setCorePoolSize(2);
  executor.setMaxPoolSize(3);
  executor.setQueueCapacity(1);
  executor.initialize();

  executor.execute(new SimpleTask("ThreadPoolTask-1", Counters.threadPool, 1000));
  executor.execute(new SimpleTask("ThreadPoolTask-2", Counters.threadPool, 1000));
  executor.execute(new SimpleTask("ThreadPoolTask-3", Counters.threadPool, 1000));
  executor.execute(new SimpleTask("ThreadPoolTask-4", Counters.threadPool, 1000));
  boolean wasTre = false;
  try {
   executor.execute(new SimpleTask("ThreadPoolTask-5", Counters.threadPool, 1000));
  } catch (TaskRejectedException tre) {
   wasTre = true;
  }
  assertTrue("The last task should throw a TaskRejectedException but it wasn't", wasTre);

  Thread.sleep(3000);

  assertTrue("4 tasks should be terminated, but "+Counters.threadPool.getNb()+" were instead",
   Counters.threadPool.getNb()==4);
 }

}

class SimpleTask implements Runnable {
 private String name;
 private Counters counter;
 private int sleepTime;

 public SimpleTask(String name, Counters counter, int sleepTime) {
  this.name = name;
  this.counter = counter;
  this.sleepTime = sleepTime;
 }

 @Override
 public void run() {
  try {
   Thread.sleep(this.sleepTime);
  } catch (InterruptedException e) {
   e.printStackTrace();
  }
  this.counter.increment();
  System.out.println("Running task '"+this.name+"' in Thread "+Thread.currentThread().getName());
 }

 @Override
 public String toString() {
     return "Task {"+this.name+"}";
 }
}

enum Counters {

 simpleAsyncTask(0),
 syncTask(0),
 threadPool(0);

 private int nb;

 public int getNb() {
  return this.nb;
 }

 public synchronized void increment() {
  this.nb++;
 }

 private Counters(int n) {
  this.nb = n;
 }
}

在过去,我们可以有更多的执行器可以使用(SimpleThreadPoolTaskExecutor,TimerTaskExecutor 这些都2.x 3.x的老古董了)。但都被弃用并由本地Java的执行器取代成为Spring的首选。看看输出的结果:

Running task 'SimpleAsyncTask-1' in Thread thread_name_prefix_____1
Running task 'SimpleAsyncTask-2' in Thread thread_name_prefix_____2
Running task 'SimpleAsyncTask-3' in Thread thread_name_prefix_____3
Running task 'SimpleAsyncTask-4' in Thread thread_name_prefix_____4
Running task 'SimpleAsyncTask-5' in Thread thread_name_prefix_____5
Running task 'SimpleAsyncTask-6' in Thread thread_name_prefix_____6
Running task 'SyncTask-1' in Thread main
Running task 'SyncTask-2' in Thread main
Running task 'SyncTask-3' in Thread main
Running task 'SyncTask-4' in Thread main
Running task 'SyncTask-5' in Thread main
Running task 'ThreadPoolTask-2' in Thread ThreadPoolTaskExecutor-2
Running task 'ThreadPoolTask-1' in Thread ThreadPoolTaskExecutor-1
Running task 'ThreadPoolTask-4' in Thread ThreadPoolTaskExecutor-3
Running task 'ThreadPoolTask-3' in Thread ThreadPoolTaskExecutor-2

以此我们可以推断出,第一个测试为每个任务创建新的线程。通过使用不同的线程名称,我们可以看到相应区别。第二个,同步执行器,应该执行所调用线程中的任务。这里可以看到'main'是主线程的名称,它的主线程调用执行同步所有任务。最后一种例子涉及最大可创建3个线程的线程池。从结果可以看到,他们也确实只有3个创建线程。

现在,我们将编写一些单元测试来看看@Async和@Scheduled实现。

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations={"classpath:applicationContext-test.xml"})
@WebAppConfiguration
public class AnnotationTest {

 @Autowired
 private GenericApplicationContext context;

 @Test
 public void testScheduled() throws InterruptedException {

   System.out.println("Start sleeping");
   Thread.sleep(6000);
   System.out.println("Wake up !");

   TestScheduledTask scheduledTask = (TestScheduledTask) context.getBean("testScheduledTask");
    /**
    * Test fixed delay. It's executed every 6 seconds. The first execution is registered after application's context start.
    **/
   assertTrue("Scheduled task should be executed 2 times (1 before sleep in this method, 1 after the sleep), but was "+scheduledTask.getFixedDelayCounter(),
    scheduledTask.getFixedDelayCounter() == 2);

    /**
    * Test fixed rate. It's executed every 6 seconds. The first execution is registered after application's context start.
    * Unlike fixed delay, a fixed rate configuration executes one task with specified time. For example, it will execute on
    * 6 seconds delayed task at 10:30:30, 10:30:36, 10:30:42 and so on - even if the task 10:30:30 taken 30 seconds to
    * be terminated. In teh case of fixed delay, if the first task takes 30 seconds, the next will be executed 6 seconds
    * after the first one, so the execution flow will be: 10:30:30, 10:31:06, 10:31:12.
    **/
   assertTrue("Scheduled task should be executed 2 times (1 before sleep in this method, 1 after the sleep), but was "+scheduledTask.getFixedRateCounter(),
    scheduledTask.getFixedRateCounter() == 2);
    /**
    * Test fixed rate with initial delay attribute. The initialDelay attribute is set to 6 seconds. It causes that
    * scheduled method is executed 6 seconds after application's context start. In our case, it should be executed
    * only once because of previous Thread.sleep(6000) invocation.
    **/
   assertTrue("Scheduled task should be executed 1 time (0 before sleep in this method, 1 after the sleep), but was "+scheduledTask.getInitialDelayCounter(), scheduledTask.getInitialDelayCounter() == 1);
    /**
    * Test cron scheduled task. Cron is scheduled to be executed every 6 seconds. It's executed only once,
    * so we can deduce that it's not invoked directly before applications
    * context start, but only after configured time (6 seconds in our case).
    **/
   assertTrue("Scheduled task should be executed 1 time (0 before sleep in this method, 1 after the sleep), but was "+scheduledTask.getCronCounter(), scheduledTask.getCronCounter() == 1);
 }

 @Test
 public void testAsyc() throws InterruptedException {
    /**
    * To test @Async annotation, we can create a bean in-the-fly. AsyncCounter bean is a
    * simple counter which value should be equals to 2 at the end of the test. A supplementary test
    * concerns threads which execute both of AsyncCounter methods: one which
    * isn't annotated with @Async and another one which is annotated with it. For the first one, invoking
    * thread should have the same name as the main thread. For annotated method, it can't be executed in
    * the main thread. It must be executed asynchrounously in a new thread.
    **/
   context.registerBeanDefinition("asyncCounter", new RootBeanDefinition(AsyncCounter.class));

   String currentName = Thread.currentThread().getName();
   AsyncCounter asyncCounter = context.getBean("asyncCounter", AsyncCounter.class);
   asyncCounter.incrementNormal();
   assertTrue("Thread executing normal increment should be the same as JUnit thread but it wasn't (expected '"+currentName+"', got '"+asyncCounter.getNormalThreadName()+"')",
           asyncCounter.getNormalThreadName().equals(currentName));
   asyncCounter.incrementAsync();
   // sleep 50ms and give some time to AsyncCounter to update asyncThreadName value
   Thread.sleep(50);

   assertFalse("Thread executing @Async increment shouldn't be the same as JUnit thread but it wasn (JUnit thread '"+currentName+"', @Async thread '"+asyncCounter.getAsyncThreadName()+"')",
           asyncCounter.getAsyncThreadName().equals(currentName));
   System.out.println("Main thread execution's name: "+currentName);
   System.out.println("AsyncCounter normal increment thread execution's name: "+asyncCounter.getNormalThreadName());
   System.out.println("AsyncCounter @Async increment thread execution's name: "+asyncCounter.getAsyncThreadName());
   assertTrue("Counter should be 2, but was "+asyncCounter.getCounter(), asyncCounter.getCounter()==2);
 }

}

class AsyncCounter {

 private int counter = 0;
 private String normalThreadName;
 private String asyncThreadName;

 public void incrementNormal() {
  normalThreadName = Thread.currentThread().getName();
  this.counter++;
 }

 @Async
 public void incrementAsync() {
  asyncThreadName = Thread.currentThread().getName();
  this.counter++;
 }

 public String getAsyncThreadName() {
  return asyncThreadName;
 }

 public String getNormalThreadName() {
  return normalThreadName;
 }

 public int getCounter() {
  return this.counter;
 }

}

另外,我们需要创建新的配置文件和一个包含定时任务方法的类:

<!-- imported configuration file first -->
<!-- Activates various annotations to be detected in bean classes -->
<context:annotation-config />

<!-- Scans the classpath for annotated components that will be auto-registered as Spring beans.
 For example @Controller and @Service. Make sure to set the correct base-package-->
<context:component-scan base-package="com.migo.test.schedulers" />

<task:scheduler id="taskScheduler"/>
<task:executor id="taskExecutor" pool-size="40" />
<task:annotation-driven executor="taskExecutor" scheduler="taskScheduler"/>
// scheduled methods after, all are executed every 6 seconds (scheduledAtFixedRate and scheduledAtFixedDelay start to execute at
// application context start, two other methods begin 6 seconds after application's context start)
@Component
public class TestScheduledTask {

 private int fixedRateCounter = 0;
 private int fixedDelayCounter = 0;
 private int initialDelayCounter = 0;
 private int cronCounter = 0;

 @Scheduled(fixedRate = 6000)
 public void scheduledAtFixedRate() {
  System.out.println("<R> Increment at fixed rate");
  fixedRateCounter++;
 }

 @Scheduled(fixedDelay = 6000)
 public void scheduledAtFixedDelay() {
  System.out.println("<D> Incrementing at fixed delay");
  fixedDelayCounter++;
 }

 @Scheduled(fixedDelay = 6000, initialDelay = 6000)
 public void scheduledWithInitialDelay() {
  System.out.println("<DI> Incrementing with initial delay");
  initialDelayCounter++;
 }

 @Scheduled(cron = "**/6 ** ** ** ** **")
 public void scheduledWithCron() {
  System.out.println("<C> Incrementing with cron");
  cronCounter++;

 }

 public int getFixedRateCounter() {
  return this.fixedRateCounter;
 }

 public int getFixedDelayCounter() {
  return this.fixedDelayCounter;
 }

 public int getInitialDelayCounter() {
  return this.initialDelayCounter;
 }

 public int getCronCounter() {
  return this.cronCounter;
 }

}

该测试的输出:

<R> Increment at fixed rate
<D> Incrementing at fixed delay
Start sleeping
<C> Incrementing with cron
<DI> Incrementing with initial delay
<R> Increment at fixed rate
<D> Incrementing at fixed delay
Wake up !
Main thread execution's name: main
AsyncCounter normal increment thread execution's name: main
AsyncCounter @Async increment thread execution's name: taskExecutor-1

本文向我们介绍了关于Spring框架另一个大家比较感兴趣的功能–定时任务。我们可以看到,与Linux CRON风格配置类似,这些任务同样可以按照固定的频率进行定时任务的设置。我们还通过例子证明了使用@Async注解的方法会在不同线程中执行。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • 详解在Spring3中使用注解(@Scheduled)创建计划任务

    Spring3中加强了注解的使用,其中计划任务也得到了增强,现在创建一个计划任务只需要两步就完成了: 创建一个Java类,添加一个无参无返回值的方法,在方法上用@Scheduled注解修饰一下: 在Spring配置文件中添加三个<task:**** />节点: 最后说明一下,第一步创建的Java类要成为spring可管理的Bean,可以直接写在XML里,也可以@Component一下 示例如下 计划任务类: /** * com.zywang.spring.task.SpringTaskDemo

  • 浅谈Spring @Async异步线程池用法总结

    本文介绍了Spring @Async异步线程池用法总结,分享给大家,希望对大家有帮助 1. TaskExecutor spring异步线程池的接口类,其实质是Java.util.concurrent.Executor Spring 已经实现的异常线程池: 1. SimpleAsyncTaskExecutor:不是真的线程池,这个类不重用线程,每次调用都会创建一个新的线程. 2. SyncTaskExecutor:这个类没有实现异步调用,只是一个同步操作.只适用于不需要多线程的地方 3. Conc

  • Spring5源码解析之Spring中的异步和计划任务

    Java提供了许多创建线程池的方式,并得到一个Future实例来作为任务结果.对于Spring同样小菜一碟,通过其scheduling包就可以做到将任务线程中后台执行. 在本文的第一部分中,我们将讨论下Spring中执行计划任务的一些基础知识.之后,我们将解释这些类是如何一起协作来启动并执行计划任务的.下一部分将介绍计划和异步任务的配置.最后,我们来写个Demo,看看如何通过单元测试来编排计划任务. 什么是Spring中的异步任务? 在我们正式的进入话题之前,对于Spring,我们需要理解下它实

  • Android源码解析onResume方法中获取不到View宽高

    目录 前言 问题1.为什么onCreate和onResume中获取不到view的宽高? 问题2.为什么View.post为什么可以获取View宽高? 结论 前言 有一个经典的问题,我们在Activity的onCreate中可以获取View的宽高吗?onResume中呢? 对于这类八股问题,只要看过都能很容易得出答案:不能. 紧跟着追问一个,那为什么View.post为什么可以获取View宽高? 今天来看看这些问题,到底为何? 今日份问题: 为什么onCreate和onResume中获取不到vie

  • Android源码解析之截屏事件流程

    今天这篇文章我们主要讲一下Android系统中的截屏事件处理流程.用过android系统手机的同学应该都知道,一般的android手机按下音量减少键和电源按键就会触发截屏事件(国内定制机做个修改的这里就不做考虑了).那么这里的截屏事件是如何触发的呢?触发之后android系统是如何实现截屏操作的呢?带着这两个问题,开始我们的源码阅读流程. 我们知道这里的截屏事件是通过我们的按键操作触发的,所以这里就需要我们从android系统的按键触发模块开始看起,由于我们在不同的App页面,操作音量减少键和电

  • ClassLoader类加载源码解析

    Java类加载器 1.BootClassLoader: 用于加载Android Framework层class文件. 2.PathClassLoader: 用于Android应用程序类加载器.可以加载指定的dex,jar.zip.zpk中的classes.dex 3.DexClassLoader:加载指定的dex,以及jar.zip.apk中的classes.dex 源码解析 1.ClassLoader中提供loadClass用于加载指定类 //ClassLoader.java public C

  • Spring源码解析容器初始化构造方法

    目录 前言 构造方法 前言 Spring框架被广泛应用于我们的日常工作中,但是很长时间以来我都是只会使用,不懂它的作用原理.通过最近一段时间的阅读源码,个人发现通过阅读源码,能够帮助我们了解Spring的设计理念,并且对Java编程中的一些设计模式更加熟悉,所以记录一下自己对Spring源码的理解. 在开始进行源码学习前,首先再回顾一下三种Spring编程风格: 基于Schema,即通过xml标签的配置方式 基于Annotation的注解技术,使用@Component等注解配置bean 基于Ja

  • Spring SpringMVC在启动完成后执行方法源码解析

    关键字:spring容器加载完毕做一件事情(利用ContextRefreshedEvent事件) 应用场景:很多时候我们想要在某个类加载完毕时干某件事情,但是使用了spring管理对象,我们这个类引用了其他类(可能是更复杂的关联),所以当我们去使用这个类做事情时发现包空指针错误,这是因为我们这个类有可能已经初始化完成,但是引用的其他类不一定初始化完成,所以发生了空指针错误,解决方案如下: 1.写一个类继承spring的ApplicationListener监听,并监控ContextRefresh

  • spring获取bean的源码解析

    介绍 前面一章说了AbstractApplicationContext中的refresh方法中的invokeBeanFactoryPostProcessors.主要是调用BeanFactoryPostProcessor.其中也有获取bean的过程,就是beanFactory.getBean的方法.这一章就说下getBean这个方法.由于spring中获取bean的方法比较复杂,涉及到的流程也非常多,这一章就先说下整个大体的流程.其中的细节会在后面也会慢慢说. 源码 直接看源码吧 @Overrid

  • Spring Security源码解析之权限访问控制是如何做到的

    〇.前文回顾 在实战篇<话说Spring Security权限管理(源码详解)>我们学习了Spring Security强大的访问控制能力,只需要进行寥寥几行的配置就能做到权限的控制,本篇来看看它到底是如何做到的. 一.再聊过滤器链 源码篇中反复提到,请求进来需要经过的是一堆过滤器形成的过滤器链,走完过滤器链未抛出异常则可以继续访问后台接口资源,而最后一个过滤器就是来判断请求是否有权限继续访问后台资源,如果没有则会将拒绝访问的异常往上向异常过滤器抛,异常过滤器会对异常进行翻译,然后响应给客户端

  • Spring源码解析之Configuration

    一.@Configuration 1.1 未加@Configuration <!--logback-test.xml,配置不打印日志--> <?xml version="1.0" encoding="UTF-8"?> <configuration> <include resource="org/springframework/boot/logging/logback/base.xml" /> <

  • Spring源码解析之事务传播特性

    一.使用方式 可以采用Transactional,配置propagation即可. 打开org.springframework.transaction.annotation.Transactional可见默认传播特性是REQUIRED. /** * The transaction propagation type. * <p>Defaults to {@link Propagation#REQUIRED}. * @see org.springframework.transaction.inte

随机推荐