Spring Batch轻量级批处理框架实战

目录
  • 1 实战前的理论基础
    • 1.1 Spring Batch是什么
    • 1.2 Spring Batch能做什么
    • 1.3 基础架构
    • 1.4 核心概念和抽象
  • 2 各个组件介绍
    • 2.1 Job
    • 2.2 Step
    • 2.3 ExecutionContext
    • 2.4 JobRepository
    • 2.5 JobLauncher
    • 2.6 Item Reader
    • 2.7 Item Writer
    • 2.8 Item Processor
  • 3 Spring Batch实战
    • 3.1 依赖和项目结构以及配置文件
    • 3.2 代码和数据表
    • 3.3 测试
  • 4 实战后的总结
    • 4.1 JobBuilderFactory
    • 4.2 StepBuilderFactory
  • 参考文档:

1 实战前的理论基础

1.1 Spring Batch是什么

Spring Batch 是一个轻量级、全面的批处理框架,旨在支持开发对企业系统日常运营至关重要的强大的批处理应用程序。同时使开发人员在必要时可以轻松访问和利用更先进的企业服务。Spring Batch 不是调度框架,它旨在与调度程序一起工作,而不是取代调度程序。

1.2 Spring Batch能做什么

  • 自动化、复杂的大量信息处理,无需用户交互即可最有效地处理。这些操作通常包括基于时间的事件(例如月末计算、通知或通信)。
  • 定期应用在非常大的数据集上重复处理的复杂业务规则(例如,保险福利确定或费率调整)。
  • 将从内部和外部系统接收的信息集成到记录系统中,这些信息通常需要以事务方式进行格式化、验证和处理。批处理用于每天为企业处理数十亿笔交易。

业务场景:

  • 定期提交批处理
  • 并发批处理:作业的并行处理
  • 分阶段的、企业消息驱动的处理
  • 大规模并行批处理
  • 失败后手动或计划重启
  • 依赖步骤的顺序处理(扩展到工作流驱动的批处理)
  • 部分处理:跳过记录(例如,在回滚时)
  • 整批事务,适用于小批量或现有存储过程/脚本的情况

总之Spring batch可以做的:

  • 从数据库、文件或队列中读取大量记录。
  • 以某种方式处理数据。
  • 以修改后的形式写回数据。

1.3 基础架构

1.4 核心概念和抽象

核心概念:一个 Job 有一对多的Step,每个步骤都正好有一个 ItemReader、一个ItemProcessor和 一个ItemWriter。需要启动作业(使用 JobLauncher),并且需要存储有关当前运行进程的元数据(在 中 JobRepository)。

2 各个组件介绍

2.1 Job

Job是封装了整个批处理过程的实体。与其他 Spring 项目一样,一个Job与 XML 配置文件或基于 Java 的配置连接在一起。这种配置可以被称为“作业配置”。

可配置项:

  • 作业的简单名称。
  • Step实例的定义和排序。
  • 作业是否可重新启动。

2.2 Step

一个Step是一个域对象,它封装了批处理作业的一个独立的、连续的阶段。因此,每个 Job 完全由一个或多个步骤组成。一个Step包含定义和控制实际批处理所需的所有信息。

一个StepExecution代表一次尝试执行一个StepStepExecution 每次Step运行时都会创建一个新的,类似于JobExecution

2.3 ExecutionContext

一个ExecutionContext表示由框架持久化和控制的键/值对的集合,以允许开发人员有一个地方来存储范围为StepExecution对象或JobExecution对象的持久状态。

2.4 JobRepository

JobRepository是上述所有 Stereotypes 的持久性机制。它提供了CRUD操作JobLauncherJob以及Step实现。当 Job第一次启动,一个JobExecution被从库中获得,并且,执行的过程中,StepExecutionJobExecution实施方式是通过将它们传递到存储库持续。

使用 Java 配置时,@EnableBatchProcessing注解提供了一个 JobRepository作为开箱即用自动配置的组件之一。

2.5 JobLauncher

JobLauncher表示一个简单的接口,用于Job使用给定的 集合 启动JobParameters,如以下示例所示:

public interface JobLauncher {

public JobExecution run(Job job, JobParameters jobParameters)
            throws JobExecutionAlreadyRunningException, JobRestartException,
                   JobInstanceAlreadyCompleteException, JobParametersInvalidException;
}

期望实现JobExecution从 中 获得有效JobRepository并执行Job

2.6 Item Reader

ItemReader是一种抽象,表示一次检索Step一个项目的输入。当ItemReader用完它可以提供的项目时,它通过返回来表明这一点null

2.7 Item Writer

ItemWriter是一种抽象,表示一次一个Step、一批或一大块项目的输出。通常, anItemWriter不知道它接下来应该接收的输入,并且只知道在其当前调用中传递的项目。

2.8 Item Processor

ItemProcessor是表示项目的业务处理的抽象。当ItemReader读取一个项目并ItemWriter写入它们时,它 ItemProcessor提供了一个访问点来转换或应用其他业务处理。如果在处理该项目时确定该项目无效,则返回 null表示不应写出该项目。

3 Spring Batch实战

下面就利用我们所学的理论实现一个最简单的Spring Batch批处理项目

3.1 依赖和项目结构以及配置文件

依赖

<!--Spring batch-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<!-- web依赖-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- lombok-->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.20</version>
</dependency>
<!--  mysql-->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.47</version>
</dependency>
<!--  mybatis-->
<dependency>
    <groupId>com.baomidou</groupId>
    <artifactId>mybatis-plus-boot-starter</artifactId>
    <version>3.2.0</version>
</dependency>

项目结构

配置文件

server.port=9000
spring.datasource.url=jdbc:mysql://localhost:3306/test
spring.datasource.username=root
spring.datasource.password=12345
spring.datasource.driver-class-name=com.mysql.jdbc.Driver

3.2 代码和数据表

数据表

CREATE TABLE `student` (
    `id` int(100) NOT NULL AUTO_INCREMENT,
    `name` varchar(45) DEFAULT NULL,
    `age` int(2) DEFAULT NULL,
    `address` varchar(45) DEFAULT NULL,
    PRIMARY KEY (`id`),
    UNIQUE KEY `id_UNIQUE` (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=203579 DEFAULT CHARSET=utf8 ROW_FORMAT=REDUNDANT

Student实体类

/**
 * @desc: Student实体类
 * @author: YanMingXin
 * @create: 2021/10/15-12:17
 **/
@Data
@Accessors(chain = true)
@NoArgsConstructor
@AllArgsConstructor
@ToString
@TableName("student")
public class Student {

    @TableId(value = "id", type = IdType.AUTO)
    private Long sId;

    @TableField("name")
    private String sName;

    @TableField("age")
    private Integer sAge;

    @TableField("address")
    private String sAddress;

}

Mapper层

/**
 * @desc: Mapper层
 * @author: YanMingXin
 * @create: 2021/10/15-12:17
 **/
@Mapper
@Repository
public interface StudentDao extends BaseMapper<Student> {
}

模拟数据库(文件)中读取类

/**
 * @desc: 模拟数据库中读取
 * @author: YanMingXin
 * @create: 2021/10/16-10:13
 **/
public class StudentVirtualDao {

    /**
     * 模拟从数据库中读取
     *
     * @return
     */
    public List<Student> getStudents() {
        ArrayList<Student> students = new ArrayList<>();
        students.add(new Student(1L, "zs", 23, "Beijing"));
        students.add(new Student(2L, "ls", 23, "Beijing"));
        students.add(new Student(3L, "ww", 23, "Beijing"));
        students.add(new Student(4L, "zl", 23, "Beijing"));
        students.add(new Student(5L, "mq", 23, "Beijing"));
        students.add(new Student(6L, "gb", 23, "Beijing"));
        students.add(new Student(7L, "lj", 23, "Beijing"));
        students.add(new Student(8L, "ss", 23, "Beijing"));
        students.add(new Student(9L, "zsdd", 23, "Beijing"));
        students.add(new Student(10L, "zss", 23, "Beijing"));
        return students;
    }
}

Service层接口

/**
 * @desc:
 * @author: YanMingXin
 * @create: 2021/10/15-12:16
 **/
public interface StudentService {

    List<Student> selectStudentsFromDB();

    void insertStudent(Student student);
}

Service层实现类

/**
 * @desc: Service层实现类
 * @author: YanMingXin
 * @create: 2021/10/15-12:16
 **/
@Service
public class StudentServiceImpl implements StudentService {

    @Autowired
    private StudentDao studentDao;

    @Override
    public List<Student> selectStudentsFromDB() {
        return studentDao.selectList(null);
    }

    @Override
    public void insertStudent(Student student) {
        studentDao.insert(student);
    }
}

最核心的配置类BatchConfiguration

/**
 * @desc: BatchConfiguration
 * @author: YanMingXin
 * @create: 2021/10/15-12:25
 **/
@Configuration
@EnableBatchProcessing
@SuppressWarnings("all")
public class BatchConfiguration {

    /**
     * 注入JobBuilderFactory
     */
    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    /**
     * 注入StepBuilderFactory
     */
    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    /**
     * 注入JobRepository
     */
    @Autowired
    public JobRepository jobRepository;

    /**
     * 注入JobLauncher
     */
    @Autowired
    private JobLauncher jobLauncher;

    /**
     * 注入自定义StudentService
     */
    @Autowired
    private StudentService studentService;

    /**
     * 注入自定义job
     */
    @Autowired
    private Job studentJob;

    /**
     * 封装writer bean
     *
     * @return
     */
    @Bean
    public ItemWriter<Student> writer() {
        ItemWriter<Student> writer = new ItemWriter() {
            @Override
            public void write(List list) throws Exception {
                //debug发现是嵌套的List reader的线程List嵌套真正的List
                list.forEach((stu) -> {
                    for (Student student : (ArrayList<Student>) stu) {
                        studentService.insertStudent(student);
                    }
                });
            }
        };
        return writer;
    }

    /**
     * 封装reader bean
     *
     * @return
     */
    @Bean
    public ItemReader<Student> reader() {
        ItemReader<Student> reader = new ItemReader() {
            @Override
            public Object read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
                //模拟数据获取
                StudentVirtualDao virtualDao = new StudentVirtualDao();
                return virtualDao.getStudents();
            }
        };
        return reader;
    }

    /**
     * 封装processor bean
     *
     * @return
     */
    @Bean
    public ItemProcessor processor() {
        ItemProcessor processor = new ItemProcessor() {
            @Override
            public Object process(Object o) throws Exception {
                //debug发现o就是reader单次单线程读取的数据
                return o;
            }
        };
        return processor;
    }

    /**
     * 封装自定义step
     *
     * @return
     */
    @Bean
    public Step studentStepOne() {
        return stepBuilderFactory.get("studentStepOne")
            .chunk(1)
            .reader(reader()) //加入reader
            .processor(processor())  //加入processor
            .writer(writer())//加入writer
            .build();
    }

    /**
     * 封装自定义job
     *
     * @return
     */
    @Bean
    public Job studentJob() {
        return jobBuilderFactory.get("studentJob")
            .flow(studentStepOne())//加入step
            .end()
            .build();
    }

    /**
     * 使用spring 定时任务执行
     */
    @Scheduled(fixedRate = 5000)
    public void printMessage() {
        try {
            JobParameters jobParameters = new JobParametersBuilder()
                .addLong("time", System.currentTimeMillis())
                .toJobParameters();
            jobLauncher.run(studentJob, jobParameters);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

3.3 测试

项目启动1s之后

看数据库,除了我们实体类定义的表以外多出来这么多表,这些表都是spring batch自带的记录日志和错误的表,具体的字段含义的有待研究

4 实战后的总结

Spring Batch有非常快的写入和读取速度,但是带来的影响就是非常耗费内存和数据库连接池的资源如果使用不好的话还会发生异常,因此我们要进行正确的配置,接下来我们进行简单的源码探究:

4.1 JobBuilderFactory

job的获取使用了简单工厂模式和建造者模式JobBuilderFactory获取JobBuilder在经过配置返回一个job对象的实例,该实例就是Spring Batch中最顶级的组件,包含了n和step

public class JobBuilderFactory {

   private JobRepository jobRepository;

   public JobBuilderFactory(JobRepository jobRepository) {
      this.jobRepository = jobRepository;
   }
   //返回JobBuilder
   public JobBuilder get(String name) {
      JobBuilder builder = new JobBuilder(name).repository(jobRepository);
      return builder;
   }
}

jobBuilder类

public class JobBuilder extends JobBuilderHelper<JobBuilder> {

   /**
    * 为指定名称的作业创建一个新的构建器
    */
   public JobBuilder(String name) {
      super(name);
   }

   /**
    * 创建将执行步骤或步骤序列的新作业构建器。
    */
   public SimpleJobBuilder start(Step step) {
      return new SimpleJobBuilder(this).start(step);
   }

   /**
    * 创建将执行流的新作业构建器。
    */
   public JobFlowBuilder start(Flow flow) {
      return new FlowJobBuilder(this).start(flow);
   }

   /**
    * 创建将执行步骤或步骤序列的新作业构建器
    */
   public JobFlowBuilder flow(Step step) {
      return new FlowJobBuilder(this).start(step);
   }
}

4.2 StepBuilderFactory

直接看StepBuilder类

public class StepBuilder extends StepBuilderHelper<StepBuilder> {

   public StepBuilder(String name) {
      super(name);
   }

   /**
    * 用自定义微线程构建步骤,不一定是项处理。
    */
   public TaskletStepBuilder tasklet(Tasklet tasklet) {
      return new TaskletStepBuilder(this).tasklet(tasklet);
   }

   /**
    * 构建一个步骤,按照提供的大小以块的形式处理项。为了将这一步扩展到容错,
    * 在构建器上调用SimpleStepBuilder的 faultolerant()方法。
    * @param <I> 输入类型
    * @param <O> 输出类型
    */
   public <I, O> SimpleStepBuilder<I, O> chunk(int chunkSize) {
      return new SimpleStepBuilder<I, O>(this).chunk(chunkSize);
   }

   public <I, O> SimpleStepBuilder<I, O> chunk(CompletionPolicy completionPolicy) {
      return new SimpleStepBuilder<I, O>(this).chunk(completionPolicy);
   }

   public PartitionStepBuilder partitioner(String stepName, Partitioner partitioner) {
      return new PartitionStepBuilder(this).partitioner(stepName, partitioner);
   }

   public PartitionStepBuilder partitioner(Step step) {
      return new PartitionStepBuilder(this).step(step);
   }

   public JobStepBuilder job(Job job) {
      return new JobStepBuilder(this).job(job);
   }

   /**
    * 创建将执行流的新步骤构建器。
    */
   public FlowStepBuilder flow(Flow flow) {
      return new FlowStepBuilder(this).flow(flow);
   }
}

参考文档:

https://docs.spring.io/spring-batch/docs/4.3.x/reference/html/index.html

https://www.jdon.com/springbatch.html

到此这篇关于Spring Batch轻量级批处理框架实战的文章就介绍到这了,更多相关Spring Batch批处理内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • 详解Spring Batch 轻量级批处理框架实践

    实践内容 从 MariaDB 一张表内读 10 万条记录,经处理后写到 MongoDB . 具体实现 1.新建 Spring Boot 应用,依赖如下: <!-- Web 应用 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <exclusions> &

  • Java中批处理框架spring batch详细介绍

    spring batch简介 spring batch是spring提供的一个数据处理框架.企业域中的许多应用程序需要批量处理才能在关键任务环境中执行业务操作. 这些业务运营包括: 无需用户交互即可最有效地处理大量信息的自动化,复杂处理. 这些操作通常包括基于时间的事件(例如月末计算,通知或通信). 在非常大的数据集中重复处理复杂业务规则的定期应用(例如,保险利益确定或费率调整). 集成从内部和外部系统接收的信息,这些信息通常需要以事务方式格式化,验证和处理到记录系统中. 批处理用于每天为企业处

  • 详解批处理框架之Spring Batch

    目录 一.Spring Batch的概念知识 1.1.分层架构 1.2.关键概念 1.2.1.JobRepository 1.2.2.任务启动器JobLauncher 1.2.3.任务Job 1.2.4.步骤Step 1.2.5.输入--处理--输出 二.代码实例 2.1.基本框架 2.2.输入--处理--输出 2.2.1.读取ItemReader 2.2.2.处理ItemProcessor 2.2.3.输出ItremWriter 2.3.Step 2.4.Job 2.5.运行 三.监听List

  • Spring Batch批处理框架使用解析

    这篇文章主要介绍了Spring Batch批处理框架使用解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 使用Spring Batch做为批处理框架,可以完成常规的数据量不是特别大的离线计算. 现在写一个简单的入门版示例. 这里默认大家已经掌握了Spring Batch的基本知识,示例只是为了快速上手实践 目标1:程序随机生成字符串,经过Spring Batch后,统一在字符串后加入"----PROCESSED",并输出 目标2:程

  • Spring batch批处理框架

    spring batch框架的简介 批处理任务是大多数IT项目的一个重要组成部分,批处理在业务系统中负责处理海量的数据,无须人工干预就能够自动高效的进行复杂的数据分析和处理.批处理会定期读入批量数据,经过相应的业务处理进行归档的业务操作,批处理的特征是自动执行,处理的数据量大,定时执行.将整个批处理的流程按逻辑划分可以分为读数据,处理数据和写数据. spring batch对批处理本身的特性进行了抽象,将批处理作业抽象为job和job step,将批处理的处理过程分解为数据读,数据处理和数据写.

  • Spring Batch轻量级批处理框架实战

    目录 1 实战前的理论基础 1.1 Spring Batch是什么 1.2 Spring Batch能做什么 1.3 基础架构 1.4 核心概念和抽象 2 各个组件介绍 2.1 Job 2.2 Step 2.3 ExecutionContext 2.4 JobRepository 2.5 JobLauncher 2.6 Item Reader 2.7 Item Writer 2.8 Item Processor 3 Spring Batch实战 3.1 依赖和项目结构以及配置文件 3.2 代码和

  • Spring Batch批处理框架操作指南

    目录 简介 Spring Batch 架构 Spring Batch 核心概念 什么是 Job 什么是 JobInstance 什么是 JobParameters 什么是 JobExecution 什么是 Step 什么是 StepExecution 什么是 ExecutionContext 什么是 JobRepository 什么是 JobLauncher 什么是 Item Reader 什么是 Item Writer 什么是 Item Processor chunk 处理流程 skip 策略

  • 浅谈Spring Batch在大型企业中的最佳实践

    在大型企业中,由于业务复杂.数据量大.数据格式不同.数据交互格式繁杂,并非所有的操作都能通过交互界面进行处理.而有一些操作需要定期读取大批量的数据,然后进行一系列的后续处理.这样的过程就是"批处理". 批处理应用通常有以下特点: 数据量大,从数万到数百万甚至上亿不等: 整个过程全部自动化,并预留一定接口进行自定义配置: 这样的应用通常是周期性运行,比如按日.周.月运行: 对数据处理的准确性要求高,并且需要容错机制.回滚机制.完善的日志监控等. 什么是Spring batch Sprin

  • 详解spring batch的使用和定时器Quart的使用

    spring Batch是一个基于Spring的企业级批处理框架,它通过配合定时器Quartz来轻易实现大批量的数据读取或插入,并且全程自动化,无需人员管理. 在使用spring batch之前,得对spring batch的流程有一个基本了解 每个batch它都包含了一个job,而一个job中却有可能包含多个step,整个batch中干活的是step,batch主要是用来对数据的操作,所以step就有三个操作数据的东西,一个是ItemReader用来读取数据的,一个是ItemProcessor

  • 解决Spring Batch框架job任务只跑一次的问题

    目录 Spring Batch job任务只跑一次 出现原因 解决方法 job的启动,停止,放弃 1.启动一个job 2.停止一个job 3.放弃一个job 4.失败一个job 5.结束一个job Spring Batch job任务只跑一次 在一次实际使用spring batch的过程中,在定时任务中,第一次执行Job没有出现问题,然后再次执行时不会执行job任务: 出现原因 针对这种异常需要明确Job Instance的概念,Job Instance 是由Job的名称和执行该job的参数组成

随机推荐