Spring动态管理定时任务之ThreadPoolTaskScheduler解读

目录
  • Spring动态管理定时任务ThreadPoolTaskScheduler
    • 实现思路
  • ThreadPoolTaskScheduler 定时任务实现
  • 总结

Spring动态管理定时任务ThreadPoolTaskScheduler

Spring任务调度核心类ThreadPoolTaskScheduler,API文档解释如下:

Implementation of Spring's TaskScheduler interface, wrapping a native java.util.concurrent.ScheduledThreadPoolExecutor.

Spring的TaskScheduler接口的实现,包装了一个本地java.util.concurrent.ScheduledThreadPoolExecutor。

实现思路

注入调度类bean,初始化一个ConcurrentHashMap容器,用来保存多个定时任务的状态,每一个任务的运行状态被封装在ScheduledFuture中,借此类可取消对应的定时任务。

import java.time.LocalDateTime;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;

import javax.annotation.Resource;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.cjia.spidercommon.model.SpiderJob;
import com.cjia.spiderjob.mapper.SpiderJobMapper;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;

/**
 * 用来管理(启动、停止、新增、删除、更新编辑、查看运行状态)定时任务(增量任务)
 */
@Slf4j
@RestController
@RequestMapping("spiderJob/cron")
public class CronJobController extends SpiderJobController {

    @Autowired
    private ThreadPoolTaskScheduler threadPoolTaskScheduler;

    private Map<Integer, ScheduledFuture<?>> futureMap = new ConcurrentHashMap<>();

    @Resource
    private SpiderJobMapper spiderJobMapper;

    @Bean
    public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
        return new ThreadPoolTaskScheduler();
    }

    /**
     * 启动单个定时任务
     */
    @RequestMapping("/start/{jobId}")
    public String start(@PathVariable Integer jobId) {
        SpiderJob job = spiderJobMapper.selectById(jobId);
        if (job == null) {
            log.warn("任务[{}]已不存在,无法启动!", jobId);
            return "任务[" + jobId + "]已不存在,无法启动!";
        }
        int enable = job.getEnable();
        if (enable == 0) {
            log.warn("任务[{}]已被禁用,无法启动!", jobId);
            return "任务[" + jobId + "]已被禁用,无法启动!";
        }
        // 检测该任务是否已在运行调度中
        if (futureMap.get(jobId) != null) {
            log.warn("任务[{}]已在调度运行,无法重复启动!", jobId);
            return "任务[" + jobId + "]已在调度运行,无法重复启动!";
        }
        String cron = job.getCron();
        // TODO check cron
        ScheduledFuture<?> future = threadPoolTaskScheduler.schedule(new MyRunnable(job), new CronTrigger(cron));
        log.info("任务[{}]已被启动!", jobId);
        futureMap.put(jobId, future);
        return "任务[" + jobId + "]已被启动!";
    }

    /**
     * 批量启动定时任务
     */
    @RequestMapping("/startBatch/{jobIds}")
    public String startBatch(@PathVariable String jobIds) {
        // TODO jobIds valid
        String[] jobIdsArr = jobIds.split(",");
        StringBuffer sb = new StringBuffer();
        for (String jobId : jobIdsArr) {
            String result = start(Integer.valueOf(jobId));
            sb.append(result).append("<br>");
        }
        return sb.toString();
    }

    /**
     * 停止单个定时任务
     */
    @RequestMapping("/stop/{jobId}")
    public String stop(@PathVariable Integer jobId) {
        // 检测该任务是否已在运行调度中
        ScheduledFuture<?> future = futureMap.get(jobId);
        if (future == null) {
            log.warn("任务[{}]已不在调度中,无法停止!", jobId);
            return "任务[" + jobId + "]已不在调度中,无法停止!";
        } else {
            future.cancel(true);
            futureMap.remove(jobId);
            log.info("任务[{}]已被停止!", jobId);
            return "任务[" + jobId + "]已被停止!";
        }
    }

    /**
     * 批量停止定时任务
     */
    @RequestMapping("/stopBatch/{jobIds}")
    public String stopBatch(@PathVariable String jobIds) {
        // TODO jobIds valid
        String[] jobIdsArr = jobIds.split(",");
        StringBuffer sb = new StringBuffer();
        for (String jobId : jobIdsArr) {
            String result = stop(Integer.valueOf(jobId));
            sb.append(result).append("<br>");
        }
        return sb.toString();
    }

    /**
     * 查看当前时刻调度中的定时任务
     */
    @RequestMapping("/status")
    public String getAllStatus() {
        Set<Integer> runningKeys = futureMap.keySet();
        return "当前正在调度的任务列表:" + runningKeys.toString();
    }

    @Data
    private class MyRunnable implements Runnable {
        private SpiderJob job;

        public MyRunnable(SpiderJob job) {
            this.job = job;
        }

        @Override
        public void run() {
            log.info("运行定时任务[{}: {}] at {}!", job.getId(), job.getBizName(), LocalDateTime.now());
            executeIncrementJob(job.getBizName());
        }
    }

}

ThreadPoolTaskScheduler 定时任务实现

org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler定时任务调度线程池

CREATE TABLE `sys_job` (
  `id` bigint(20) NOT NULL COMMENT '任务key',
  `job_name` varchar(64) NOT NULL COMMENT '任务名称',
  `bean_class` varchar(128) NOT NULL COMMENT '类路径',
  `cron_expression` varchar(64) NOT NULL COMMENT 'cron表达式',
  `status` tinyint(1) NOT NULL COMMENT '状态值 @JobStatusEnum 详见具体枚举类',
  `is_deleted` tinyint(1) DEFAULT '0' COMMENT '删除标识 1是 0否',
  `create_time` datetime DEFAULT NULL,
  `update_time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
@Configuration
@Slf4j
public class SchedulingConfigure {

    @Bean
    public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
        log.info("开始创建定时任务调度线程池");
        ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
        threadPoolTaskScheduler.setPoolSize(20);
        threadPoolTaskScheduler.setThreadNamePrefix("schedule-task-");
        threadPoolTaskScheduler.setWaitForTasksToCompleteOnShutdown(true);
        threadPoolTaskScheduler.setAwaitTerminationSeconds(60);
        log.info("创建定时任务调度线程池完成!");
        return threadPoolTaskScheduler;
    }
}
public enum JobStatusEnum {

    /**
     * 未加入调度器
     */
    NOT_SCHEDULE(0, "未加入调度器"),

    /**
     * 加入调度器,但未运行
     */
    SCHEDULED_BUT_NOT_RUNNING(1, "加入调度器,但未运行"),

    /**
     * 从调度器中已删除
     */
    DELETED(2, "从调度器中已删除"),
    ;

    private Integer status;

    private String detail;

    JobStatusEnum(Integer status, String detail) {
        this.status = status;
        this.detail = detail;
    }

    public Integer getStatus() {
        return status;
    }

    public void setStatus(Integer status) {
        this.status = status;
    }

    public String getDetail() {
        return detail;
    }

    public void setDetail(String detail) {
        this.detail = detail;
    }
}
@Component
@Slf4j
public class ScheduledJobService {

    private final ReentrantLock lock = new ReentrantLock();

    @Autowired
    private ThreadPoolTaskScheduler threadPoolTaskScheduler;
    @Autowired
    private SysJobService jobService;
    @Autowired
    private SpringBeanUtils springBeanUtils;

    /**
     * 已经加入调度器的任务map
     */
    private final ConcurrentHashMap<Long, ScheduledFuture<?>> scheduledFutureMap = new ConcurrentHashMap<>();

    /**
     * 初始化启动任务
     *
     * @param sysJobs 数据库任务集合
     */
    public void initAllJob(List<SysJob> sysJobs) {
        if (CollectionUtils.isEmpty(sysJobs)) {
            return;
        }
        for (SysJob sysJob : sysJobs) {
            if (JobStatusEnum.NOT_SCHEDULE.getStatus().equals(sysJob.getStatus())
                    || JobStatusEnum.DELETED.getStatus().equals(sysJob.getStatus())
                    || this.isScheduled(sysJob.getId())) {
                // 任务初始化状态或已删除或已加载到调度器中
                continue;
            }
            // 将任务加入调度器
            this.doScheduleJob(sysJob);
        }
    }

    /**
     * 启动任务
     *
     * @param jobId job主键id
     */
    public void start(Long jobId) {
        log.info("启动任务:-> jobId_{}", jobId);
        // 加入调度器
        schedule(jobId);
        log.info("启动任务结束:-> jobId_{}", jobId);
        // 更新任务状态
        jobService.updateJobStatus(jobId, JobStatusEnum.SCHEDULED_BUT_NOT_RUNNING.getStatus());
    }

    /**
     * 停止任务
     *
     * @param jobId job主键id
     */
    public void stop(Long jobId) {
        log.info("停止任务:-> jobId_{}", jobId);
        // 取消任务
        cancel(jobId);
        log.info("停止任务结束:-> jobId_{}", jobId);
        // 更新表中任务状态为已停止
        jobService.updateJobStatus(jobId, JobStatusEnum.NOT_SCHEDULE.getStatus());
    }

    /**
     * 移除任务
     *
     * @param jobId job主键id
     */
    public void remove(Long jobId) {
        log.info("移除任务:-> jobId_{}", jobId);
        // 取消任务
        cancel(jobId);
        log.info("移除任务结束:-> jobId_{}", jobId);
        // 更新表中任务状态为已删除
        jobService.updateJobStatus(jobId, JobStatusEnum.DELETED.getStatus());
    }

    /**
     * 取消
     *
     * @param jobId 工作id
     */
    private void cancel(Long jobId) {
        // 任务是否存在
        if (scheduledFutureMap.containsKey(jobId)) {
            ScheduledFuture<?> scheduledFuture = scheduledFutureMap.get(jobId);
            if (!scheduledFuture.isCancelled()) {
                // 取消调度
                scheduledFuture.cancel(true);
            }
        }
    }

    private void schedule(Long jobId) {
        // 添加锁,只允许单个线程访问,防止任务启动多次
        lock.lock();
        try {
            if (isScheduled(jobId)) {
                log.error("任务jobId_{}已经加入调度器,无需重复操作", jobId);
                return;
            }
            // 通过jobKey查询jobBean对象
            SysJob sysJob = jobService.getById(jobId);
            // 启动定时任务
            doScheduleJob(sysJob);
        } finally {
            // 释放锁资源
            lock.unlock();
        }
    }

    /**
     * 执行启动任务
     *
     * @param sysJob 任务实体类对象
     */
    private void doScheduleJob(SysJob sysJob) {
        Long jobId = sysJob.getId();
        String beanClass = sysJob.getBeanClass();
        String jobName = sysJob.getJobName();
        String cron = sysJob.getCronExpression();
        // 从Spring中获取目标的job业务实现类
        ScheduledJob scheduledJob = parseFrom(beanClass);
        if (scheduledJob == null) {
            return;
        }
        scheduledJob.setJobId(jobId);
        scheduledJob.setJobName(jobName);

        ScheduledFuture<?> scheduledFuture = threadPoolTaskScheduler.schedule(scheduledJob,
                triggerContext -> {
                    CronTrigger cronTrigger = new CronTrigger(cron);
                    return cronTrigger.nextExecutionTime(triggerContext);
                });

        log.info("任务加入调度器 -> jobId:{},jobName:{}", jobId, jobName);

        // 将启动的任务放入map
        assert scheduledFuture != null;
        scheduledFutureMap.put(jobId, scheduledFuture);
    }

    /**
     * 任务是否已经进入调度器
     *
     * @param jobId 任务主键key
     * @return {@link Boolean}
     */
    private Boolean isScheduled(Long jobId) {
        if (scheduledFutureMap.containsKey(jobId)) {
            return !scheduledFutureMap.get(jobId).isCancelled();
        }
        return false;
    }

    private ScheduledJob parseFrom(String beanClass) {
        try {
            Class<?> clazz = Class.forName(beanClass);
            return (ScheduledJob) springBeanUtils.getBean(clazz);
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
        return null;
    }
}
@Component
public class SpringBeanUtils implements ApplicationContextAware {

    private static ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        SpringBeanUtils.applicationContext = applicationContext;
    }

    /**
     * 获取applicationContext
     */
    public static ApplicationContext getApplicationContext() {
        return applicationContext;
    }

    /**
     * 通过name获取 Bean.
     */
    public  Object getBean(String name) {
        return getApplicationContext().getBean(name);
    }

    /**
     * 通过class获取Bean.
     */
    public  <T> T getBean(Class<T> clazz) {
        return getApplicationContext().getBean(clazz);
    }

    /**
     * 通过name,以及Clazz返回指定的Bean
     */
    public  <T> T getBean(String name, Class<T> clazz) {
        return getApplicationContext().getBean(name, clazz);
    }
}
@Data
public abstract class ScheduledJob implements Runnable {

    /**
    * 任务主键id
    */
    private Long jobId;

    /**
    * 任务名
    */
    private String jobName;
}
@Component
public class SchedulerTestDemo extends ScheduledJob {

    @Override
    public void run() {
        System.out.println("我是定时任务要执行的类..");
        System.out.println(SchedulerTestDemo.class.getName() + ":" + LocalDateTime.now());
    }

}
/**
 * 项目启动时,将数据库中job定时任务加载
 */
@Component
public class GrapeApplicationListener {

    private final ScheduledJobService scheduledJobService;
    private final ISysJobService sysJobService;

    public GrapeApplicationListener(ISysJobService sysJobService, ScheduledJobService scheduledJobService) {
        this.sysJobService = sysJobService;
        this.scheduledJobService = scheduledJobService;
    }

    @PostConstruct
    public void initStartJob() {
    	// 初始化job
        scheduledJobService.initAllJob(sysJobService.list());
    }
}
@SpringBootApplication(scanBasePackages = {"com.example.grape"})
@MapperScan("com.example.grape.dao.mapper")
@EnableScheduling
public class GrapeApplication {

    public static void main(String[] args) {
        SpringApplication.run(GrapeApplication.class, args);
    }

}

总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持我们。

(0)

相关推荐

  • Spring动态添加定时任务的实现思路

    一.背景 在工作中,有些时候我们有些定时任务的执行可能是需要动态修改的,比如: 生成报表,有些项目配置每天的8点生成,有些项目配置每天的10点生成,像这种动态的任务执行时间,在不考虑分布式执行的情况下,我们可以 使用 Spring Task来简单的实现. 二.需求和实现思路 1.能够动态的添加一个定时任务. 在Spring中存在一个类ThreadPoolTaskScheduler,它可以实现根据一个cron表达式来调度一个任务,并返回一个ScheduledFuture对象. 2.能够取消定时任务

  • SpringBoot动态定时功能实现方案详解

    目录 业务场景 环境准备 实现方案 归纳总结 业务场景 基于上篇程序,做了一版动态定时程序,然后发现这个定时程序需要在下次执行的时候会加载新的时间,所以如果改了定时程序不能马上触发,所以想到一种方法,在保存定时程序的时候将cron表达式传过去,然后触发定时程序,下面看看怎么实现 环境准备 开发环境 JDK 1.8 SpringBoot2.2.1 Maven 3.2+ 开发工具 IntelliJ IDEA smartGit Navicat15 实现方案 基于上一版进行改进: 先根据选择的星期生成c

  • spring定时任务(scheduler)的串行、并行执行实现解析

    对于spring的定时任务,最近有接触过一些,对于串行和并行也学习了一下,现在这里做下记录. 我是把每个定时任务分别写在不同的类中的,即一个类就是一个定时任务,然后在spring配置文件中进行配置,首先说串行任务的配置.如下: 1.串行 <task:scheduled-tasks> <task:scheduled ref="className1" method="methodName1" cron="0 0/5 * * * ?"

  • spring中定时任务taskScheduler的详细介绍

    前言 众所周知在spring 3.0版本后,自带了一个定时任务工具,而且使用简单方便,不用配置文件,可以动态改变执行状态.也可以使用cron表达式设置定时任务. 被执行的类要实现Runnable接口 TaskScheduler接口 TaskScheduler是一个接口,TaskScheduler接口下定义了6个方法 1.schedule(Runnable task, Trigger trigger); 指定一个触发器执行定时任务.可以使用CronTrigger来指定Cron表达式,执行定时任务

  • Spring动态管理定时任务之ThreadPoolTaskScheduler解读

    目录 Spring动态管理定时任务ThreadPoolTaskScheduler 实现思路 ThreadPoolTaskScheduler 定时任务实现 总结 Spring动态管理定时任务ThreadPoolTaskScheduler Spring任务调度核心类ThreadPoolTaskScheduler,API文档解释如下: Implementation of Spring's TaskScheduler interface, wrapping a native java.util.conc

  • Quartz+Spring Boot实现动态管理定时任务

    项目实践过程中碰到一个动态管理定时任务的需求:针对每个人员进行信息的定时更新,具体更新时间可随时调整.启动.暂定等. 思路 将每个人员信息的定时配置保存到数据库中,这样实现了任务的动态展示和管理.任务的每一次新增或变更,都会去数据库变更信息. 设置一个统一的任务管理器,专门负责动态任务的增删改查. POM依赖 <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://mav

  • spring boot task实现动态创建定时任务的方法

    在Spring Boot项目中,通过@EnableScheduling可启用Spring自带的定时任务支持,在通过@Scheduled注解定义定时任务,但是通过注解只能编写固定时间的定时任务,无法动态调整定时间隔,可通过实现SchedulingConfigurer接口实现动态定时任务注册. 对比Quartz的优缺点 优点:配置非常简单 缺点: 不支持分布式部署 不支持动态配置定时任务 不支持持久化 其实这几个缺点归根结底都是因为不支持持久化,所以如果项目需要持久化定时任务,还是要选择Quartz

  • 一文搞懂如何实现Java,Spring动态启停定时任务

    目录 为什么需要定时任务 Java定时任务的原理 Timer+TimerTask ScheduledThreadPoolExecutor Timer VS ScheduledThreadPoolExecutor Spring定时任务 @Scheduled定时任务原理(源码) 为什么需要定时任务 定时任务的应用场景十分广泛,如定时清理文件.定时生成报表.定时数据同步备份等. Java定时任务的原理 jdk自带的库中,有两种技术可以实现定时任务,一种是Timer,另一种是ScheduledThrea

  • SpringBoot 实现动态添加定时任务功能

    目录 代码结构 1.配置类 2.定时任务类型枚举 3.实际执行任务实现类 4.定时任务包装器 5.任务注册器(核心) 6.使用 最后 最近的需求有一个自动发布的功能, 需要做到每次提交都要动态的添加一个定时任务 代码结构 1. 配置类 package com.orion.ops.config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Conf

  • spring中websocket定时任务实现实时推送

    有时候业务要求websocket连接后,服务端实时每隔一段时间就将数据推送给客户端进行响应,这时就需要websocket+定时任务一起来实现实时推送数据给客户端了.使用的定时任务方式为spring的TaskScheduler对象实现任务调度. TaskScheduler定时任务实现 TaskScheduler接口提供了多种调度方法来实现运行任务的执行. public interface TaskScheduler { //通过触发器来决定task是否执行 ScheduledFuture sche

  • Spring事务管理只对出现运行期异常进行回滚

    一.结论 Spring的事务管理默认只对出现运行期异常(java.lang.RuntimeException及其子类)进行回滚. 如果一个方法抛出Exception或者Checked异常,Spring事务管理默认不进行回滚. 关于异常的分类一下详细介绍: 1.基本概念 看java的异常结构图  Throwable是所有异常的根,java.lang.Throwable Error是错误,java.lang.Error Exception是异常,java.lang.Exception 2.Excep

  • springboot整合Quartz实现动态配置定时任务的方法

    前言 在我们日常的开发中,很多时候,定时任务都不是写死的,而是写到数据库中,从而实现定时任务的动态配置,下面就通过一个简单的示例,来实现这个功能. 一.新建一个springboot工程,并添加依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jpa</artifactId> </dependency

  • Spring动态多数据源配置实例Demo

    最近由于咨询spring如何配置多数据源的人很多,一一回答又比较麻烦,而且以前的博文中的配置也是有问题,因此特此重新发布一个Demo给大家. Demo中共有两个数据源,即MySQL和Oracle,并已经进行简单测试,动态切换数据源是没有问题的,希望借此Demo能帮助到大家. Demo下载地址: Spring动态切换多数据源Demo:http://xiazai.jb51.net/201701/yuanma/dynamicDatasourceDemo_jb51.rar 另外我给些说明,阐述下多数据源

随机推荐