浅谈Spring Batch在大型企业中的最佳实践

在大型企业中,由于业务复杂、数据量大、数据格式不同、数据交互格式繁杂,并非所有的操作都能通过交互界面进行处理。而有一些操作需要定期读取大批量的数据,然后进行一系列的后续处理。这样的过程就是“批处理”。

批处理应用通常有以下特点:

  1. 数据量大,从数万到数百万甚至上亿不等;
  2. 整个过程全部自动化,并预留一定接口进行自定义配置;
  3. 这样的应用通常是周期性运行,比如按日、周、月运行;
  4. 对数据处理的准确性要求高,并且需要容错机制、回滚机制、完善的日志监控等。

什么是Spring batch

Spring batch是一个轻量级的全面的批处理框架,它专为大型企业而设计,帮助开发健壮的批处理应用。Spring batch为处理大批量数据提供了很多必要的可重用的功能,比如日志追踪、事务管理、job执行统计、重启job和资源管理等。同时它也提供了优化和分片技术用于实现高性能的批处理任务。

它的核心功能包括:

  1. 事务管理
  2. 基于块的处理过程
  3. 声明式的输入/输出操作
  4. 启动、终止、重启任务
  5. 重试/跳过任务
  6. 基于Web的管理员接口

笔者所在的部门属于国外某大型金融公司的CRM部门,在日常工作中我们经常需要开发一些批处理应用,对Spring Batch有着丰富的使用经验。近段时间笔者特意总结了这些经验。

使用Spring Batch 3.0以及Spring Boot

在使用Spring Batch时推荐使用最新的Spring Batch 3.0版本。相比Spring Batch2.2,它做了以下方面的提升:

  1. 支持JSR-352标准
  2. 支持Spring4以及Java8
  3. 增强了Spring Batch Integration的功能
  4. 支持JobScope
  5. 支持SQLite

支持Spring4和Java8是一个重大的提升。这样就可以使用Spring4引入的Spring boot组件,从而开发效率方面有了一个质的飞跃。引入Spring-batch框架只需要在build.gradle中加入一行代码即可:

compile("org.springframework.boot:spring-boot-starter-batch")

而增强Spring Batch Integration的功能后,我们就可以很方便的和Spring家族的其他组件集成,还可以以多种方式来调用job,也支持远程分区操作以及远程块处理。

而支持JobScope后我们可以随时为对象注入当前Job实例的上下文信息。只要我们制定Bean的scope为job scope,那么就可以随时使用jobParameters和jobExecutionContext等信息。

 <bean id="..." class="..." scope="job">
  <property name="name" value="#{jobParameters[input]}" />
</bean>

<bean id="..." class="..." scope="job">
  <property name="name" value="#{jobExecutionContext['input.name']}.txt" />
</bean>

使用Java Config而不是xml的配置方式

之前我们在配置job和step的时候都习惯用xml的配置方式,但是随着时间的推移发现问题颇多。

  1. xml文件数急剧膨胀,配置块长且复杂,可读性很差;
  2. xml文件缺少语法检查,有些低级错误只有在运行集成测试的时候才能发现;
  3. 在xml文件中进行代码跳转时IDE的支持力度不够;

我们渐渐发现使用纯Java类的配置方式更灵活,它是类型安全的,而且IDE的支持更好。在构建job或step时采用的流式语法相比xml更加简洁易懂。

    @Bean
    public Step step(){
        return stepBuilders.get("step")
                .<Partner,Partner>chunk(1)
                .reader(reader())
                .processor(processor())
                .writer(writer())
                .listener(logProcessListener())
                .faultTolerant()
                .skipLimit(10)
                .skip(UnknownGenderException.class)
                .listener(logSkipListener())
                .build();
    }

在这个例子中可以很清楚的看到该step的配置,比如reader/processor/writer组件,以及配置了哪些listener等。

本地集成测试中使用内存数据库

Spring batch在运行时需要数据库支持,因为它需要在数据库中建立一套schema来存储job和step运行的统计信息。而在本地集成测试中我们可以借助Spring batch提供的内存Repository来存储Spring batch的任务执行信息,这样即避免了在本地配置一个数据库,又可以加快job的执行。

 <bean id="jobRepository"
 class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
  <property name="transactionManager" ref="transactionManager"/>
</bean>

我们在build.gradle中加入对hsqldb的依赖:

runtime(‘org.hsqldb:hsqldb:2.3.2')

然后在测试类中添加对DataSource的配置。

 @EnableAutoConfiguration
@EnableBatchProcessing
@DataJpaTest
@Import({DataSourceAutoConfiguration.class, BatchAutoConfiguration.class})
public class TestConfiguration {

}

并且在applicaton.properties配置中添加初始化Database的配置:

spring.batch.initializer.enable=true

合理的使用Chunk机制

Spring batch在配置Step时采用的是基于Chunk的机制。即每次读取一条数据,再处理一条数据,累积到一定数量后再一次性交给writer进行写入操作。这样可以最大化的优化写入效率,整个事务也是基于Chunk来进行。

当我们在需要将数据写入到文件、数据库中之类的操作时可以适当设置Chunk的值以满足写入效率最大化。但有些场景下我们的写入操作其实是调用一个web service或者将消息发送到某个消息队列中,那么这些场景下我们就需要设置Chunk的值为1,这样既可以及时的处理写入,也不会由于整个Chunk中发生异常后,在重试时出现重复调用服务或者重复发送消息的情况。

使用Listener来监视job执行情况并及时做相应的处理

Spring batch提供了大量的Listener来对job的各个执行环节进行全面的监控。

在job层面Spring batch提供了JobExecutionListener接口,其支持在Job开始或结束时进行一些额外处理。在step层面Spring batch提供了StepExecutionListener,ChunkListener,ItemReadListener,ItemProcessListener,ItemWriteListener,SkipListener等接口,同时对Retry和Skip操作也提供了RetryListener及SkipListener。

通常我们会为每个job都实现一个JobExecutionListener,在afterJob操作中我们输出job的执行信息,包括执行时间、job参数、退出代码、执行的step以及每个step的详细信息。这样无论是开发、测试还是运维人员对整个job的执行情况了如指掌。

如果某个step会发生skip的操作,我们也会为其实现一个SkipListener,并在其中记录skip的数据条目,用于下一步的处理。

实现Listener有两种方式,一种是继承自相应的接口,比如继承JobExecutionListener接口,另一种是使用annoation(注解)的方式。经过实践我们认为使用注解的方式更好一些,因为使用接口你需要实现接口的所有方法,而使用注解则只需要对相应的方法添加annoation即可。

下面的这个类采用了继承接口的方式,我们看到其实我们只用到了第一个方法,第二个和第三个都没有用到。但是我们必须提供一个空的实现。

 public class CustomSkipListener implements SkipListener<String, String> {
  @Override
  public void onSkipInRead(Throwable t) {
    // business logic
  }

  @Override
  public void onSkipInWrite(String item, Throwable t) {
    // no need
  }

  @Override
  public void onSkipInProcess(String item, Throwable t) {
    // no need
  }
}

而使用annoation的方式可以简写为:

 public class CustomSkipListener {

  @OnSkipInRead
  public void onSkipInRead(Throwable t) {
    // business logic
  }
}

使用Retry和Skip增强批处理工作的健壮性

在处理百万级的数据过程过程中难免会出现异常。如果一旦出现异常而导致整个批处理工作终止的话那么会导致后续的数据无法被处理。Spring Batch内置了Retry(重试)和Skip(跳过)机制帮助我们轻松处理各种异常。适合Retry的异常的特点是这些异常可能会随着时间推移而消失,比如数据库目前有锁无法写入、web服务当前不可用、web服务满载等。所以对这些异常我们可以配置Retry机制。而有些异常则不应该配置Retry,比如解析文件出现异常等,因为这些异常即使Retry也会始终失败。

即使Retry多次仍然失败也无需让整个step失败,可以对指定的异常设置Skip选项从而保证后续的数据能够被继续处理。我们也可以配置SkipLimit选项保证当Skip的数据条目达到一定数量后及时终止整个Job。

有时候我们需要在每次Retry中间隔做一些操作,比如延长Retry时间,恢复操作现场等,Spring Batch提供了BackOffPolicy来达到目的。下面是一个配置了Retry机制、Skip机制以及BackOffPolicy的step示例。

 @Bean
public Step step(){
  return stepBuilders.get("step")
      .<Partner,Partner>chunk(1)
      .reader(reader())
      .processor(processor())
      .writer(writer())
      .listener(logProcessListener())
      .faultTolerant()
      .skipLimit(10)
      .skip(UnknownGenderException.class)
      .retryLimit(5)
      .retry(ServiceUnavailableException.class)
      .backOffPolicy(backoffPolicy)
      .listener(logSkipListener())
      .build();
}

使用自定义的Decider来实现Job flow

在Job执行过程中不一定都是顺序执行的,我们经常需要根据某个job的输出数据或执行结果来决定下一步的走向。以前我们会把一些判断放置在下游step中进行,这样可能会导致有些step实际运行了,但其实并没有做任何事情。比如一个step执行过程中会将失败的数据条目记录到一个报告中,而下一个step会判断有没有生成报告,如果生成了报告则将该报告发送给指定联系人,如果没有则不做任何事情。这种情况下可以通过Decider机制来实现Job的执行流程。在Spring batch 3.0中Decider已经从Step中独立出来,和Step处于同一级别。

 public class ReportDecider implements JobExecutionDecider {
  @Override
  public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
    if (report.isExist()) {
      return new FlowExecutionStatus(“SEND");
    }

    return new FlowExecutionStatus(“SKIP");
  }
}

而在job配置中可以这样来使用Decider。这样整个Job的执行流程会更加清晰易懂。

 public Job job() {
  return new JobBuilder("petstore")
      .start(orderProcess())
      .next(reportDecider)
      .on("SEND").to(sendReportStep)
      .on("SKIP").end().build()
      .build()
}

采用多种机制加速Job的执行

批处理工作处理的数据量大,而执行窗口一般又要求比较小。所以必须要通过多种方式来加速Job的执行。一般我们有四种方式来实现:

  1. 在单个step中多线程执行任务
  2. 并行执行不同的Step
  3. 并行执行同一个Step
  4. 远程执行Chunk任务

在单个step多线程执行任务可以借助于taskExecutor来实现。这种情况适合于reader、writer是线程安全的并且是无状态的场景。我们还可以设置线程数量。

 public Step step() {
  return stepBuilders.get("step")
      .tasklet(tasklet)
      .throttleLimit(20)
      .build();
}

上述示例中的tasklet需要实现TaskExecutor,Spring Batch提供了一个简单的多线程TaskExecutor供我们使用:SimpleAsyncTaskExecutor。

并行执行不同的Step在Spring batch中很容易实现,以下是一个示例:

 public Job job() {
  return stepBuilders.get("parallelSteps")
      .start(step1)
      .split(asyncTaskExecutor).add(flow1, flow2)
      .next(step3)
      .build();
}

在这个示例中我们先执行step1,然后并行执行flow1和flow2,最后再执行step3。

Spring batch提供了PartitionStep来实现对同一个step在多个进程中实现并行处理。通过PartitonStep再配合PartitionHandler可以将一个step扩展到多个Slave上实现并行运行。

远程执行Chunk任务则是将某个Step的processer操作分割到多个进程中,多个进程通过一些中间件进行通讯(比如采用消息的方式)。这种方式适合于Processer是瓶颈而Reader和Writer不是瓶颈的场景。

结语

Spring Batch对批处理场景进行了合理的抽象,封装了大量的实用功能,使用它来开发批处理应用可以达到事半功倍的效果。在使用的过程中我们仍需要坚持总结一些最佳实践,从而能够交付高质量的可维护的批处理应用,满足企业级应用的苛刻要求。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • 详解spring batch的使用和定时器Quart的使用

    spring Batch是一个基于Spring的企业级批处理框架,它通过配合定时器Quartz来轻易实现大批量的数据读取或插入,并且全程自动化,无需人员管理. 在使用spring batch之前,得对spring batch的流程有一个基本了解 每个batch它都包含了一个job,而一个job中却有可能包含多个step,整个batch中干活的是step,batch主要是用来对数据的操作,所以step就有三个操作数据的东西,一个是ItemReader用来读取数据的,一个是ItemProcessor

  • spring batch 读取多个文件数据导入数据库示例

    项目的目录结构 需要读取文件的的数据格式 applicatonContext.xml的配置 <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p=

  • Spring batch批处理框架

    spring batch框架的简介 批处理任务是大多数IT项目的一个重要组成部分,批处理在业务系统中负责处理海量的数据,无须人工干预就能够自动高效的进行复杂的数据分析和处理.批处理会定期读入批量数据,经过相应的业务处理进行归档的业务操作,批处理的特征是自动执行,处理的数据量大,定时执行.将整个批处理的流程按逻辑划分可以分为读数据,处理数据和写数据. spring batch对批处理本身的特性进行了抽象,将批处理作业抽象为job和job step,将批处理的处理过程分解为数据读,数据处理和数据写.

  • Spring Batch读取txt文件并写入数据库的方法教程

    项目需求 近日需要实现用户推荐相关的功能,也就是说向用户推荐他可能喜欢的东西. 我们的数据分析工程师会将用户以及用户可能喜欢的东西整理成文档给我,我只需要将数据从文档中读取出来,然后对数据进行进一步的清洗(例如去掉特殊符号,长度如果太长则截取).然后将处理后的数据存入数据库(Mysql). 所以分为三步: 读取文档获得数据 对获得的数据进行处理 更新数据库(新增或更新) 考虑到这个数据量以后会越来越大,这里没有使用 poi 来读取数据,而直接使用了 SpringBatch. 实现步骤 本文假设读

  • Spring Batch入门教程篇

    SpringBatch介绍: SpringBatch 是一个大数据量的并行处理框架.通常用于数据的离线迁移,和数据处理,⽀持事务.并发.流程.监控.纵向和横向扩展,提供统⼀的接⼝管理和任务管理;SpringBatch是SpringSource和埃森哲为了统一业界并行处理标准为广大开发者提供方便开发的一套框架. 官方地址:github.com/spring-projects/spring-batch SpringBatch 本身提供了重试,异常处理,跳过,重启.任务处理统计,资源管理等特性,这些特

  • 浅谈Spring Batch在大型企业中的最佳实践

    在大型企业中,由于业务复杂.数据量大.数据格式不同.数据交互格式繁杂,并非所有的操作都能通过交互界面进行处理.而有一些操作需要定期读取大批量的数据,然后进行一系列的后续处理.这样的过程就是"批处理". 批处理应用通常有以下特点: 数据量大,从数万到数百万甚至上亿不等: 整个过程全部自动化,并预留一定接口进行自定义配置: 这样的应用通常是周期性运行,比如按日.周.月运行: 对数据处理的准确性要求高,并且需要容错机制.回滚机制.完善的日志监控等. 什么是Spring batch Sprin

  • 浅谈spring容器中bean的初始化

    当我们在spring容器中添加一个bean时,如果没有指明它的scope属性,则默认是singleton,也就是单例的. 例如先声明一个bean: public class People { private String name; private String sex; public String getName() { return name; } public void setName(String name) { this.name = name; } public String get

  • 浅谈spring中的default-lazy-init参数和lazy-init

    在spring的配置中的根节点上有个  default-lazy-init="true"配置: 1.spring的default-lazy-init参数 此参数表示延时加载,即在项目启动时不会实例化注解的bean,除非启动项目时需要用到,未实例化的注解对象在程序实际访问调用时才注入调用 spring在启动的时候,default-lazy-init参数默认为false,会默认加载整个对象实例图,从初始化ACTION配置.到 service配置到dao配置.乃至到数据库连接.事务等等.这样

  • 浅谈spring中scope作用域

    今天研究了一下scope的作用域.默认是单例模式,即scope="singleton".另外scope还有prototype.request.session.global session作用域.scope="prototype"多例.再配置bean的作用域时,它的头文件形式如下: 如何使用spring的作用域: <bean id="role" class="spring.chapter2.maryGame.Role" s

  • 浅谈Spring中Bean的作用域、生命周期

    本文主要探究的是关于Bean的作用域.生命周期的相关内容,具体如下. Bean的作用域 Spring 3中为Bean定义了5中作用域,分别为singleton(单例).prototype(原型).request.session和global session,5种作用域说明如下: 1.singleton:单例模式,Spring IoC容器中只会存在一个共享的Bean实例,无论有多少个Bean引用它,始终指向同一对象.Singleton作用域是Spring中的缺省作用域,也可以显示的将Bean定义为

  • 浅谈Spring中@Import注解的作用和使用

    @Import用来导入@Configuration注解的配置类.声明@Bean注解的bean方法.导入ImportSelector的实现类或导入ImportBeanDefinitionRegistrar的实现类. @Import注解的作用 查看Import注解源码 /** * Indicates one or more {@link Configuration @Configuration} classes to import. * * <p>Provides functionality eq

  • 浅谈spring中isolation和propagation的用法

    可以在XML文件中进行配置,下面的代码是个示意代码 <tx:advice id="txAdvice" transaction-manager="txManager"> <tx:attributes> <tx:method name="add*" propagation="REQUIRED" isolation="READ_COMMITTED"/>增加记录的方法 <t

  • 浅谈Spring 中 @EnableXXX 注解的套路

    目录 前言 设计目标 @EnableScheduling (导入一个 @Configuration 类) @EnableTransactionManagement(导入一个 ImportSelector) @EnableAspectJAutoProxy (在 Bean 定义层导入) 结论 前言 在 Spring 框架中有很多实用的功能,不需要写大量的配置代码,只需添加几个注解即可开启. 其中一个重要原因是那些 @EnableXXX 注解,它可以让你通过在配置类加上简单的注解来快速地开启诸如事务管

  • 浅谈Spring的两种事务定义方式

    一.声明式 这种方法不需要对原有的业务做任何修改,通过在XML文件中定义需要拦截方法的匹配即可完成配置,要求是,业务处理中的方法的命名要有规律,比如setXxx,xxxUpdate等等.详细配置如下: <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager"> <property name="

  • 浅谈Spring的两种配置容器

    Spring提供了两种容器类型 SpringIOC容器是一个IOC Service Provider.提供了两种容器类型:BeanFactory和ApplicationContext.Spring的IOC容器是一个提供IOC支持的轻量级容器.除了基本的ioc支持,它作为轻量级容器还提供了IOC之外的支持. BeanFactory BeanFactory是基础类型IOC容器.顾名思义,就是生产Bean的工厂.能够提供完整的IOC服务.没有特殊指定的话,其默认采用延迟初始化策略.只有当客户端对象需要

随机推荐