详解spring batch的使用和定时器Quart的使用

spring Batch是一个基于Spring的企业级批处理框架,它通过配合定时器Quartz来轻易实现大批量的数据读取或插入,并且全程自动化,无需人员管理。

在使用spring batch之前,得对spring batch的流程有一个基本了解

每个batch它都包含了一个job,而一个job中却有可能包含多个step,整个batch中干活的是step,batch主要是用来对数据的操作,所以step就有三个操作数据的东西,一个是ItemReader用来读取数据的,一个是ItemProcessor用来处理数据的,一个是ItemWriter用来写数据(可以是文件也可以是插入sql语句),JobLauncher用来启动Job,JobRepository是上述处理提供的一种持久化机制,它为JobLauncher,Job,和Step实例提供CRUD操作。

pom.xml  三个batch的jar包

<dependency>
     <groupId>org.springframework</groupId>
     <artifactId>spring-batch-core</artifactId>
     <version>2.1.8.RELEASE</version>
    </dependency> 

    <dependency>
     <groupId>org.springframework</groupId>
     <artifactId>spring-batch-infrastructure</artifactId>
     <version>2.1.8.RELEASE</version>
<dependency> 

     <dependency>
     <groupId>org.springframework</groupId>
     <artifactId>spring-batch-test</artifactId>
     <version>2.1.8.RELEASE</version>
    </dependency>  

batch.xml

<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:batch="http://www.springframework.org/schema/batch" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/batch
    http://www.springframework.org/schema/batch/spring-batch-2.1.xsd
    http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
  "> 

  <bean id="jobLauncher"
    class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
    <property name="jobRepository" ref="jobRepository" />
  </bean> 

  <bean id="jobRepository"
    class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
    <property name="validateTransactionState" value="false" /> 

  </bean>
<!--一个job-->
    <batch:job id="writerteacherInterview">
        <batch:step id="teacherInterview">
          <batch:tasklet>
            <batch:chunk reader="jdbcItemReaderTeacherInterview" writer="teacherInterviewItemWriter"
              processor="teacherInterviewProcessor" commit-interval="10">
            </batch:chunk>
          </batch:tasklet>
        </batch:step>
      </batch:job> 

  <!--job的读取数据操作-->
  <bean id="jdbcItemReaderTeacherInterview"
    class="org.springframework.batch.item.database.JdbcCursorItemReader"
    scope="step">
    <property name="dataSource" ref="dataSource" />
    <property name="sql"
      value="select distinct teacherName ,count(teacherName) as num from examininterviewrecord  where pdate >'${detail_startime}' and pdate < '${detail_endtime}' GROUP BY teacherName " />
    <property name="rowMapper" ref="teacherInterviewMapper">
    </property>
  </bean> 

</beans> 

读取数据    teacherInterviewMapper

package com.yc.batch; 

import java.sql.ResultSet;
import java.sql.SQLException;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Component;
import com.yc.vo.TeacherInterviewdetail;
import com.yc.vo.TeacherWorkdetail;
import com.yc.vo.Workdetail;
@Component("teacherInterviewMapper")
public class TeacherInterviewMapper implements RowMapper {
  @Override
  public Object mapRow(ResultSet rs, int rowNum) throws SQLException {  

    TeacherInterviewdetail TId=new TeacherInterviewdetail();
    TId.setTeacherName(rs.getString("teacherName"));
     TId.setNum(rs.getInt("num"));
    return TId;
  }
}

处理数据  teacherInterviewProcessor ,这个处理数据方法,一般都是在这里在这里进行一些数据的加工,比如有些数据没有读到,你也可以在这个方法和后面那个写入数据的类里面写,所以就导致了这个类里面你可以什么都不敢,直接把数据抛到后面去,让后面的写数据类来处理;我这里就是处理数据的这个类什么都没写,但是最好还是按它的规则来!

package com.yc.batch; 

import org.hibernate.engine.transaction.jta.platform.internal.SynchronizationRegistryBasedSynchronizationStrategy;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service; 

import com.yc.vo.TeacherInterviewdetail;
import com.yc.vo.TeacherWorkdetail;
import com.yc.vo.Workdetail; 

//业务层
@Component("teacherInterviewProcessor")
public class TeacherInterviewProcessor implements ItemProcessor<TeacherInterviewdetail, TeacherInterviewdetail> { 

  @Override
  public TeacherInterviewdetail process(TeacherInterviewdetail teacherInterviewdetail) throws Exception { 

    return teacherInterviewdetail;
  }
}

写数据 teacherInterviewItemWriter 这个类里面主要是把数据写进一个文件里,同时我这个类里面还有一些数据处理

package com.yc.batch; 

import java.io.InputStream; 

import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import javax.annotation.Resource;
import org.springframework.batch.item.ItemWriter;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import com.yc.biz.ExamineeClassBiz;
import com.yc.biz.WorkBiz;
import com.yc.utils.CsvUtils;
import com.yc.vo.TeacherInterviewdetail;
import com.yc.vo.TeacherWorkdetail;
import com.yc.vo.Workdetail;
import net.sf.ehcache.util.PropertyUtil;
//写
@Component("teacherInterviewItemWriter")
public class TeacherInterviewItemWriter implements ItemWriter<TeacherInterviewdetail>{ 

  @Override
  public void write(List<? extends TeacherInterviewdetail> teacherInterviewdetails) throws Exception {
    Properties props = new Properties();
    InputStream in= PropertyUtil.class.getClassLoader().getResourceAsStream("connectionConfig.properties");
    props.load(in);
    String time=props.getProperty("detail_time");
    CsvUtils cu=new CsvUtils();
     List<Object> works=new ArrayList<Object>();
     for(TeacherInterviewdetail t:teacherInterviewdetails){
       works.add(t);
     } 

    String path=this.getClass().getResource("/").getPath();
    path=path.substring(0,path.lastIndexOf("/"));
    path=path.substring(0,path.lastIndexOf("/"));
    path=path.substring(0,path.lastIndexOf("/"));
    path=path.substring(0,path.lastIndexOf("/"));
    cu.writeCsv(path+"/csv/teacherInterview_"+time+".csv",works );
  }
}

我这里有用到一个吧数据写进CSV文件的jar包

<dependency>
     <groupId>net.sourceforge.javacsv</groupId>
     <artifactId>javacsv</artifactId>
     <version>2.0</version>
    </dependency>

CsvUtils帮助类的写入CSV文件方法

/**
   * 写入CSV文件
   * @throws IOException
   */
  public void writeCsv(String path,List<Object> t) throws IOException{
    String csvFilePath = path;
    String filepath=path.substring(0,path.lastIndexOf("/"));
    File f=new File(filepath);
    if(!f.exists()){
      f.mkdirs();
    }
    File file=new File(path);
    if(!file.exists()){
      file.createNewFile();
    }
    CsvWriter wr =new CsvWriter(csvFilePath,',',Charset.forName("GBK"));
    try {
      for(Object obj:t){
        String[] contents=obj.toString().split(",");
        wr.writeRecord(contents);
      }
      wr.close();
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

就这样一个基本的batch流程就跑起来了,它通过从数据里读取一些数据,然后经过处理后,被存进服务器下的一个文件里面,之后像这种数据的读取就不需要去数据库里面查询了,而是可以直接通过读取CSV文件来处理这个业务。一般使用这个的都会配一个定时器,让它们每隔一段时间跑一次,从而获得较新的数据

下面是定时器的配置

定时器的配置非常简单,我是使用注解方式来配置的

定时器任务类

package com.yc.task.impl;
import javax.transaction.Transactional;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service; 

import com.yc.batch.ClassBatch;
import com.yc.batch.MessageItemBatch;
import com.yc.batch.TeacherInterviewBatch;
import com.yc.batch.TearcherBatch;
import com.yc.po.Work;
import com.yc.task.WorkTask;
import com.yc.vo.Workdetail;
@Service
public class WorkTaskImpl implements WorkTask{ 

  @Autowired
  private TeacherInterviewBatch teacherInterviewBatch;//教师访谈记录
  public void setTeacherInterviewBatch(TeacherInterviewBatch teacherInterviewBatch) {
    this.teacherInterviewBatch = teacherInterviewBatch;
  } 

  @Scheduled(cron= "0 30 22 * * ?")  //每天晚上十点30执行一次 这个注解会让框架会自动把这个方法看成任务启动方法
  @Override
  public void task() {
    try {
      teacherInterviewBatch.test();//教师访谈
    } catch (Exception e) {
      e.printStackTrace();
    } 

  } 

}

定时器所真正要执行的方法

package com.yc.batch; 

import javax.annotation.Resource;
import org.apache.commons.jexl2.Main;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; 

@Component
public class TeacherInterviewBatch { 

  private Job job;
  private JobLauncher launcher; 

  @Resource(name="writerteacherInterview")
  public void setJob(Job job) {
    this.job = job;
  } 

  @Autowired
  public void setLauncher(JobLauncher launcher) {
    this.launcher = launcher;
  }

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • Spring Batch入门教程篇

    SpringBatch介绍: SpringBatch 是一个大数据量的并行处理框架.通常用于数据的离线迁移,和数据处理,⽀持事务.并发.流程.监控.纵向和横向扩展,提供统⼀的接⼝管理和任务管理;SpringBatch是SpringSource和埃森哲为了统一业界并行处理标准为广大开发者提供方便开发的一套框架. 官方地址:github.com/spring-projects/spring-batch SpringBatch 本身提供了重试,异常处理,跳过,重启.任务处理统计,资源管理等特性,这些特

  • spring batch 读取多个文件数据导入数据库示例

    项目的目录结构 需要读取文件的的数据格式 applicatonContext.xml的配置 <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p=

  • Spring Batch读取txt文件并写入数据库的方法教程

    项目需求 近日需要实现用户推荐相关的功能,也就是说向用户推荐他可能喜欢的东西. 我们的数据分析工程师会将用户以及用户可能喜欢的东西整理成文档给我,我只需要将数据从文档中读取出来,然后对数据进行进一步的清洗(例如去掉特殊符号,长度如果太长则截取).然后将处理后的数据存入数据库(Mysql). 所以分为三步: 读取文档获得数据 对获得的数据进行处理 更新数据库(新增或更新) 考虑到这个数据量以后会越来越大,这里没有使用 poi 来读取数据,而直接使用了 SpringBatch. 实现步骤 本文假设读

  • Spring batch批处理框架

    spring batch框架的简介 批处理任务是大多数IT项目的一个重要组成部分,批处理在业务系统中负责处理海量的数据,无须人工干预就能够自动高效的进行复杂的数据分析和处理.批处理会定期读入批量数据,经过相应的业务处理进行归档的业务操作,批处理的特征是自动执行,处理的数据量大,定时执行.将整个批处理的流程按逻辑划分可以分为读数据,处理数据和写数据. spring batch对批处理本身的特性进行了抽象,将批处理作业抽象为job和job step,将批处理的处理过程分解为数据读,数据处理和数据写.

  • 详解spring batch的使用和定时器Quart的使用

    spring Batch是一个基于Spring的企业级批处理框架,它通过配合定时器Quartz来轻易实现大批量的数据读取或插入,并且全程自动化,无需人员管理. 在使用spring batch之前,得对spring batch的流程有一个基本了解 每个batch它都包含了一个job,而一个job中却有可能包含多个step,整个batch中干活的是step,batch主要是用来对数据的操作,所以step就有三个操作数据的东西,一个是ItemReader用来读取数据的,一个是ItemProcessor

  • 详解Spring batch 入门学习教程(附源码)

    Spring batch 是一个开源的批处理框架.执行一系列的任务. 在 spring batch 中 一个job 是由许多 step 组成的.而每一个 step  又是由 READ-PROCESS-WRITE task或者 单个 task 组成. 1. "READ-PROCESS-WRITE" 处理,根据字面意思理解就可以: READ 就是从资源文件里面读取数据,比如从xml文件,csv文件,数据库中读取数据. PROCESS 就是处理读取的数据 WRITE 就是将处理过的数据写入到

  • 详解Spring Batch 轻量级批处理框架实践

    实践内容 从 MariaDB 一张表内读 10 万条记录,经处理后写到 MongoDB . 具体实现 1.新建 Spring Boot 应用,依赖如下: <!-- Web 应用 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <exclusions> &

  • 详解Spring的核心机制依赖注入

    详解Spring的核心机制依赖注入 对于一般的Java项目,他们都或多或少有一种依赖型的关系,也就是由一些互相协作的对象构成的.Spring把这种互相协作的关系称为依赖关系.如A组件调用B组件的方法,可称A组件依赖于B组件,依赖注入让Spring的Bean以配置文件组织在一起,而不是以硬编码的方式耦合在一起 一.理解依赖注入 依赖注入(Dependency Injection) = 控制反转(Inversion ofControl,IoC):当某个Java实例(调用者)需另一个Java实例(被调

  • 详解 Spring注解的(List&Map)特殊注入功能

    详解 Spring注解的(List&Map)特殊注入功能 最近接手一个新项目,已经没有原开发人员维护了.项目框架是基于spring boot进行开发.其中有两处Spring的注解花费了大量的时间才弄明白到底是怎么用的,这也涉及到spring注解的一个特殊的注入功能. 首先,看到代码中有直接注入一个List和一个Map的.示例代码如下: @Autowired private List<DemoService> demoServices; @Autowired private Map<

  • 详解spring boot starter redis配置文件

    spring-boot-starter-Redis主要是通过配置RedisConnectionFactory中的相关参数去实现连接redis service. RedisConnectionFactory是一个接口,有如下4个具体的实现类,我们通常使用的是JedisConnectionFactory. 在spring boot的配置文件中redis的基本配置如下: # Redis服务器地址 spring.redis.host=192.168.0.58 # Redis服务器连接端口 spring.

  • 详解Spring mvc ant path的使用方法

    详解Spring mvc ant path的使用方法 概要: 任何一个WEB都需要解决URL与请求处理器之间的映射,spring MVC也是一样,但Spring MVC就像Spring所作的一切一样(灵活,可以配置各种东西,但是也造成了很多复杂性),肯定不会只有一种方法来映射URL和 Controller之间的关系,并且在实际上,允许你自己创建映射规则和实现,而不仅仅依赖URL映射. 1.Spring path match Spring MVC中的路径匹配要比标准的web.xml要灵活的多.默认

  • 详解Spring Controller autowired Request变量

    详解Spring Controller autowired Request变量 spring的DI大家比较熟悉了,对于依赖注入的实现也无须赘述. 那么spring的bean的默认scope为singleton,对于controller来说每次方法中均可以获得request还是比较有意思的. 对于方法参数上的request通过构建方法的参数可以获得最新的request public final Object invokeForRequest(NativeWebRequest request, Mo

  • 详解Spring Hibernate连接oracle数据库的配置

    详解Spring Hibernate连接oracle数据库的配置 jdbc.properties文件配置如下  driverClassName=oracle.jdbc.driver.OracleDriver url=jdbc\:oracle\:thin\:@localhost\:1521\: database=OA username=oa password=oa initialSize=2 maxActive=10 maxIdle=2 minIdle=2 removeAbandoned=true

  • 详解Spring data 定义默认时间与日期的实例

    详解Spring data 定义默认时间与日期的实例 前言: 需求是这样的: 1. 创建时间与更新时间只能由数据库产生,不允许在实体类中产生,因为每个节点的时间/时区不一定一直.另外防止人为插入自定义时间时间. 2. 插入记录的时候创建默认时间,创建时间不能为空,时间一旦插入不允许日后在实体类中修改. 3. 记录创建后更新日志字段为默认为 null 表示该记录没有被修改过.一旦数据被修改,修改日期字段将记录下最后的修改时间. 4. 甚至你可以通过触发器实现一个history 表,用来记录数据的历

随机推荐