springbatch的封装与使用实例详解

Spring Batch官网介绍:

A lightweight, comprehensive batch framework designed to enable the development of robust batch applications vital for the daily operations of enterprise systems.(一款轻量的、全面的批处理框架,用于开发强大的日常运营的企业级批处理应用程序。)

springbatch

主要实现批量数据的处理,我对batch进行的封装,提出了jobBase类型,具体job需要实现它即可。Spring Batch 不仅提供了统一的读写接口、丰富的任务处理方式、灵活的事务管理及并发处理,同时还支持日志、监控、任务重启与跳过等特性,大大简化了批处理应用开发,将开发人员从复杂的任务配置管理过程中解放出来,使他们可以更多地去关注核心的业务处理过程。

几个组件

•job
•step
•read
•write
•listener
•process
•validator

JobBase定义了几个公用的方法

 /**
 * springBatch的job基础类.
 */
 public abstract class JobBase<T> {
  /**
  * 批次.
  */
  protected int chunkCount = 5000;
  /**
  * 监听器.
  */
  private JobExecutionListener jobExecutionListener;
  /**
  * 处理器.
  */
  private ValidatingItemProcessor<T> validatingItemProcessor;
  /**
  * job名称.
  */
  private String jobName;
  /**
  * 检验器.
  */
  private Validator<T> validator;
  @Autowired
  private JobBuilderFactory job;
  @Autowired
  private StepBuilderFactory step;
  /**
  * 初始化.
  *
  * @param jobName         job名称
  * @param jobExecutionListener  监听器
  * @param validatingItemProcessor 处理器
  * @param validator        检验
  */
  public JobBase(String jobName,
         JobExecutionListener jobExecutionListener,
         ValidatingItemProcessor<T> validatingItemProcessor,
         Validator<T> validator) {
   this.jobName = jobName;
   this.jobExecutionListener = jobExecutionListener;
   this.validatingItemProcessor = validatingItemProcessor;
   this.validator = validator;
  }
  /**
  * job初始化与启动.
  */
  public Job getJob() throws Exception {
   return job.get(jobName).incrementer(new RunIdIncrementer())
     .start(syncStep())
     .listener(jobExecutionListener)
     .build();
  }
  /**
  * 执行步骤.
  *
  * @return
  */
  public Step syncStep() throws Exception {
   return step.get("step1")
     .<T, T>chunk(chunkCount)
     .reader(reader())
     .processor(processor())
     .writer(writer())
     .build();
  }
  /**
  * 单条处理数据.
  *
  * @return
  */
  public ItemProcessor<T, T> processor() {
   validatingItemProcessor.setValidator(processorValidator());
   return validatingItemProcessor;
  }
  /**
  * 校验数据.
  *
  * @return
  */
  @Bean
  public Validator<T> processorValidator() {
   return validator;
  }
  /**
  * 批量读数据.
  *
  * @return
  * @throws Exception
  */
  public abstract ItemReader<T> reader() throws Exception;
  /**
  * 批量写数据.
  *
  * @return
  */
  @Bean
  public abstract ItemWriter<T> writer();
 }

主要规定了公用方法的执行策略,而具体的job名称,读,写还是需要具体JOB去实现的。

具体Job实现

 @Configuration
 @EnableBatchProcessing
 public class SyncPersonJob extends JobBase<Person> {
  @Autowired
  private DataSource dataSource;
  @Autowired
  @Qualifier("primaryJdbcTemplate")
  private JdbcTemplate jdbcTemplate;
  /**
  * 初始化,规则了job名称和监视器.
  */
  public SyncPersonJob() {
   super("personJob", new PersonJobListener(), new PersonItemProcessor(), new BeanValidator<>());
  }
  @Override
  public ItemReader<Person> reader() throws Exception {
   StringBuffer sb = new StringBuffer();
   sb.append("select * from person");
   String sql = sb.toString();
   JdbcCursorItemReader<Person> jdbcCursorItemReader =
     new JdbcCursorItemReader<>();
   jdbcCursorItemReader.setSql(sql);
   jdbcCursorItemReader.setRowMapper(new BeanPropertyRowMapper<>(Person.class));
   jdbcCursorItemReader.setDataSource(dataSource);
   return jdbcCursorItemReader;
  }
  @Override
  @Bean("personJobWriter")
  public ItemWriter<Person> writer() {
   JdbcBatchItemWriter<Person> writer = new JdbcBatchItemWriter<Person>();
   writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Person>());
   String sql = "insert into person_export " + "(id,name,age,nation,address) "
     + "values(:id, :name, :age, :nation,:address)";
   writer.setSql(sql);
   writer.setDataSource(dataSource);
   return writer;
  }
 }

写操作需要定义自己的bean的声明

注意,需要为每个job的write启个名称,否则在多job时,write将会被打乱

 /**
  * 批量写数据.
  *
  * @return
  */
 @Override
 @Bean("personVerson2JobWriter")
 public ItemWriter<Person> writer() {

 }

添加一个api,手动触发

 @Autowired
 SyncPersonJob syncPersonJob;
 @Autowired
 JobLauncher jobLauncher;
 void exec(Job job) throws Exception {
  JobParameters jobParameters = new JobParametersBuilder()
    .addLong("time", System.currentTimeMillis())
    .toJobParameters();
  jobLauncher.run(job, jobParameters);
 }
 @RequestMapping("/run1")
 public String run1() throws Exception {
  exec(syncPersonJob.getJob());
  return "personJob success";
 }

总结

以上所述是小编给大家介绍的springbatch的封装与使用实例详解,希望对大家有所帮助,如果大家有任何疑问欢迎给我留言,小编会及时回复大家的!

(0)

相关推荐

  • Spring Batch读取txt文件并写入数据库的方法教程

    项目需求 近日需要实现用户推荐相关的功能,也就是说向用户推荐他可能喜欢的东西. 我们的数据分析工程师会将用户以及用户可能喜欢的东西整理成文档给我,我只需要将数据从文档中读取出来,然后对数据进行进一步的清洗(例如去掉特殊符号,长度如果太长则截取).然后将处理后的数据存入数据库(Mysql). 所以分为三步: 读取文档获得数据 对获得的数据进行处理 更新数据库(新增或更新) 考虑到这个数据量以后会越来越大,这里没有使用 poi 来读取数据,而直接使用了 SpringBatch. 实现步骤 本文假设读

  • 详解SpringBoot和SpringBatch 使用

    什么是Spring Batch Spring Batch 是一个轻量级的.完善的批处理框架,旨在帮助企业建立健壮.高效的批处理应用.Spring Batch是Spring的一个子项目,使用Java语言并基于Spring框架为基础开发,使的已经使用 Spring 框架的开发者或者企业更容易访问和利用企业服务. Spring Batch 提供了大量可重用的组件,包括了日志.追踪.事务.任务作业统计.任务重启.跳过.重复.资源管理.对于大数据量和高性能的批处理任务,Spring Batch 同样提供了

  • 基于Spring Batch向Elasticsearch批量导入数据示例

    1.介绍 当系统有大量数据需要从数据库导入Elasticsearch时,使用Spring Batch可以提高导入的效率.Spring Batch使用ItemReader分页读取数据,ItemWriter批量写数据.由于Spring Batch没有提供Elastisearch的ItemWriter和ItemReader,本示例中自定义一个ElasticsearchItemWriter(ElasticsearchItemReader),用于批量导入. 2.示例 2.1 pom.xml 本文使用spr

  • Spring batch批处理框架

    spring batch框架的简介 批处理任务是大多数IT项目的一个重要组成部分,批处理在业务系统中负责处理海量的数据,无须人工干预就能够自动高效的进行复杂的数据分析和处理.批处理会定期读入批量数据,经过相应的业务处理进行归档的业务操作,批处理的特征是自动执行,处理的数据量大,定时执行.将整个批处理的流程按逻辑划分可以分为读数据,处理数据和写数据. spring batch对批处理本身的特性进行了抽象,将批处理作业抽象为job和job step,将批处理的处理过程分解为数据读,数据处理和数据写.

  • Spring Batch入门教程篇

    SpringBatch介绍: SpringBatch 是一个大数据量的并行处理框架.通常用于数据的离线迁移,和数据处理,⽀持事务.并发.流程.监控.纵向和横向扩展,提供统⼀的接⼝管理和任务管理;SpringBatch是SpringSource和埃森哲为了统一业界并行处理标准为广大开发者提供方便开发的一套框架. 官方地址:github.com/spring-projects/spring-batch SpringBatch 本身提供了重试,异常处理,跳过,重启.任务处理统计,资源管理等特性,这些特

  • spring batch 读取多个文件数据导入数据库示例

    项目的目录结构 需要读取文件的的数据格式 applicatonContext.xml的配置 <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p=

  • 详解spring batch的使用和定时器Quart的使用

    spring Batch是一个基于Spring的企业级批处理框架,它通过配合定时器Quartz来轻易实现大批量的数据读取或插入,并且全程自动化,无需人员管理. 在使用spring batch之前,得对spring batch的流程有一个基本了解 每个batch它都包含了一个job,而一个job中却有可能包含多个step,整个batch中干活的是step,batch主要是用来对数据的操作,所以step就有三个操作数据的东西,一个是ItemReader用来读取数据的,一个是ItemProcessor

  • springbatch的封装与使用实例详解

    Spring Batch官网介绍: A lightweight, comprehensive batch framework designed to enable the development of robust batch applications vital for the daily operations of enterprise systems.(一款轻量的.全面的批处理框架,用于开发强大的日常运营的企业级批处理应用程序.) springbatch 主要实现批量数据的处理,我对bat

  • AngularJS封装$http.post()实例详解

      AngularJS封装$http.post()实例详解 用了不是很长的时间跟了一个移动APP项目,用的是ionic + AngularJS + cordova框架,其间遇到过挺多问题,其中一个就是对Ajax的封装问题. 其实针对封装问题一直以来就没停止过谈论,不同的项目也有不同的需求,举个典型的例子,我在做这个项目的时候因为一开始没有考虑封装问题所以被批评了,而我的一个朋友却因为封装了受到了批评--很尴尬不是么. 那么对于是否要封装这个问题,究竟该怎么界定?其实这不是一个很复杂的问题,归根到

  • Flutter学习LogUtil封装与实现实例详解

    目录 一. 为什么要封装打印类 二. 需要哪些类 三. 打印输出的抽象类 四. 格式化日志内容 格式化堆栈 堆栈裁切工具类 格式化堆栈信息 格式化JSON 五. 需要用到的常量 六. 为了控制多个打印器的设置做了一个配置类 七. Log的管理类 九. 调用LogUtil 十. 定义一个Flutter 控制台打印输出的方法 十一. 现在使用前初始化log打印器一次 使用 一. 为什么要封装打印类 虽然 flutter/原生给我们提供了日志打印的功能,但是超出一定长度以后会被截断 Json打印挤在一

  • 公共Hooks封装文件下载useDownloadFile实例详解

    目录 引言 项目环境 封装前提:各方法对比 封装分解:下载核心代码 封装分解:用户体验设计 useDownloadFile.js完整代码 引言 对于经常需要开发企业管理后台的前端开发来说,必不可少的需要使用表格对于数据进行操作,在对于现有项目进行代码优化时,封装一些公共的Hooks. 本篇文章为useDownloadFile.js 基于个人项目环境进行封装的Hooks,仅以本文介绍封装Hooks思想心得,故相关代码可能不适用他人 项目环境 Vue3.x + Ant Design Vue3.x +

  • 微信小程序 封装http请求实例详解

    微信小程序 封装http请求 最近看了一下微信小程序,大致翻了一下,发现跟angular很相似的,但是比angular简单的很多具体可参考官方文档 https://mp.weixin.qq.com/debug/wxadoc/dev/framework/app-service/page.html?t=2017112 下面将封装http请求服务部分的服务以及引用部分 // 本服务用于封装请求 // 返回的是一个promisepromise var sendRrquest = function (ur

  • Vue3中使用typescript封装axios的实例详解

    这个axios封装,因为是用在vue3的demo里面的,为了方便,在vue3的配置里面按需加载element-plus 封装axios http.ts import axios, { AxiosRequestConfig, AxiosRequestHeaders, AxiosResponse } from 'axios' import { IResponseData } from '@/types' import { ElMessage, ElLoading, ILoadingInstance

  • React Native Modal 的封装与使用实例详解

    目录 背景 Android FullScreenModal 的封装使用 Android 原生实现全屏 Dialog 封装给 RN 进行相关的调用 Android 原生部分实现 JS 部分实现 使用 RootSiblings 封装 Modal 实现界面 Render 相关 实现 Modal 展示动画相关 使用 View 封装 Modal 整体 Modal 控件的封装 其他 Android Back 键的注意 View 封装 Modal 时候的注意 最后 背景 在使用 React Native(以下

  • ruby ftp封装实例详解

     ruby ftp封装实例详解 最近自己用ruby 封装了一个Net::FTP的工具类. class FtpTool def initialize() @current_ftp = create_ftp end # 获取指定格式的文件名称列表 # 例如: source = "test/*.txt" # 返回: [source/file_name.txt] def fetch_remote_filenames(source) return [] if source.blank? log_

  • react中的ajax封装实例详解

    react中的ajax封装实例详解 代码块 **opts: {'可选参数'} **method: 请求方式:GET/POST,默认值:'GET'; **url: 发送请求的地址, 默认值: 当前页地址; **data: string,json; **async: 是否异步:true/false,默认值:true; **cache: 是否缓存:true/false,默认值:true; **contentType: HTTP头信息,默认值:'application/x-www-form-urlenc

  • Android Intent封装的实例详解

    Android Intent封装的实例详解 什么是Intent: Intent是协调应用间.组件之间的通讯和交互.通过Intent你可以启动Activity.Service.Broadcasts.更可以跨程序调用第三方组件.例如:启动拨打电话界面.音乐播放等. 组件     启动 Activity startActicity() Service startService(),bindService( ) Broadcasts sendBroadcast() 使用Intent: 栗子:在一个Act

随机推荐