SpringBoot+SpringBatch+Quartz整合定时批量任务方式

目录
  • 一、引言
  • 二、代码具体实现
    • 1、pom文件
    • 2、application.yaml文件
    • 3、Service实现类
    • 4、SpringBatch配置类
    • 5、Processor,处理每条数据
    • 6、封装数据库返回数据的实体Bean
    • 7、启动类上要加上注解
  • 三、小结一下
  • spring-batch与quartz集成过程中遇到的问题
    • 问题
    • 原因
    • 解决

一、引言

最近一周,被借调到其他部门,赶一个紧急需求,需求内容如下:

PC网页触发一条设备升级记录(下图),后台要定时批量设备更新。这里定时要用到Quartz,批量数据处理要用到SpringBatch,二者结合,可以完成该需求。

由于之前,没有用过SpringBatch,于是上网查了下资料,发现可参考的不是很多,于是只能去慢慢的翻看官方文档

遇到不少问题,就记录一下吧。

二、代码具体实现

1、pom文件

  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
      <groupId>org.postgresql</groupId>
      <artifactId>postgresql</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-jdbc</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-batch</artifactId>
    </dependency>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-batch</artifactId>
    </dependency>
   </dependencies>

2、application.yaml文件

spring:
  datasource:
    username: thinklink
    password: thinklink
    url: jdbc:postgresql://172.16.205.54:5432/thinklink
    driver-class-name: org.postgresql.Driver
  batch:
    job:
      enabled: false
server:
  port: 8073
#upgrade-dispatch-base-url: http://172.16.205.125:8080/api/rpc/dispatch/command/
upgrade-dispatch-base-url: http://172.16.205.211:8080/api/noauth/rpc/dispatch/command/
# 每次批量处理的数据量,默认为5000
batch-size: 5000

3、Service实现类

触发批处理任务的入口,执行一个job

@Service("batchService")
public class BatchServiceImpl implements BatchService {
	// 框架自动注入
    @Autowired
    private JobLauncher jobLauncher;
    @Autowired
    private Job updateDeviceJob;
    /**
     * 根据 taskId 创建一个Job
     * @param taskId
     * @throws Exception
     */
    @Override
    public void createBatchJob(String taskId) throws Exception {
        JobParameters jobParameters = new JobParametersBuilder()
                .addString("taskId", taskId)
                .addString("uuid", UUID.randomUUID().toString().replace("-",""))
                .toJobParameters();
        // 传入一个Job任务和任务需要的参数
        jobLauncher.run(updateDeviceJob, jobParameters);
    }
}

4、SpringBatch配置类

此部分最重要(☆☆☆☆☆)

@Configuration
public class BatchConfiguration {
    private static final Logger log = LoggerFactory.getLogger(BatchConfiguration.class);
    @Value("${batch-size:5000}")
    private int batchSize;
	// 框架自动注入
    @Autowired
    public JobBuilderFactory jobBuilderFactory;
	// 框架自动注入
    @Autowired
    public StepBuilderFactory stepBuilderFactory;
	// 数据过滤器,对从数据库读出来的数据,注意进行操作
    @Autowired
    public TaskItemProcessor taskItemProcessor;
    // 接收job参数
    public Map<String, JobParameter> parameters;
    public Object taskId;
    @Autowired
    private JdbcTemplate jdbcTemplate;
	// 读取数据库操作
    @Bean
    @StepScope
    public JdbcCursorItemReader<DispatchRequest> itemReader(DataSource dataSource) {
        String querySql = " SELECT " +
                " e. ID AS taskId, " +
                " e.user_id AS userId, " +
                " e.timing_startup AS startTime, " +
                " u.device_id AS deviceId, " +
                " d.app_name AS appName, " +
                " d.compose_file AS composeFile, " +
                " e.failure_retry AS failureRetry, " +
                " e.tetry_times AS retryTimes, " +
                " e.device_managered AS deviceManagered " +
                " FROM " +
                " eiot_upgrade_task e " +
                " LEFT JOIN eiot_upgrade_device u ON e. ID = u.upgrade_task_id " +
                " LEFT JOIN eiot_app_detail d ON e.app_id = d. ID " +
                " WHERE " +
                " ( " +
                " u.device_upgrade_status = 0 " +
                " OR u.device_upgrade_status = 2" +
                " )" +
                " AND e.tetry_times > u.retry_times " +
                " AND e. ID = ?";
        return new JdbcCursorItemReaderBuilder<DispatchRequest>()
                .name("itemReader")
                .sql(querySql)
                .dataSource(dataSource)
                .queryArguments(new Object[]{parameters.get("taskId").getValue()})
                .rowMapper(new DispatchRequest.DispatchRequestRowMapper())
                .build();
    }
	// 将结果写回数据库
    @Bean
    @StepScope
    public ItemWriter<ProcessResult> itemWriter() {
        return new ItemWriter<ProcessResult>() {
            private int updateTaskStatus(DispatchRequest dispatchRequest, int status) {
                log.info("update taskId: {}, deviceId: {} to status {}", dispatchRequest.getTaskId(), dispatchRequest.getDeviceId(), status);
                Integer retryTimes = jdbcTemplate.queryForObject(
                        "select retry_times from eiot_upgrade_device where device_id = ? and upgrade_task_id = ?",
                        new Object[]{ dispatchRequest.getDeviceId(), dispatchRequest.getTaskId()}, Integer.class
                );
                retryTimes += 1;
                int updateCount = jdbcTemplate.update("update eiot_upgrade_device set device_upgrade_status = ?, retry_times = ? " +
                        "where device_id = ? and upgrade_task_id = ?", status, retryTimes, dispatchRequest.getDeviceId(), dispatchRequest.getTaskId());
                if (updateCount <= 0) {
                    log.warn("no task updated");
                } else {
                    log.info("count of {} task updated", updateCount);
                }
                // 最后一次重试
                if (status == STATUS_DISPATCH_FAILED && retryTimes == dispatchRequest.getRetryTimes()) {
                    log.info("the last retry of {} failed, inc deviceManagered", dispatchRequest.getTaskId());
                    return 1;
                } else {
                    return 0;
                }
            }
            @Override
            @Transactional
            public void write(List<? extends ProcessResult> list) throws Exception {
                Map taskMap = jdbcTemplate.queryForMap(
                        "select device_managered, device_count, task_status from eiot_upgrade_task where id = ?",
                        list.get(0).getDispatchRequest().getTaskId() // 我们认定一个批量里面,taskId都是一样的
                        );
                int deviceManagered = (int)taskMap.get("device_managered");
                Integer deviceCount = (Integer) taskMap.get("device_count");
                if (deviceCount == null) {
                    log.warn("deviceCount of task {} is null", list.get(0).getDispatchRequest().getTaskId());
                }
                int taskStatus = (int)taskMap.get("task_status");
                for (ProcessResult result: list) {
                    deviceManagered += updateTaskStatus(result.getDispatchRequest(), result.getStatus());
                }
                if (deviceCount != null && deviceManagered == deviceCount) {
                    taskStatus = 2; //任务状态 0:待升级,1:升级中,2:已完成
                }
                jdbcTemplate.update("update eiot_upgrade_task  set device_managered = ?, task_status = ? " +
                        "where id = ?", deviceManagered, taskStatus, list.get(0).getDispatchRequest().getTaskId());
            }
        };
    }
    /**
     * 定义一个下发更新的 job
     * @return
     */
    @Bean
    public Job updateDeviceJob(Step updateDeviceStep) {
        return jobBuilderFactory.get(UUID.randomUUID().toString().replace("-", ""))
                .listener(new JobListener()) // 设置Job的监听器
                .flow(updateDeviceStep)// 执行下发更新的Step
                .end()
                .build();
    }
    /**
     * 定义一个下发更新的 step
     * @return
     */
    @Bean
    public Step updateDeviceStep(JdbcCursorItemReader<DispatchRequest> itemReader,ItemWriter<ProcessResult> itemWriter) {
        return stepBuilderFactory.get(UUID.randomUUID().toString().replace("-", ""))
                .<DispatchRequest, ProcessResult> chunk(batchSize)
                .reader(itemReader) //根据taskId从数据库读取更新设备信息
                .processor(taskItemProcessor) // 每条更新信息,执行下发更新接口
                .writer(itemWriter)
                .build();
    }
    // job 监听器
    public class JobListener implements JobExecutionListener {
        @Override
        public void beforeJob(JobExecution jobExecution) {
            log.info(jobExecution.getJobInstance().getJobName() + " before... ");
            parameters = jobExecution.getJobParameters().getParameters();
            taskId = parameters.get("taskId").getValue();
            log.info("job param taskId : " + parameters.get("taskId"));
        }
        @Override
        public void afterJob(JobExecution jobExecution) {
            log.info(jobExecution.getJobInstance().getJobName() + " after... ");
            // 当所有job执行完之后,查询设备更新状态,如果有失败,则要定时重新执行job
            String sql = " SELECT " +
                    " count(*) " +
                    " FROM " +
                    " eiot_upgrade_device d " +
                    " LEFT JOIN eiot_upgrade_task u ON d.upgrade_task_id = u. ID " +
                    " WHERE " +
                    " u. ID = ? " +
                    " AND d.retry_times < u.tetry_times " +
                    " AND ( " +
                    " d.device_upgrade_status = 0 " +
                    " OR d.device_upgrade_status = 2 " +
                    " ) ";
            // 获取更新失败的设备个数
            Integer count = jdbcTemplate.queryForObject(sql, new Object[]{taskId}, Integer.class);
            log.info("update device failure count : " + count);
            // 下面是使用Quartz触发定时任务
            // 获取任务时间,单位秒
//            String time = jdbcTemplate.queryForObject(sql, new Object[]{taskId}, Integer.class);
            // 此处方便测试,应该从数据库中取taskId对应的重试间隔,单位秒
            Integer millSecond = 10;
            if(count != null && count > 0){
                String jobName = "UpgradeTask_" + taskId;
                String reTaskId = taskId.toString();
                Map<String,Object> params = new HashMap<>();
                params.put("jobName",jobName);
                params.put("taskId",reTaskId);
                if (QuartzManager.checkNameNotExist(jobName))
                {
                    QuartzManager.scheduleRunOnceJob(jobName, RunOnceJobLogic.class,params,millSecond);
                }
            }
        }
    }
}

5、Processor,处理每条数据

可以在此对数据进行过滤操作

@Component("taskItemProcessor")
public class TaskItemProcessor implements ItemProcessor<DispatchRequest, ProcessResult> {
    public static final int STATUS_DISPATCH_FAILED = 2;
    public static final int STATUS_DISPATCH_SUCC = 1;
    private static final Logger log = LoggerFactory.getLogger(TaskItemProcessor.class);
    @Value("${upgrade-dispatch-base-url:http://localhost/api/v2/rpc/dispatch/command/}")
    private String dispatchUrl;
    @Autowired
    JdbcTemplate jdbcTemplate;
    /**
     * 在这里,执行 下发更新指令 的操作
     * @param dispatchRequest
     * @return
     * @throws Exception
     */
    @Override
    public ProcessResult process(final DispatchRequest dispatchRequest) {
        // 调用接口,下发指令
        String url = dispatchUrl + dispatchRequest.getDeviceId()+"/"+dispatchRequest.getUserId();
        log.info("request url:" + url);
        RestTemplate restTemplate = new RestTemplate();
        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_JSON_UTF8);
        MultiValueMap<String, String> params = new LinkedMultiValueMap<String, String>();
        JSONObject jsonOuter = new JSONObject();
        JSONObject jsonInner = new JSONObject();
        try {
            jsonInner.put("jobId",dispatchRequest.getTaskId());
            jsonInner.put("name",dispatchRequest.getName());
            jsonInner.put("composeFile", Base64Util.bytesToBase64Str(dispatchRequest.getComposeFile()));
            jsonInner.put("policy",new JSONObject().put("startTime",dispatchRequest.getPolicy()));
            jsonInner.put("timestamp",dispatchRequest.getTimestamp());
            jsonOuter.put("method","updateApp");
            jsonOuter.put("params",jsonInner);
        } catch (JSONException e) {
            log.info("JSON convert Exception :" + e);
        }catch (IOException e) {
            log.info("Base64Util bytesToBase64Str :" + e);
        }
        log.info("request body json :" + jsonOuter);
        HttpEntity<String> requestEntity = new HttpEntity<String>(jsonOuter.toString(),headers);
        int status;
        try {
            ResponseEntity<String> response = restTemplate.postForEntity(url,requestEntity,String.class);
            log.info("response :" + response);
            if (response.getStatusCode() == HttpStatus.OK) {
                status = STATUS_DISPATCH_SUCC;
            } else {
                status = STATUS_DISPATCH_FAILED;
            }
        }catch (Exception e){
            status = STATUS_DISPATCH_FAILED;
        }
        return new ProcessResult(dispatchRequest, status);
    }
}

6、封装数据库返回数据的实体Bean

注意静态内部类

public class DispatchRequest {
    private String taskId;
    private String deviceId;
    private String userId;
    private String name;
    private byte[] composeFile;
    private String policy;
    private String timestamp;
    private String md5;
    private int failureRetry;
    private int retryTimes;
    private int deviceManagered;
   // 省略构造函数,setter/getter/tostring方法
   //......

    public static class DispatchRequestRowMapper implements RowMapper<DispatchRequest> {
        @Override
        public DispatchRequest mapRow(ResultSet resultSet, int i) throws SQLException {
            DispatchRequest dispatchRequest = new DispatchRequest();
            dispatchRequest.setTaskId(resultSet.getString("taskId"));
            dispatchRequest.setUserId(resultSet.getString("userId"));
            dispatchRequest.setPolicy(resultSet.getString("startTime"));
            dispatchRequest.setDeviceId(resultSet.getString("deviceId"));
            dispatchRequest.setName(resultSet.getString("appName"));
            dispatchRequest.setComposeFile(resultSet.getBytes("composeFile"));
            dispatchRequest.setTimestamp(DateUtil.DateToString(new Date()));
            dispatchRequest.setRetryTimes(resultSet.getInt("retryTimes"));
            dispatchRequest.setFailureRetry(resultSet.getInt("failureRetry"));
            dispatchRequest.setDeviceManagered(resultSet.getInt("deviceManagered"));
            return dispatchRequest;
        }
    }
}

7、启动类上要加上注解

@SpringBootApplication
@EnableBatchProcessing
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

三、小结一下

其实SpringBatch并没有想象中那么好用,当从数据库中每次取5000条数据后,进入processor中是逐条处理的,这个时候不能不行操作,等5000条数据处理完之后,再一次性执行ItemWriter方法。

在使用的过程中,最坑的地方是ItemReader和ItemWriter这两个地方,如何执行自定义的Sql,参考文中代码就行。至于Quartz定时功能,很简单,只要定时创建SpringBatch里面的Job,让这个job启动就好了,此处就不在给出了,贴的代码太多了。由于公司一些原因,代码不能放到GitHub上。

spring-batch与quartz集成过程中遇到的问题

问题

启动时报Exception

Driver's Blob representation is of an unsupported type: weblogic.jdbc.wrapper.Blob_oracle_sql_BLOB

原因

quartz的driverDelegateClass配置的是OracleDelegate,应用运行在weblogic上

解决

driverDelegateClass对应配置改为

org.quartz.impl.jdbcjobstore.oracle.weblogic.WebLogicOracleDelegate

以上为个人经验,希望能给大家一个参考,也希望大家多多支持我们。

(0)

相关推荐

  • 详解Spring整合Quartz实现动态定时任务

    最近项目中需要用到定时任务的功能,虽然spring 也自带了一个轻量级的定时任务实现,但感觉不够灵活,功能也不够强大.在考虑之后,决定整合更为专业的Quartz来实现定时任务功能. 普通定时任务 首先,当然是添加依赖的jar文件,我的项目是maven管理的,以下的我项目的依赖: <dependencies> <dependency> <groupId>org.springframework</groupId> <artifactId>spring

  • SpringBoot定时任务两种(Spring Schedule 与 Quartz 整合 )实现方法

    前言 最近在项目中使用到定时任务,之前一直都是使用Quartz 来实现,最近看Spring 基础发现其实Spring 提供 Spring Schedule 可以帮助我们实现简单的定时任务功能. 下面说一下两种方式在Spring Boot 项目中的使用. Spring Schedule 实现定时任务 Spring Schedule 实现定时任务有两种方式 1. 使用XML配置定时任务, 2. 使用 @Scheduled 注解. 因为是Spring Boot 项目 可能尽量避免使用XML配置的形式,

  • spring boot整合quartz实现多个定时任务的方法

    最近收到了很多封邮件,都是想知道spring boot整合quartz如何实现多个定时任务的,由于本人生产上并没有使用到多个定时任务,这里给个实现的思路. 1.新建两个定时任务,如下: public class ScheduledJob implements Job{ @Override public void execute(JobExecutionContext context) throws JobExecutionException { System.out.println("sched

  • SpringBoot集成Quartz实现定时任务的方法

    1 需求 在我的前后端分离的实验室管理项目中,有一个功能是学生状态统计.我的设计是按天统计每种状态的比例.为了便于计算,在每天0点,系统需要将学生的状态重置,并插入一条数据作为一天的开始状态.另外,考虑到学生的请假需求,请假的申请往往是提前做好,等系统时间走到实际请假时间的时候,系统要将学生的状态修改为请假. 显然,这两个子需求都可以通过定时任务实现.在网上略做搜索以后,我选择了比较流行的定时任务框架Quartz. 2 Quartz Quartz是一个定时任务框架,其他介绍网上也很详尽.这里要介

  • SpringBoot+SpringBatch+Quartz整合定时批量任务方式

    目录 一.引言 二.代码具体实现 1.pom文件 2.application.yaml文件 3.Service实现类 4.SpringBatch配置类 5.Processor,处理每条数据 6.封装数据库返回数据的实体Bean 7.启动类上要加上注解 三.小结一下 spring-batch与quartz集成过程中遇到的问题 问题 原因 解决 一.引言 最近一周,被借调到其他部门,赶一个紧急需求,需求内容如下: PC网页触发一条设备升级记录(下图),后台要定时批量设备更新.这里定时要用到Quart

  • springboot 2.x整合mybatis实现增删查和批量处理方式

    目录 springboot 2.x整合mybatis实现增删查和批量处理 1.添加依赖 2.添加配置文件 3.Application.class添加扫描 4.创建Mapper 5.创建provider实现类 Springboot整合mybatis(注解而且能看明白版本) 1.环境配置 2.整合Mybatis springboot 2.x整合mybatis实现增删查和批量处理 话不多说,直接上代码: 1.添加依赖 <!--mybatis数据库整合--> <dependency> &l

  • Spring整合Quartz实现定时任务调度的方法

    最近项目中需要实现定时执行任务,比如定时计算会员的积分.调用第三方接口等,由于项目采用spring框架,所以这里结合spring框架来介绍. 编写作业类 即普通的pojo,如下: package com.pcmall.task; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class TaskA { private static Logger logger = LoggerFactory.getLogger(Ta

  • Springboot项目与vue项目整合打包的实现方式

    我的环境 * JDK 1.8  * maven 3.6.0  * node环境 1.为什么需要前后端项目开发时分离,部署时合并? 在一些公司,部署实施人员的技术无法和互联网公司的运维团队相比,由于各种不定的环境也无法做到自动构建,容器化部署等.因此在这种情况下尽量减少部署时的服务软件需求,打出的包数量也尽量少.针对这种情况这里采用的在开发中做到前后端独立开发,打包时在后端springboot打包发布时将前端的构建输出一起打入,最后只需部署springboot的项目即可,无需再安装nginx服务器

  • springboot的yml配置文件通过db2的方式整合mysql的教程

    springboot整合MySQL很简单,多数据源就master,slave就行了,但是在整合DB2就需要另起一行,以下是同一个yml文件 先配置MySQL,代码如下 spring: datasource: type: com.alibaba.druid.pool.DruidDataSource druid: # 主库数据源 master: url: jdbc:mysql://localhost:3308/<数据库名>?useUnicode=true&characterEncoding

  • Springboot安全框架整合SpringSecurity实现方式

    目录 1.工业级安全框架介绍 2.建议搭建Spring Security环境 2.1在pom.xml中添加相关依赖 2.2创建Handler类 2.3创建简单的html和配置相关thymeleaf的路径 2.4最后再加个启动类,那么我们的整合测试就完成勒 2.5成果展示 用户名默认user,密码则随机生成的这串数字 3.进阶版使用 3.1用户名和密码自定义 3.2在config包下创建Encoder进行密码的校验和转码操作 3.3赋予账号角色权限 1.工业级安全框架介绍 Spring Secur

  • SpringBoot实现quartz定时任务可视化管理功能

    前言 在实际框架或产品开发过程中,springboot中集成quarzt方式基本是以job和trigger的bean对象方式直接硬编码完成的,例如以下代码示例.对于系统内定义的所有定时任务类型,具体执行类,执行策略,运行状态都没有一个动态全局的管理,所有决定将quartz做成可视化配置管理,便于统一管理,也降低了使用门槛,只需要关心job类的实现即可 @Bean public JobDetail SMSJobDetail() { return JobBuilder.newJob(SMSJob.c

  • SpringBoot详解整合Spring Boot Admin实现监控功能

    目录 监控 监控的意义 可视化监控平台 监控原理 自定义监控指标 监控 ​ 在说监控之前,需要回顾一下软件业的发展史.最早的软件完成一些非常简单的功能,代码不多,错误也少.随着软件功能的逐步完善,软件的功能变得越来越复杂,功能不能得到有效的保障,这个阶段出现了针对软件功能的检测,也就是软件测试.伴随着计算机操作系统的逐步升级,软件的运行状态也变得开始让人捉摸不透,出现了不稳定的状况.伴随着计算机网络的发展,程序也从单机状态切换成基于计算机网络的程序,应用于网络的程序开始出现,由于网络的不稳定性,

  • 浅谈SpringBoot集成Quartz动态定时任务

    SpringBoot自带schedule 沿用的springboot少xml配置的优良传统,本身支持表达式等多种定时任务 注意在程序启动的时候加上@EnableScheduling @Scheduled(cron="0/5 * * * * ?") public void job(){ System.out.println("每五秒执行一次"); } 为什么要使用Quartz 多任务情况下,quartz更容易管理,可以实现动态配置 执行时间表达式: 表达式示例: 集成

随机推荐