Java多线程批量数据导入的方法详解

前言:

当遇到大量数据导入时,为了提高处理的速度,可以选择使用多线程来批量处理这些处理。常见的场景有:

  • 大文件导入数据库(这个文件不一定是标准的CSV可导入文件或者需要在内存中经过一定的处理)
  • 数据同步(从第三方接口拉取数据处理后写入自己的数据库)

以上的场景有一个共性,这类数据导入的场景简单来说就是将数据从一个数据源移动到另外一个数据源,而其中必定可以分为两步

  • 数据读取:从数据源读取数据到内存
  • 数据写入:将内存中的数据写入到另外一个数据源,可能存在数据处理

而且根据读取的速度一般会比数据写入的速度快很多,即读取快,写入慢。

设计思路

由于场景的特点是读取快,写入慢,如果是使用多线程处理,建议是数据写入部分改造为多线程。而数据读取可以改造成批量读取数据。简单来说就是两个要点:

  • 批量读取数据
  • 多线程写入数据

示例

多线程批量处理最简单的方案是使用线程池来进行处理,下面会通过一个模拟批量读取和写入的服务,以及对这个服务的多线程写入调用作为示例,展示如何多线程批量数据导入。

模拟服务

import java.util.concurrent.atomic.AtomicLong;
/**
* 数据批量写入用的模拟服务
*
* @author RJH
* create at 2019-04-01
*/
public class MockService {
/**
* 可读取总数
*/
private long canReadTotal;
/**
* 写入总数
*/
private AtomicLong writeTotal=new AtomicLong(0);
/**
* 写入休眠时间(单位:毫秒)
*/
private final long sleepTime;
/**
* 构造方法
*
* @param canReadTotal
* @param sleepTime
*/
public MockService(long canReadTotal, long sleepTime) {
this.canReadTotal = canReadTotal;
this.sleepTime = sleepTime;
}
/**
* 批量读取数据接口
*
* @param num
* @return
*/
public synchronized long readData(int num) {
long readNum;
if (canReadTotal >= num) {
canReadTotal -= num;
readNum = num;
} else {
readNum = canReadTotal;
canReadTotal = 0;
}
//System.out.println("read data size:" + readNum);
return readNum;
}
/**
* 写入数据接口
*/
public void writeData() {
try {
// 休眠一定时间模拟写入速度慢
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 写入总数自增
System.out.println("thread:" + Thread.currentThread() + " write data:" + writeTotal.incrementAndGet());
}
/**
* 获取写入的总数
*
* @return
*/
public long getWriteTotal() {
return writeTotal.get();
}
}

批量数据处理器

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 基于线程池的多线程批量写入处理器
* @author RJH
* create at 2019-04-01
*/
public class SimpleBatchHandler {
private ExecutorService executorService;
private MockService service;
/**
* 每次批量读取的数据量
*/
private int batch;
/**
* 线程个数
*/
private int threadNum;
public SimpleBatchHandler(MockService service, int batch,int threadNum) {
this.service = service;
this.batch = batch;
//使用固定数目的线程池
this.executorService = Executors.newFixedThreadPool(threadNum);
}
/**
* 开始处理
*/
public void startHandle() {
// 开始处理的时间
long startTime = System.currentTimeMillis();
System.out.println("start handle time:" + startTime);
long readData;
while ((readData = service.readData(batch)) != 0) {// 批量读取数据,知道读取不到数据才停止
for (long i = 0; i < readData; i++) {
executorService.execute(() -> service.writeData());
}
}
// 关闭线程池
executorService.shutdown();
while (!executorService.isTerminated()) {//等待线程池中的线程执行完
}
// 结束时间
long endTime = System.currentTimeMillis();
System.out.println("end handle time:" + endTime);
// 总耗时
System.out.println("total handle time:" + (endTime - startTime) + "ms");
// 写入总数
System.out.println("total write num:" + service.getWriteTotal());
}
}

测试类

/**
* SimpleBatchHandler的测试类
* @author RJH
* create at 2019-04-01
*/
public class SimpleBatchHandlerTest {
public static void main(String[] args) {
// 总数
long total=100000;
// 休眠时间
long sleepTime=100;
// 每次拉取的数量
int batch=100;
// 线程个数
int threadNum=16;
MockService mockService=new MockService(total,sleepTime);
SimpleBatchHandler handler=new SimpleBatchHandler(mockService,batch,threadNum);
handler.startHandle();
}
}

运行结果

start handle time:1554298681755
thread:Thread[pool-1-thread-2,5,main] write data:1
thread:Thread[pool-1-thread-1,5,main] write data:2
...省略部分输出
thread:Thread[pool-1-thread-4,5,main] write data:100000
end handle time:1554299330202
total handle time:648447ms
total write num:100000

分析

在单线程情况下的执行时间应该为total*sleepTime,即10000000ms,而改造为多线程后执行时间为648447ms。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • Java实现批量导入excel表格数据到数据库中的方法

    本文实例讲述了Java实现批量导入excel表格数据到数据库中的方法.分享给大家供大家参考,具体如下: 1.创建导入抽象类 package com.gcloud.common.excel; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.io.PrintStream; import java.sql.SQLException;

  • Java多线程之volatile关键字及内存屏障实例解析

    前面一篇文章在介绍Java内存模型的三大特性(原子性.可见性.有序性)时,在可见性和有序性中都提到了volatile关键字,那这篇文章就来介绍volatile关键字的内存语义以及实现其特性的内存屏障. volatile是JVM提供的一种最轻量级的同步机制,因为Java内存模型为volatile定义特殊的访问规则,使其可以实现Java内存模型中的两大特性:可见性和有序性.正因为volatile关键字具有这两大特性,所以我们可以使用volatile关键字解决多线程中的某些同步问题. volatile

  • Java中excel表数据的批量导入方法

    本文实例为大家分享了Java中excel表数据的批量导入,供大家参考,具体内容如下 首先看下工具类: import java.awt.Color; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileInputStream; import java.io.InputStream; import java.lang.ref

  • 解决Java导入excel大量数据出现内存溢出的问题

    问题:系统要求导入40万条excel数据,采用poi方式,服务器出现内存溢出情况. 解决方法:由于HSSFWorkbook workbook = new HSSFWorkbook(path)一次性将excel load到内存中导致内存不够. 故采用读取csv格式.由于csv的数据以x1,x2,x3形成,类似读取txt文档. private BufferedReader bReader; /** * 执行文件入口 */ public void execute() { try { if(!path.

  • Java 实现多线程切换等待唤醒交替打印奇偶数

    引言 在日常工作生活中,可能会有用时几个人或是很多人干同一件事,在java编程中,同样也会出现类似的情况,多个线程干同样一个活儿,比如火车站买票系统不能多个人买一到的是同一张票,当某个窗口(线程)在卖某一张票的时候,别的窗口(线程)不允许再卖此张票了,在此过程中涉及到一个锁和资源等待的问题,如何合理正确的让线程与线程在干同一件事的过程中,不会抢资源以及一个一直等待一个一直干活的状况,接下来就聊一下多线程的等待唤醒以及切换的过程,在此就以A和B两个线程交替打印奇偶数的例子为例,代码如下: pack

  • Java多线程批量数据导入的方法详解

    前言: 当遇到大量数据导入时,为了提高处理的速度,可以选择使用多线程来批量处理这些处理.常见的场景有: 大文件导入数据库(这个文件不一定是标准的CSV可导入文件或者需要在内存中经过一定的处理) 数据同步(从第三方接口拉取数据处理后写入自己的数据库) 以上的场景有一个共性,这类数据导入的场景简单来说就是将数据从一个数据源移动到另外一个数据源,而其中必定可以分为两步 数据读取:从数据源读取数据到内存 数据写入:将内存中的数据写入到另外一个数据源,可能存在数据处理 而且根据读取的速度一般会比数据写入的

  • Java实现自定义Excel数据排序的方法详解

    目录 1.引入jar包 2.自定义排序 通常,我们可以在Excel中对指定列数据执行升序或者降序排序,排序时可依据单元格中的数值.单元格颜色.字体颜色或图标等.在需要自定义排序情况下,我们也可以自行根据排序需要编辑数据排列顺序.本文,将通过Java应用程序来实现如何自定义排序. 1.引入jar包 使用jar包:Spire.Xls.jar version: 12.8.4 导入方法1:手动下载jar到本地,解压,然后找到lib文件夹下的Spire.Xls.jar文件.然后在IDEA中打开“Proje

  • SpringBoot+Elasticsearch实现数据搜索的方法详解

    目录 一.简介 二.代码实践 2.1.导入依赖 2.2.配置环境变量 2.3.创建 elasticsearch 的 config 类 2.4.索引管理 2.5.文档管理 三.小结 一.简介 在上篇 ElasticSearch 文章中,我们详细的介绍了 ElasticSearch 的各种 api 使用. 实际的项目开发过程中,我们通常基于某些主流框架平台进行技术开发,比如 SpringBoot,今天我们就以 SpringBoot 整合 ElasticSearch 为例,给大家详细的介绍 Elast

  • java与JSON数据的转换实例详解

    java与JSON数据的转换实例详解 JSON与JAVA数据的转换(JSON 即 JavaScript Object Natation,它是一种轻量级的数据交换格式,非常适合于服务器与 JavaScript 的交互.) 代码中有这么一句,是后台的封装数据. JSONObject jo = JSONObject.fromObject(map); 常见的java代码转换成json --请注意,这个方法曾经给我造成过困惑.因为,它在对Object转换的时候是按照domain类中的所有getXXX()方

  • Java多线程之线程状态的迁移详解

    一.六种状态 java.lang.Thread 的状态分为以下 6 种,它们以枚举的形式,封装在了Thread类内部: NEW:表示线程刚刚创建出来,还未启动 RUNNABLE:可运行状态,该状态的线程可以是ready或running,唯一的决定因素是线程调度器 BLOCKED:阻塞,线程正在等待一个monitor锁以便进入一个同步代码块 WAITING:等待,一种挂起等待的状态.一个线程处于waiting是为了等待其他线程执行某个特定的动作. TIMED_WAITING:定时等待. TERMI

  • Spring中自定义数据类型转换的方法详解

    目录 类型转换服务 实现Converter接口 实现ConverterFactory接口 实现GenericConverter接口 环境:Spring5.3.12.RELEASE. Spring 3引入了一个core.onvert包,提供一个通用类型转换系统.系统定义了一个SPI来实现类型转换逻辑,以及一个API来在运行时执行类型转换.在Spring容器中,可以使用这个系统作为PropertyEditor实现的替代,将外部化的bean属性值字符串转换为所需的属性类型.还可以在应用程序中需要类型转

  • Java通过反射注解赋值的方法详解

    目录 问题描述 最终解决 if/else 普通解法 通过反射注解赋值属性 解题思路 汇总某些字段的和 总结 源码 前段时间,领导分配一个统计销售区域汇总的数据,解决方案使用到了反射获取注解,通过注解获取属性或者设置字段属性. 问题描述 查询公司列表,分别是公司id.区域id.区域名称: 公司id 区域id 区域名称 1 1 华南 2 2 华北 3 2 华北 4 3 华东 5 3 华东 创建公司类Company: public class Company { public Company(Inte

  • Java实现统计在线人数功能的方法详解

    目录 1. 监听器的简介 2. Java监听器的类型 (1)ServletContextListener (2)HttpSessionListener (3)ServletRequestListener (4)ServletContextAttributeListener (5)HttpSessionAttributeListener (6)ServletRequestAttributeListener (7)HttpSessionActivationListener 3.监听器Listener

  • Python批量处理图片大小尺寸方法详解

    目录 前言 简单讲解 1.建目录 2.图片大小修改 批量修改图片Size 1.目录处理 2.图片批量修改Size完整代码 前言 出发点,网上下了一批png,使用wxFormBuilder做软件工具栏的图标,原图做出来的效果 这么大的一个图标让笔者差点就笑岔气了 以前都是用工具改变图片的大小,这次想了下,开发的事情肯定用脚本搞定 所以决定写一个脚本让图片变成32x32,原图是124x128的, 简单讲解 本文使用Pillow(PIL), 安装: pip install pillow 这个库有很多图

  • Java postgresql数组字段类型处理方法详解

    在实际开发中遇到postgresql中定义的数组字段,下面解决两个问题,如何定义数组字段的默认值为空格数组,以及如何再java实体类中直接使用数组对象接受数据或把数据存入数据库. 1.在postgresql中定义数组对象及默认值 以字符串你数组为例: 比如一个字段用于存储多张图片的url,可以使用一下sql定义 pictures _varchar NOT NUll default ARRAY[]::character varying[] 2.实体类存入数组到数据库并接受数据库的数组数据 直接定义

随机推荐