基于Spring Batch向Elasticsearch批量导入数据示例

1.介绍

当系统有大量数据需要从数据库导入Elasticsearch时,使用Spring Batch可以提高导入的效率。Spring Batch使用ItemReader分页读取数据,ItemWriter批量写数据。由于Spring Batch没有提供Elastisearch的ItemWriter和ItemReader,本示例中自定义一个ElasticsearchItemWriter(ElasticsearchItemReader),用于批量导入。

2.示例

2.1 pom.xml

本文使用spring data jest连接ES(也可以使用spring data elasticsearch连接ES),ES版本为5.5.3

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.hfcsbc.estl</groupId>
  <artifactId>es-etl</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>es-etl</name>
  <description>Demo project for Spring Boot</description>

  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.0.M7</version>
    <relativePath/> <!-- lookup parent from repository -->
  </parent>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter</artifactId>
    </dependency>

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>

    <dependency>
      <groupId>org.postgresql</groupId>
      <artifactId>postgresql</artifactId>
    </dependency>

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-batch</artifactId>
    </dependency>

    <dependency>
      <groupId>com.github.vanroy</groupId>
      <artifactId>spring-boot-starter-data-jest</artifactId>
      <version>3.0.0.RELEASE</version>
    </dependency>

    <dependency>
      <groupId>io.searchbox</groupId>
      <artifactId>jest</artifactId>
      <version>5.3.2</version>
    </dependency>

    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
    </dependency>

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
      </plugin>
    </plugins>
  </build>

  <repositories>
    <repository>
      <id>spring-snapshots</id>
      <name>Spring Snapshots</name>
      <url>https://repo.spring.io/snapshot</url>
      <snapshots>
        <enabled>true</enabled>
      </snapshots>
    </repository>
    <repository>
      <id>spring-milestones</id>
      <name>Spring Milestones</name>
      <url>https://repo.spring.io/milestone</url>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
    </repository>

  </repositories>

  <pluginRepositories>
    <pluginRepository>
      <id>spring-snapshots</id>
      <name>Spring Snapshots</name>
      <url>https://repo.spring.io/snapshot</url>
      <snapshots>
        <enabled>true</enabled>
      </snapshots>
    </pluginRepository>
    <pluginRepository>
      <id>spring-milestones</id>
      <name>Spring Milestones</name>
      <url>https://repo.spring.io/milestone</url>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
    </pluginRepository>
  </pluginRepositories>
</project>

2.2 实体类及repository

package com.hfcsbc.esetl.domain;
import lombok.Data;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.OneToOne;

/**
 * Create by pengchao on 2018/2/23
 */
@Document(indexName = "person", type = "person", shards = 1, replicas = 0, refreshInterval = "-1")
@Entity
@Data
public class Person {
  @Id
  private Long id;
  private String name;
  @OneToOne
  @Field(type = FieldType.Nested)
  private Address address;
}
package com.hfcsbc.esetl.domain;
import lombok.Data;
import javax.persistence.Entity;
import javax.persistence.Id;
/**
 * Create by pengchao on 2018/2/23
 */
@Entity
@Data
public class Address {
  @Id
  private Long id;
  private String name;
}
package com.hfcsbc.esetl.repository.jpa;
import com.hfcsbc.esetl.domain.Person;
import org.springframework.data.jpa.repository.JpaRepository;
/**
 * Create by pengchao on 2018/2/23
 */
public interface PersonRepository extends JpaRepository<Person, Long> {
}
package com.hfcsbc.esetl.repository.es;
import com.hfcsbc.esetl.domain.Person;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
/**
 * Create by pengchao on 2018/2/23
 */
public interface EsPersonRepository extends ElasticsearchRepository<Person, Long> {
}

2.3 配置elasticsearchItemWriter

package com.hfcsbc.esetl.itemWriter;
import com.hfcsbc.esetl.repository.es.EsPersonRepository;
import com.hfcsbc.esetl.domain.Person;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.ItemWriteListener;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.item.ItemWriter;
import java.util.List;
/**
 * Create by pengchao on 2018/2/23
 */
public class ElasticsearchItemWriter implements ItemWriter<Person>, ItemWriteListener<Person>, StepExecutionListener {

  private EsPersonRepository personRepository;

  public ElasticsearchItemWriter(EsPersonRepository personRepository) {
    this.personRepository = personRepository;
  }

  @Override
  public void beforeWrite(List<? extends Person> items) {

  }

  @Override
  public void afterWrite(List<? extends Person> items) {

  }

  @Override
  public void onWriteError(Exception exception, List<? extends Person> items) {

  }

  @Override
  public void beforeStep(StepExecution stepExecution) {

  }

  @Override
  public ExitStatus afterStep(StepExecution stepExecution) {
    return null;
  }

  @Override
  public void write(List<? extends Person> items) throws Exception {
    //实现类AbstractElasticsearchRepository的saveAll方法调用的是elasticsearchOperations.bulkIndex(queries),为批量索引
    personRepository.saveAll(items);
  }
}

2.4 配置ElasticsearchItemReader(本示例未使用,仅供参考)

package com.hfcsbc.esetl.itemReader;
import org.springframework.batch.item.data.AbstractPaginatedDataItemReader;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.query.SearchQuery;
import java.util.Iterator;
/**
 * Create by pengchao on 2018/2/24
 */
public class ElasticsearchItemReader<Person> extends AbstractPaginatedDataItemReader<Person> implements InitializingBean {

  private final ElasticsearchOperations elasticsearchOperations;

  private final SearchQuery query;

  private final Class<? extends Person> targetType;

  public ElasticsearchItemReader(ElasticsearchOperations elasticsearchOperations, SearchQuery query, Class<? extends Person> targetType) {
    this.elasticsearchOperations = elasticsearchOperations;
    this.query = query;
    this.targetType = targetType;
  }

  @Override
  protected Iterator<Person> doPageRead() {
    return (Iterator<Person>)elasticsearchOperations.queryForList(query, targetType).iterator();
  }

  @Override
  public void afterPropertiesSet() throws Exception {
  }
}

2.5 配置spring batch需要的配置

package com.hfcsbc.esetl.config;
import com.hfcsbc.esetl.itemWriter.ElasticsearchItemWriter;
import com.hfcsbc.esetl.repository.es.EsPersonRepository;
import com.hfcsbc.esetl.domain.Person;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JpaPagingItemReader;
import org.springframework.batch.item.database.orm.JpaNativeQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
import javax.persistence.EntityManagerFactory;
import javax.sql.DataSource;
/**
 * Create by pengchao on 2018/2/23
 */
@Configuration
@EnableBatchProcessing
public class BatchConfig {
  @Autowired
  private EsPersonRepository personRepository;

  @Bean
  public ItemReader<Person> orderItemReader(EntityManagerFactory entityManagerFactory){
    JpaPagingItemReader<Person> reader = new JpaPagingItemReader<Person>();
    String sqlQuery = "select * from person";
    try {
      JpaNativeQueryProvider<Person> queryProvider = new JpaNativeQueryProvider<Person>();
      queryProvider.setSqlQuery(sqlQuery);
      queryProvider.setEntityClass(Person.class);
      queryProvider.afterPropertiesSet();
      reader.setEntityManagerFactory(entityManagerFactory);
      reader.setPageSize(10000);
      reader.setQueryProvider(queryProvider);
      reader.afterPropertiesSet();
      reader.setSaveState(true);
    } catch (Exception e) {
      e.printStackTrace();
    }

    return reader;
  }

  @Bean
  public ElasticsearchItemWriter itemWriter(){
    return new ElasticsearchItemWriter(personRepository);
  }

  @Bean
  public Step step(StepBuilderFactory stepBuilderFactory,
           ItemReader itemReader,
           ItemWriter itemWriter){
    return stepBuilderFactory
        .get("step1")
        .chunk(10000)
        .reader(itemReader)
        .writer(itemWriter)
        .build();
  }

  @Bean
  public Job job(JobBuilderFactory jobBuilderFactory, Step step){
    return jobBuilderFactory
        .get("importJob")
        .incrementer(new RunIdIncrementer())
        .flow(step)
        .end()
        .build();
  }

  /**
   * spring batch执行时会创建一些自身需要的表,这里指定表创建的位置:dataSource
   * @param dataSource
   * @param manager
   * @return
   */
  @Bean
  public JobRepository jobRepository(DataSource dataSource, PlatformTransactionManager manager){
    JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
    jobRepositoryFactoryBean.setDataSource(dataSource);
    jobRepositoryFactoryBean.setTransactionManager(manager);
    jobRepositoryFactoryBean.setDatabaseType("postgres");
    try {
      return jobRepositoryFactoryBean.getObject();
    } catch (Exception e) {
      e.printStackTrace();
    }
    return null;
  }
}

2.6配置数据库及es的连接地址

spring:
 redis:
  host: 192.168.1.222
 data:
  jest:
   uri: http://192.168.1.222:9200
   username: elastic
   password: changeme

 jpa:
  database: POSTGRESQL
  show-sql: true
  hibernate:
   ddl-auto: update

 datasource:
  platform: postgres
  url: jdbc:postgresql://192.168.1.222:5433/person
  username: hfcb
  password: hfcb
  driver-class-name: org.postgresql.Driver
  max-active: 2

spring.batch.initialize-schema: always

2.7 配置入口类

package com.hfcsbc.esetl;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.data.elasticsearch.ElasticsearchAutoConfiguration;
import org.springframework.boot.autoconfigure.data.elasticsearch.ElasticsearchDataAutoConfiguration;
import org.springframework.data.elasticsearch.repository.config.EnableElasticsearchRepositories;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;

@SpringBootApplication(exclude = {ElasticsearchAutoConfiguration.class, ElasticsearchDataAutoConfiguration.class})
@EnableElasticsearchRepositories(basePackages = "com.hfcsbc.esetl.repository")
@EnableJpaRepositories(basePackages = "com.hfcsbc.esetl.repository.jpa")
public class EsEtlApplication {

  public static void main(String[] args) {
    SpringApplication.run(EsEtlApplication.class, args);
  }
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

您可能感兴趣的文章:

  • Spring Boot与Kotlin 整合全文搜索引擎Elasticsearch的示例代码
  • Spring Boot整合Elasticsearch实现全文搜索引擎案例解析
  • Spring Boot集成ElasticSearch实现搜索引擎的示例
  • SpringBoot整合ElasticSearch的示例代码
  • 详解spring-boot集成elasticsearch及其简单应用
  • 详解spring中使用Elasticsearch的代码实现
  • SpringBoot整合ElasticSearch实践
  • spring 操作elasticsearch查询使用方法
(0)

相关推荐

  • 详解spring-boot集成elasticsearch及其简单应用

    介绍 记录将elasticsearch集成到spring boot的过程,以及一些简单的应用和helper类使用. 接入方式 使用spring-boot中的spring-data-elasticsearch,可以使用两种内置客户端接入 1.节点客户端(node client): 配置文件中设置为local:false,节点客户端以无数据节点(node-master或node-client)身份加入集群,换言之,它自己不存储任何数据,但是它知道数据在集群中的具体位置,并且能够直接转发请求到对应的节

  • Spring Boot整合Elasticsearch实现全文搜索引擎案例解析

    简单说,ElasticSearch(简称 ES)是搜索引擎,是结构化数据的分布式搜索引擎.Elastic Search是一个开源的,分布式,实时搜索和分析引擎.Spring Boot为Elasticsearch及Spring Data Elasticsearch提供的基于它的抽象提供了基本的配置.Spring Boot提供了一个用于聚集依赖的spring-boot-starter-data-elasticsearch 'StarterPOM'. 引入spring-boot-starter-dat

  • Spring Boot与Kotlin 整合全文搜索引擎Elasticsearch的示例代码

    Elasticsearch 在全文搜索里面基本是无敌的,在大数据里面也很有建树,完全可以当nosql(本来也是nosql)使用. 这篇文章简单介绍Spring Boot使用Kotlin语言连接操作 Elasticsearch.但是不会做很详细的介绍,如果要深入了解Elasticsearch在Java/kotlin中的使用,请参考我之前编写的<Elasticsearch Java API 手册> https://gitee.com/quanke/elasticsearch-java/ 里面包含使

  • SpringBoot整合ElasticSearch的示例代码

    ElasticSearch作为基于Lucene的搜索服务器,既可以作为一个独立的服务部署,也可以签入Web应用中.SpringBoot作为Spring家族的全新框架,使得使用SpringBoot开发Spring应用变得非常简单.本文要介绍如何整合ElasticSearch与SpringBoot. 实体设计: 每一本书(Book)都属于一个分类(Classify),都有一个作者(Author). 生成这个三个实体类,并实现其get和set方法. SpringBoot配置修改: 1.修改pom.xm

  • 详解spring中使用Elasticsearch的代码实现

    在使用Elasticsearch之前,先给大家聊一点干货. 1.      ES和solr都是作为全文搜索引擎出现的.都是基于Lucene的搜索服务器. 2.   ES不是可靠的存储系统,不是数据库,它有丢数据的风险. 3.  ES不是实时系统,数据写入成功只是trans log成功(类似于MySQL的bin log),写入成功后立刻查询查不到是正常的.因为数据此刻可能还在内存里而不是进入存储引擎里.同理,删除一条数据后也不是马上消失.写入何时可查询?ES内部有一个后台线程,定时将内存中的一批数

  • Spring Boot集成ElasticSearch实现搜索引擎的示例

    Elastic Search是一个开源的,分布式,实时搜索和分析引擎.Spring Boot为Elasticsearch及Spring Data Elasticsearch提供的基于它的抽象提供了基本的配置.Spring Boot提供了一个用于聚集依赖的spring-boot-starter-data-elasticsearch 'StarterPOM'. ElasticSearch作为搜索引擎,我们需要解决2大问题: 1,  如何将被搜索的数据在ES上创建反向索引 2,  Java代码如何与E

  • spring 操作elasticsearch查询使用方法

    最近学习了一下elasticsearch使用,网上的资料又很少,真是一个头两个大.好歹最后终于了解了.留个笔记做日后查询. package com.gooddeep.dev.elasticsearch.commons.dao; import java.util.ArrayList; import java.util.List; import java.util.Map; import org.elasticsearch.action.ActionFuture; import org.elasti

  • SpringBoot整合ElasticSearch实践

    本节我们基于一个发表文章的案例来说明SpringBoot如何elasticsearch集成.elasticsearch本身可以是一个独立的服务,也可以嵌入我们的web应用中,在本案例中,我们讲解如何将elasticsearch嵌入我们的应用中. 案例背景:每个文章(Article)都要属于一个教程(Tutorial),而且每个文章都要有一个作者(Author). 一.实体设计: Tutorial.java public class Tutorial implements Serializable

  • 基于Spring Batch向Elasticsearch批量导入数据示例

    1.介绍 当系统有大量数据需要从数据库导入Elasticsearch时,使用Spring Batch可以提高导入的效率.Spring Batch使用ItemReader分页读取数据,ItemWriter批量写数据.由于Spring Batch没有提供Elastisearch的ItemWriter和ItemReader,本示例中自定义一个ElasticsearchItemWriter(ElasticsearchItemReader),用于批量导入. 2.示例 2.1 pom.xml 本文使用spr

  • python批量导入数据进Elasticsearch的实例

    ES在之前的博客已有介绍,提供很多接口,本文介绍如何使用python批量导入.ES官网上有较多说明文档,仔细研究并结合搜索引擎应该不难使用. 先给代码 #coding=utf-8 from datetime import datetime from elasticsearch import Elasticsearch from elasticsearch import helpers es = Elasticsearch() actions = [] f=open('index.txt') i=

  • python Django批量导入数据

    前言: 这期间有研究了Django网页制作过程中,如何将数据批量导入到数据库中. 这个过程真的是惨不忍睹,犯了很多的低级错误,这会在正文中说到的.再者导入数据用的是py脚本,脚本内容参考至自强学堂--中级教程--数据导入.  注:本文主要介绍自己学习的经验总结,而非教程! 正文:首先说明采用Django中bulk_create()函数来实现数据批量导入功能,为什么会选择它呢? 1 bulk_create()是执行一条SQL存入多条数据,使得导入速度更快; 2 bulk_create()减少了SQ

  • asp.net线程批量导入数据时通过ajax获取执行状态

    前言 最近因为工作中遇到一个需求,需要做了一个批量导入功能,但长时间运行没个反馈状态,很容易让人看了心急,产生各种臆想!为了解决心里障碍,写了这么个功能. 通过线程执行导入,并把正在执行的状态存入session,既共享执行状态,通过ajax调用session里的执行状态,从而实现反馈导入状态的功能! 上代码: 前端页面 <!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF

  • Django框架利用ajax实现批量导入数据功能

    本文实例为大家分享了网页中利用ajax实现批量导入数据功能的实现方法,供大家参考,具体内容如下 url.py代码: 复制代码 代码如下: url(r'^workimport/$', 'keywork.views.import_keywork', name='import_keywork') view.py代码: from keywork.models import DevData from django.http import JsonResponse #django ajax部分 def im

  • php实现refresh刷新页面批量导入数据的方法

    本文实例讲述了php实现refresh刷新页面批量导入数据的方法.分享给大家供大家参考.具体分析如下: 这个功能是参考dedecms生成html页面的原理,只是dedecms使用的是js跳转而我使用的是refresh进行跳转,效果是一样的,下面我们一起来看一个php实现批量导入数据的方法. 因为我有1000W数据一次导入数据库肯定是不可行了,所以我就导致了每一次导入50条或更多数据,然后下次再刷新一次这样就可以解决这个问题了,代码如下: 复制代码 代码如下: <?php set_time_lim

  • layui 上传文件_批量导入数据UI的方法

    使用layui的文件上传组件,可以方便的弹出文件上传界面. 效果如下: 点击[批量导入]按钮调用js脚本importData(config)就可以实现数据上传到服务器. 脚本: /*** * 批量导入 * config.downUrl 下载模板url * config.uploadUrl 上传文件url * config.msg * config.done 上传结束后执行. */ function importData(config){ var default_config = { msg:"数

  • python3实现elasticsearch批量更新数据

    废话不多说,直接上代码! updateBody = { "query":{ "range":{ "write_date": { "gte": "2019-01-15 12:30:17", "lte": "now" } } }, "script": { "inline": "ctx._source.index = par

  • Java API如何实现向Hive批量导入数据

    Java API实现向Hive批量导入数据 Java程序中产生的数据,如果导入oracle或者mysql库,可以通过jdbc连接insert批量操作完成,但是当前版本的hive并不支持批量insert操作,因为需要先将结果数据写入hdfs文件,然后插入Hive表中. package com.enn.idcard; import java.io.IOException; import java.sql.Connection; import java.sql.DriverManager; impor

  • 小程序中实现excel数据的批量导入的示例代码

    目录 1 建立数据源 2 编制入库的代码 2.1 创建低码方法 2.2 低码中调用连接器 3 最终的代码 4 总结 我们上一篇介绍了如何利用微搭的自定义连接器接入腾讯文档的数据,光有接入是不够的,更重要的是我们需要将采集的数据积累下来,变成企业的数字资产. 积累数据最好的方式就是把数据存入数据库,低码工具除了有可视化编程的便利外,还提供了线上的文档型数据库.文档数据库比传统数据库的优势是,文档数据库的返回结构是JSON格式,直接就可以在前端进行渲染.关系型数据库还得通过代码进行转译. 另外一个方

随机推荐