利用Java多线程技术导入数据到Elasticsearch的方法步骤

前言

近期接到一个任务,需要改造现有从mysql往Elasticsearch导入数据MTE(mysqlToEs)小工具,由于之前采用单线程导入,千亿数据需要两周左右的时间才能导入完成,导入效率非常低。所以楼主花了3天的时间,利用java线程池框架Executors中的FixedThreadPool线程池重写了MTE导入工具,单台服务器导入效率提高十几倍(合理调整线程数据,效率更高)。

关键技术栈

  • Elasticsearch
  • jdbc
  • ExecutorService\Thread
  • sql

工具说明

maven依赖

<dependency>
 <groupId>mysql</groupId>
 <artifactId>mysql-connector-java</artifactId>
 <version>${mysql.version}</version>
</dependency>
<dependency>
 <groupId>org.elasticsearch</groupId>
 <artifactId>elasticsearch</artifactId>
 <version>${elasticsearch.version}</version>
</dependency>
<dependency>
 <groupId>org.elasticsearch.client</groupId>
 <artifactId>transport</artifactId>
 <version>${elasticsearch.version}</version>
</dependency>
<dependency>
 <groupId>org.projectlombok</groupId>
 <artifactId>lombok</artifactId>
 <version>${lombok.version}</version>
</dependency>
<dependency>
 <groupId>com.alibaba</groupId>
 <artifactId>fastjson</artifactId>
 <version>${fastjson.version}</version>
</dependency>

java线程池设置

默认线程池大小为21个,可调整。其中POR为处理流程已办数据线程池,ROR为处理流程已阅数据线程池。

private static int THREADS = 21;
public static ExecutorService POR = Executors.newFixedThreadPool(THREADS);
public static ExecutorService ROR = Executors.newFixedThreadPool(THREADS);

定义已办生产者线程/已阅生产者线程:ZlPendProducer/ZlReadProducer

public class ZlPendProducer implements Runnable {
 ...
 @Override
 public void run() {
 System.out.println(threadName + "::启动...");
 for (int j = 0; j < Const.TBL.TBL_PEND_COUNT; j++)
 try {
 ....
 int size = 1000;
 for (int i = 0; i < count; i += size) {
 if (i + size > count) {
 //作用为size最后没有100条数据则剩余几条newList中就装几条
 size = count - i;
 }
 String sql = "select * from " + tableName + " limit " + i + ", " + size;
 System.out.println(tableName + "::sql::" + sql);
 rs = statement.executeQuery(sql);
 List<HistPendingEntity> lst = new ArrayList<>();
 while (rs.next()) {
 HistPendingEntity p = PendUtils.getHistPendingEntity(rs);
 lst.add(p);
 }
 MteExecutor.POR.submit(new ZlPendConsumer(lst));
 Thread.sleep(2000);
 }
 ....
 } catch (Exception e) {
 e.printStackTrace();
 }
 }
}
public class ZlReadProducer implements Runnable {
 ...已阅生产者处理逻辑同已办生产者
}

定义已办消费者线程/已阅生产者线程:ZlPendConsumer/ZlReadConsumer

public class ZlPendConsumer implements Runnable {
 private String threadName;
 private List<HistPendingEntity> lst;
 public ZlPendConsumer(List<HistPendingEntity> lst) {
 this.lst = lst;
 }
 @Override
 public void run() {
 ...
 lst.forEach(v -> {
 try {
 String json = new Gson().toJson(v);
 EsClient.addDataInJSON(json, Const.ES.HistPendDB_Index, Const.ES.HistPendDB_type, v.getPendingId(), null);
 Const.COUNTER.LD_P.incrementAndGet();
 } catch (Exception e) {
 e.printStackTrace();
 System.out.println("err::PendingId::" + v.getPendingId());
 }
 });
 ...
 }
}
public class ZlReadConsumer implements Runnable {
 //已阅消费者处理逻辑同已办消费者
}

定义导入Elasticsearch数据监控线程:Monitor

监控线程-Monitor为了计算每分钟导入Elasticsearch的数据总条数,利用监控线程,可以调整线程池的线程数的大小,以便利用多线程更快速的导入数据。

public void monitorToES() {
 new Thread(() -> {
 while (true) {
 StringBuilder sb = new StringBuilder();
 sb.append("已办表数::").append(Const.TBL.TBL_PEND_COUNT)
 .append("::已办总数::").append(Const.COUNTER.LD_P_TOTAL)
 .append("::已办入库总数::").append(Const.COUNTER.LD_P);
 sb.append("~~~~已阅表数::").append(Const.TBL.TBL_READ_COUNT);
 sb.append("::已阅总数::").append(Const.COUNTER.LD_R_TOTAL)
 .append("::已阅入库总数::").append(Const.COUNTER.LD_R);
 if (ldPrevPendCount == 0 && ldPrevReadCount == 0) {
 ldPrevPendCount = Const.COUNTER.LD_P.get();
 ldPrevReadCount = Const.COUNTER.LD_R.get();
 start = System.currentTimeMillis();
 } else {
 long end = System.currentTimeMillis();
 if ((end - start) / 1000 >= 60) {
 start = end;
 sb.append("\n#########################################\n");
 sb.append("已办每分钟TPS::" + (Const.COUNTER.LD_P.get() - ldPrevPendCount) + "条");
 sb.append("::已阅每分钟TPS::" + (Const.COUNTER.LD_R.get() - ldPrevReadCount) + "条");
 ldPrevPendCount = Const.COUNTER.LD_P.get();
 ldPrevReadCount = Const.COUNTER.LD_R.get();
 }
 }
 System.out.println(sb.toString());
 try {
 Thread.sleep(3000);
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 }
 }).start();
}

初始化Elasticsearch:EsClient

String cName = meta.get("cName");//es集群名字
String esNodes = meta.get("esNodes");//es集群ip节点
Settings esSetting = Settings.builder()
 .put("cluster.name", cName)
 .put("client.transport.sniff", true)//增加嗅探机制,找到ES集群
 .put("thread_pool.search.size", 5)//增加线程池个数,暂时设为5
 .build();
String[] nodes = esNodes.split(",");
client = new PreBuiltTransportClient(esSetting);
for (String node : nodes) {
 if (node.length() > 0) {
 String[] hostPort = node.split(":");
 client.addTransportAddress(new TransportAddress(InetAddress.getByName(hostPort[0]), Integer.parseInt(hostPort[1])));
 }
}

初始化数据库连接

conn = DriverManager.getConnection(url, user, password); 

启动参数

nohup java -jar mte.jar ES-Cluster2019 node1:9300,node2:9300,node3:9300 root 123456! jdbc:mysql://ip:3306/mte 130 130 >> ./mte.log 2>&1 & 

参数说明

ES-Cluster2019 为Elasticsearch集群名字

node1:9300,node2:9300,node3:9300为es的节点IP

130 130为已办已阅分表的数据

程序入口:MteMain

// 监控线程
Monitor monitorService = new Monitor();
monitorService.monitorToES();
// 已办生产者线程
Thread pendProducerThread = new Thread(new ZlPendProducer(conn, "ZlPendProducer"));
pendProducerThread.start();
// 已阅生产者线程
Thread readProducerThread = new Thread(new ZlReadProducer(conn, "ZlReadProducer"));
readProducerThread.start();

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • 使用java操作elasticsearch的具体方法

    系统环境: vm12 下的centos 7.2 当前安装版本: elasticsearch-2.4.0.tar.gz Java操作es集群步骤1:配置集群对象信息:2:创建客户端:3:查看集群信息 1:集群名称 默认集群名为elasticsearch,如果集群名称和指定的不一致则在使用节点资源时会报错. 2:嗅探功能 通过client.transport.sniff启动嗅探功能,这样只需要指定集群中的某一个节点(不一定是主节点),然后会加载集群中的其他节点,这样只要程序不停即使此节点宕机仍然可以

  • java 使用ElasticSearch完成百万级数据查询附近的人功能

    上一篇文章介绍了ElasticSearch使用Repository和ElasticSearchTemplate完成构建复杂查询条件,简单介绍了ElasticSearch使用地理位置的功能. 这一篇我们来看一下使用ElasticSearch完成大数据量查询附近的人功能,搜索N米范围的内的数据. 准备环境 本机测试使用了ElasticSearch最新版5.5.1,SpringBoot1.5.4,spring-data-ElasticSearch2.1.4. 新建Springboot项目,勾选Elas

  • JAVA使用ElasticSearch查询in和not in的实现方式

    ElasticSearch Elasticsearch是一个基于Lucene的搜索服务器.它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口.Elasticsearch是用Java开发的,并作为Apache许可条款下的开放源码发布,是当前流行的企业级搜索引擎.设计用于云计算中,能够达到实时搜索,稳定,可靠,快速,安装使用方便. 最近用到ES查询,因用的是Java写的,需要实现一个需求:过滤一部分id,查询时不需要查出来. 既然需要不包含,那么首先需要实现包含的方式(精确完

  • 基于Lucene的Java搜索服务器Elasticsearch安装使用教程

    一.安装Elasticsearch Elasticsearch下载地址:http://www.elasticsearch.org/download/ ·下载后直接解压,进入目录下的bin,在cmd下运行elasticsearch.bat 即可启动Elasticsearch ·用浏览器访问: http://localhost:9200/   ,如果出现类似如下结果则说明安装成功: { "name" : "Benedict Kine", "cluster_na

  • 利用Java多线程技术导入数据到Elasticsearch的方法步骤

    前言 近期接到一个任务,需要改造现有从mysql往Elasticsearch导入数据MTE(mysqlToEs)小工具,由于之前采用单线程导入,千亿数据需要两周左右的时间才能导入完成,导入效率非常低.所以楼主花了3天的时间,利用java线程池框架Executors中的FixedThreadPool线程池重写了MTE导入工具,单台服务器导入效率提高十几倍(合理调整线程数据,效率更高). 关键技术栈 Elasticsearch jdbc ExecutorService\Thread sql 工具说明

  • Django框架利用ajax实现批量导入数据功能

    本文实例为大家分享了网页中利用ajax实现批量导入数据功能的实现方法,供大家参考,具体内容如下 url.py代码: 复制代码 代码如下: url(r'^workimport/$', 'keywork.views.import_keywork', name='import_keywork') view.py代码: from keywork.models import DevData from django.http import JsonResponse #django ajax部分 def im

  • python批量导入数据进Elasticsearch的实例

    ES在之前的博客已有介绍,提供很多接口,本文介绍如何使用python批量导入.ES官网上有较多说明文档,仔细研究并结合搜索引擎应该不难使用. 先给代码 #coding=utf-8 from datetime import datetime from elasticsearch import Elasticsearch from elasticsearch import helpers es = Elasticsearch() actions = [] f=open('index.txt') i=

  • Java多线程下解决数据安全问题

    目录 同步代码块 同步方法 lock锁 同步代码块 基本语句 synchronized (任意对象) { 操作共享代码 } 代码示例 public class SellTicket implements Runnable { private int tickets = 100; private Object object = new Object(); @Override public void run() { while (true) { synchronized (object) { if

  • Java 多线程之间共享数据

    目录 1.线程范围的共享变量 2.使用Map实现线程范围内数据的共享 3.ThreadLocal实现线程范围内数据的共享 4.优化 5.实例 1.线程范围的共享变量 多个业务模块针对同一个static变量的操作 要保证在不同线程中 各模块操作的是自身对应的变量对象 public class ThreadScopeSharaData { private static int data = 0 ; public static void main(String[] args) { for(int i

  • Java多线程实现第三方数据同步

    本文实例为大家分享了Java多线程实现第三方数据同步的具体代码,供大家参考,具体内容如下 一.场景 最近的一项开发任务是同步第三方数据,而第三方数据一般有存量数据和增量数据,存量数据有100w+.在得知此需求时,进行了一定的信息检索和工具学习,提前获取存量数据到目标库,再使用kettle进行存量数据转换:增量数据则根据业务方规定的请求时间,通过定时任务去获取增量数据并进行数据转换.在数据获取和转换时,我们应该要记录每一次的请求信息,便于溯源和数据对账!!! 二.获取数据的方式 2.1 递归方式

  • JAVA多线程之中断机制及处理中断的方法

    目录 一,介绍 二,中断及如何响应中断? 一,介绍 这篇文章主要记录使用 interrupt() 方法中断线程,以及如何对InterruptedException进行处理.感觉对InterruptedException异常进行处理是一件谨慎且有技巧的活儿. 由于使用stop()方法停止线程非常的暴力,人家线程运行的好好的,突然就把人家杀死了,线程占用的锁被强制释放,极易导致数据的不一致性.可参考这篇文章对stop()方法的介绍. 因此,提出了一种温和的方式:请求另外一个线程不要再执行了,这就是中

  • Java多线程之如何确定线程数的方法

    关于多线程的线程数的确定,最近研读过几篇paper,在此做一下笔记,方便使用时翻看. 1.<Java 虚拟机并发编程>中介绍 就是说:线程数 = CPU的核心数 * (1 - 阻塞系数) 另一篇:<Java Concurrency in Practice>即<java并发编程实践>,给出的线程池大小的估算公式: Nthreads=Ncpu*Ucpu*(1+w/c),其中 Ncpu=CPU核心数,Ucpu=cpu使用率,0~1:W/C=等待时间与计算时间的比率 仔细推敲两

  • Android Studio4.0导入OpenCv4.3.0的方法步骤

    1.准备环境 Android Studio4.0:官网下载:https://developer.android.google.cn/studio/ (Android Studio安装之前首先需要确认电脑上是否安装好JAVA环境,具体安装可以参考其他大佬,本文不作为重点) OpenCv4.3.0:官网下载:https://opencv.org/ 百度网盘:链接: https://pan.baidu.com/s/1aC2E_LT8yFkyAKgZhcNPbg 提取码: 7bk1 2.新建工程 双击打

  • java项目导出为.exe执行文件的方法步骤

    将java项目导出为.exe执行文件需要借助于第三方软件,本文我们选择jar2exe软件. 第一步:先安装jar2exe软件,安装直接选择默认步骤即可. 第二步:需要将项目导出为jar文件,请参考上一篇文章. 第三步:打开Jar2Exe Wizard 2.5 第四步:选择你要输出的jar文件全路径以及所使用的平台,运行时JRE的版本,建议选此软件支持的最低版本和最高版本即可. 第五步:选择是控制台程序.图形化界面或服务器程序 .本文所操作的项目时GUI的 第六步:选择运行的主类 第七步:对应字节

随机推荐