SpringBoot用多线程批量导入数据库实现方法

目录
  • 环境
  • 原始的for循环入库
  • 批量保存操作
  • 在批量插入的基础上使用多线程
  • 处理多线程入库的事务问题

环境

springboot、mybatisPlus、mysql8

mysql8(部署在1核2G的服务器上,很卡,所以下面的数据条数用5000,太大怕不是要等到花儿都谢了 0.0)

原始的for循环入库

@Service
@Slf4j
public class MoreTestServiceImpl extends ServiceImpl<MoreTestMapper, MoreTestEntity> implements MoreTestService {
    @Override
    @Transactional(rollbackFor = Exception.class)
    public Object doTest() {
        long start = System.currentTimeMillis();
        List<MoreTestEntity> entityList = new ArrayList<>();
        for (int i = 0; i < 5000; i++) {
            MoreTestEntity entity = new MoreTestEntity();
            entity.setId((long) i);
            entity.setA(UUID.randomUUID().toString());
            entity.setB(UUID.randomUUID().toString());
            entity.setC(UUID.randomUUID().toString());
            entity.setD(UUID.randomUUID().toString());
            entity.setE(UUID.randomUUID().toString());
            entity.setF(UUID.randomUUID().toString());
            entity.setG(UUID.randomUUID().toString());
            entity.setH(UUID.randomUUID().toString());
            entity.setI(UUID.randomUUID().toString());
            entity.setJ(UUID.randomUUID().toString());
            entity.setK(UUID.randomUUID().toString());
            entityList.add(entity);
            //在循环中入库
            baseMapper.insert(entity);
        }
        long end = System.currentTimeMillis();
        System.err.println(end - start);
        return end - start;
    }
}

共耗时:180121 ms

批量保存操作

@Service
@Slf4j
public class MoreTestServiceImpl extends ServiceImpl<MoreTestMapper, MoreTestEntity> implements MoreTestService {
    @Override
    @Transactional(rollbackFor = Exception.class)
    public Object doTest() {
        long start = System.currentTimeMillis();
        List<MoreTestEntity> entityList = new ArrayList<>();
        for (int i = 0; i < 5000; i++) {
            MoreTestEntity entity = new MoreTestEntity();
            entity.setId((long) i);
            entity.setA(UUID.randomUUID().toString());
            entity.setB(UUID.randomUUID().toString());
            entity.setC(UUID.randomUUID().toString());
            entity.setD(UUID.randomUUID().toString());
            entity.setE(UUID.randomUUID().toString());
            entity.setF(UUID.randomUUID().toString());
            entity.setG(UUID.randomUUID().toString());
            entity.setH(UUID.randomUUID().toString());
            entity.setI(UUID.randomUUID().toString());
            entity.setJ(UUID.randomUUID().toString());
            entity.setK(UUID.randomUUID().toString());
            entityList.add(entity);
        }
      	//mybatisPlus提供的批量保存方法,数字代表每几条数据提交一次事务,默认1000
        saveBatch(entityList, 1000);
        long end = System.currentTimeMillis();
        System.err.println(end - start);
        return end - start;
    }
}

耗时时间:87217ms

在批量插入的基础上使用多线程

@Service
@Slf4j
public class MoreTestServiceImpl extends ServiceImpl<MoreTestMapper, MoreTestEntity> implements MoreTestService {
    @Override
    @Transactional(rollbackFor = Exception.class)
    public Object doTest() throws InterruptedException {
        long start = System.currentTimeMillis();
        //手动创建线程池,注意你 数据库连接池的 允许连接数量,别超过了就行。
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(
                5,
                5,
                30,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(10),
                //isDaemon 设置线程是否是守护线程,true的话,主线程结束,new的线程就不会继续工作
                new NamedThreadFactory("执行线程", false),
                (r, executor) -> System.out.println("拒绝" + r));
        List<MoreTestEntity> entityList = new ArrayList<>();
        for (int i = 0; i < 5000; i++) {
            MoreTestEntity entity = new MoreTestEntity();
            entity.setId((long) i);
            entity.setA(UUID.randomUUID().toString());
            entity.setB(UUID.randomUUID().toString());
            entity.setC(UUID.randomUUID().toString());
            entity.setD(UUID.randomUUID().toString());
            entity.setE(UUID.randomUUID().toString());
            entity.setF(UUID.randomUUID().toString());
            entity.setG(UUID.randomUUID().toString());
            entity.setH(UUID.randomUUID().toString());
            entity.setI(UUID.randomUUID().toString());
            entity.setJ(UUID.randomUUID().toString());
            entity.setK(UUID.randomUUID().toString());
            entityList.add(entity);
        }
        //拆分list,将其拆分成5份,然后上面线程池创建也是5个核心线程,刚好执行
        List<List<MoreTestEntity>> partition = ListUtils.partition(entityList, 1000);
        //使用CountDownLatch保证所有线程都执行完成
        CountDownLatch latch = new CountDownLatch(5);
        partition.forEach(item -> {
            poolExecutor.execute(() -> {
                saveBatch(item, 1000);
                latch.countDown();
            });
        });
        latch.await();
        // 也可以这么写,设定超时时间
        //latch.await(100,TimeUnit.SECONDS);
        long end = System.currentTimeMillis();
        System.err.println(end - start);
        //关闭线程池
        poolExecutor.shutdown();
        return end - start;
    }
}

耗时时间: 28235

可见时间从180秒,缩短到了28秒,但是@Transactional对于多线程是控制不了所有的事务的。

Spring实现事务的原理是通过ThreadLocal把数据库连接绑定到当前线程中,同一个事务中数据库操作使用同一个jdbc connection,新开启的线程获取不到当前jdbc connection。

如下代码:

partition.forEach(item -> {
            poolExecutor.execute(() -> {
                saveBatch(item, 1000);
                latch.countDown();
                //让每个都报错
                int i = 1/0;
            });
        });

控制台打印:

Exception in thread "执行线程5" java.lang.ArithmeticException: / by zero
    at com.kusch.ares.service.impl.MoreTestServiceImpl.lambda$null$1(MoreTestServiceImpl.java:68)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Exception in thread "执行线程2" java.lang.ArithmeticException: / by zero
    at com.kusch.ares.service.impl.MoreTestServiceImpl.lambda$null$1(MoreTestServiceImpl.java:68)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Exception in thread "执行线程4" java.lang.ArithmeticException: / by zero
    at com.kusch.ares.service.impl.MoreTestServiceImpl.lambda$null$1(MoreTestServiceImpl.java:68)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Exception in thread "执行线程1" java.lang.ArithmeticException: / by zero
    at com.kusch.ares.service.impl.MoreTestServiceImpl.lambda$null$1(MoreTestServiceImpl.java:68)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Exception in thread "执行线程3" 30179
java.lang.ArithmeticException: / by zero
    at com.kusch.ares.service.impl.MoreTestServiceImpl.lambda$null$1(MoreTestServiceImpl.java:68)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

可见5个线程都报错了,但是去查询数据库,却可以查询到5000条数据,这是不应该出现的情况。

处理多线程入库的事务问题

@Service
@Slf4j
public class MoreTestServiceImpl extends ServiceImpl<MoreTestMapper, MoreTestEntity> implements MoreTestService {
    @Resource
    private DataSourceTransactionManager dataSourceTransactionManager;
    @Resource
    private TransactionDefinition transactionDefinition;
    @Override
    //此处手动管理事务的提交后,这个注解就可以去掉了
    //    @Transactional(rollbackFor = Exception.class)
    public Object doTest() {
        long start = System.currentTimeMillis();
        //手动创建线程池,注意你 数据库连接池的 允许连接数量,别超过了就行。
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(
                5,
                5,
                30,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(10),
                //isDaemon 设置线程是否是守护线程,true的话,主线程结束,new的线程就不会继续工作
                new NamedThreadFactory("执行线程", false),
                (r, executor) -> System.out.println("拒绝" + r));
        List<MoreTestEntity> entityList = new ArrayList<>();
        for (int i = 0; i < 50; i++) {
            MoreTestEntity entity = new MoreTestEntity();
            entity.setId((long) i);
            entity.setA(UUID.randomUUID().toString());
            entity.setB(UUID.randomUUID().toString());
            entity.setC(UUID.randomUUID().toString());
            entity.setD(UUID.randomUUID().toString());
            entity.setE(UUID.randomUUID().toString());
            entity.setF(UUID.randomUUID().toString());
            entity.setG(UUID.randomUUID().toString());
            entity.setH(UUID.randomUUID().toString());
            entity.setI(UUID.randomUUID().toString());
            entity.setJ(UUID.randomUUID().toString());
            entity.setK(UUID.randomUUID().toString());
            entityList.add(entity);
        }
        //拆分list,将其拆分成5份,然后上面线程池创建也是5个核心线程,刚好执行
        List<List<MoreTestEntity>> partition = ListUtils.partition(entityList, 10);
        //使用CountDownLatch保证所有线程都执行完成
        CountDownLatch sonLatch = new CountDownLatch(5);
        //主线程的 肯定为1
        CountDownLatch mainLatch = new CountDownLatch(1);
        AtomicBoolean hasError = new AtomicBoolean(false);
        partition.forEach(item -> {
            poolExecutor.execute(() -> {
                doSave(item, sonLatch, hasError, mainLatch);
            });
        });
        try {
            //此处应该是用try catch 包裹着主线程的所有业务代码,以此保证主线程中任何一处报错都可以通知子线程
            //这里加一个是为了调试主线程中的数据入库操作
            MoreTestEntity entity = new MoreTestEntity();
            entity.setId((long) 99999);
            entity.setA(UUID.randomUUID().toString());
            entity.setB(UUID.randomUUID().toString());
            entity.setC(UUID.randomUUID().toString());
            entity.setD(UUID.randomUUID().toString());
            entity.setE(UUID.randomUUID().toString());
            entity.setF(UUID.randomUUID().toString());
            entity.setG(UUID.randomUUID().toString());
            entity.setH(UUID.randomUUID().toString());
            entity.setI(UUID.randomUUID().toString());
            entity.setJ(UUID.randomUUID().toString());
            entity.setK(UUID.randomUUID().toString());
            save(entity);
            //主线程报错
            int i = 10 / 0;
            sonLatch.await();
        } catch (InterruptedException e) {
            hasError.set(true);
            e.printStackTrace();
        }
        mainLatch.countDown();
        long end = System.currentTimeMillis();
        System.err.println(end - start);
        //关闭线程池
        if (!poolExecutor.isShutdown()) {
            poolExecutor.shutdown();
        }
        return end - start;
    }
    /**
     * 包装后的子线程的保存代码
     *
     * @param entityList 要保存的集合
     * @param sonLatch   子线程 CountDownLatch
     * @param hasError   是否发生错误
     * @param mainLatch  主线程 CountDownLatch
     */
    private void doSave(List<MoreTestEntity> entityList,
                        CountDownLatch sonLatch,
                        AtomicBoolean hasError,
                        CountDownLatch mainLatch) {
        TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
        try {
            //            //子线程报错
            //            int i = 10 / 0;
            saveBatch(entityList);
        } catch (Throwable throwable) {
            throwable.printStackTrace();
            hasError.set(true);
        } finally {
            //这是必须的,每个子线程走完,要让主线程继续走,然后再回到子线程的每个任务,决定是提交还是回滚
            sonLatch.countDown();
        }
        try {
            //等待主线程的执行结束
            mainLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
            hasError.set(true);
        }
        //事务操作
        if (hasError.get()) {
            dataSourceTransactionManager.rollback(transactionStatus);
        } else {
            dataSourceTransactionManager.commit(transactionStatus);
        }
    }
}

分别放开子线程报错和主线程报错,会发现事务都可以正常回滚,达到了预期的效果。

主要思路就是通过子线程CountDownLatch和主线程CountDownLatch,控制线程好代码的执行顺序即可。

最后补充几点:

  • 上述代码中的countDown()一旦出现不执行的情况那会导致线程堵塞堆积,所以建议给await()增加超时时间
  • 这样操作可能还会出现问题,比如主线程通知子线程可以进行实务操作了,但是各个子线程之间非透明,所以还是有几率存在某个子线程事务回滚失败的情况。

到此这篇关于SpringBoot用多线程批量导入数据库实现方法的文章就介绍到这了,更多相关SpringBoot多线程导入数据库内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • 基于SpringBoot多线程@Async的使用体验

    目录 多线程@Async的使用体验 场景 1.线程池配置 2.子父线程之间共享一个Request的配置方案 3.阻塞主线程,等待所有子线程执行完毕后继续执行主线程 1.CountDownLatch 2.Future 4.多线程共用一个事务 异步调用@Async问题 1.使用背景 2.异步处理方式 3.@Async不返回数据 4.@Async返回数据 5.异常处理 多线程@Async的使用体验 场景 导入:可以将大批量的数据insert操作采用多线程的方式并行执行 第三方服务的接口调用:由于存在个

  • SpringBoot实现动态多线程并发定时任务

    本文实例为大家分享了SpringBoot实现动态多线程并发定时任务的具体代码,供大家参考,具体内容如下 实现定时任务有多种方式,使用spring自带的,继承SchedulingConfigurer的方式. 一.实现 1.启动类 在启动类添加注解@EnableScheduling开启,不然不起用做. 2.新建任务类 添加注解@Component注册到spring的容器中. package com.example.demo.task; import com.example.demo.entity.M

  • SpringBoot多线程进行异步请求的处理方式

    目录 SpringBoot多线程进行异步请求的处理 第一步:编写配置类 第二步:对方法使用注解标注为使用多线程进行处理 SpringBoot请求线程优化 使用Callable来实现 1.异步调用的另一种方式 3.Deferred方式实现异步调用 SpringBoot多线程进行异步请求的处理 近期在协会博客园中,有人发布了博客,系统进行查重的时候由于机器最低配置进行大量计算时需要十秒左右时间才能处理完,由于一开始是单例模式,导致,在某人查重的时候整个系统是不会再响应别的请求的,导致了系统假死状态,

  • SpringBoot中使用多线程的方法示例

    一.介绍 Spring是通过任务执行器(TaskExecutor)来实现多线程和并发编程,使用Spring提供的ThreadPoolTaskExecutor来创建一个基于线城池的TaskExecutor.在使用线程池的大多数情况下都是异步非阻塞的.节省更多的时间,提高效率. 工作原理 当主线程中调用execute接口提交执行任务时:则执行以下步骤:注意:线程池初始时,是空的. 如果当前线程数<corePoolSize,如果是则创建新的线程执行该任务 如果当前线程数>=corePoolSize,

  • SpringBoot定时任务多线程实现示例

    测试Spring Boot定时任务冲突时,使用的线程数量 引入依赖: Spring Boot 2.6.1 <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> 简单的测试类 import lombok.extern.slf4j.Slf4j; import org.springframework.sc

  • SpringBoot用多线程批量导入数据库实现方法

    目录 环境 原始的for循环入库 批量保存操作 在批量插入的基础上使用多线程 处理多线程入库的事务问题 环境 springboot.mybatisPlus.mysql8 mysql8(部署在1核2G的服务器上,很卡,所以下面的数据条数用5000,太大怕不是要等到花儿都谢了 0.0) 原始的for循环入库 @Service @Slf4j public class MoreTestServiceImpl extends ServiceImpl<MoreTestMapper, MoreTestEnti

  • SpringBoot+Vue实现EasyPOI导入导出的方法详解

    目录 前言 一.为什么做导入导出 二.什么是 EasyPOI 三.项目简介 项目需求 效果图 开发环境 四.实战开发 核心源码 前端页面 后端核心实现 五.项目源码 小结 前言 Hello~ ,前后端分离系列和大家见面了,秉着能够学到知识,学会知识,学懂知识的理念去学习,深入理解技术! 项目开发过程中,很大的需求都有 导入导出功能,我们依照此功能,来实现并还原真实企业开发中的实现思路 一.为什么做导入导出 为什么做导入导出 导入 在项目开发过程中,总会有一些统一的操作,例如插入数据,系统支持单个

  • php实现refresh刷新页面批量导入数据的方法

    本文实例讲述了php实现refresh刷新页面批量导入数据的方法.分享给大家供大家参考.具体分析如下: 这个功能是参考dedecms生成html页面的原理,只是dedecms使用的是js跳转而我使用的是refresh进行跳转,效果是一样的,下面我们一起来看一个php实现批量导入数据的方法. 因为我有1000W数据一次导入数据库肯定是不可行了,所以我就导致了每一次导入50条或更多数据,然后下次再刷新一次这样就可以解决这个问题了,代码如下: 复制代码 代码如下: <?php set_time_lim

  • Java Excel数据导入数据库的方法

    目录 1.根据业务需求设计数据库表 2.根据数据库表设计一个Excel模板 3.环境准备 4.通过插件生成表对应的实体类 5.自定义编写工具类 6.编写具体业务逻辑Service 7.在dao层对应的xml文件中,编写批量上传的方法 8.Controller实现业务的控制 9.通过Swagger测试接口 10.在数据和控制台中查看导入效果 总结 1.根据业务需求设计数据库表 2.根据数据库表设计一个Excel模板 模板的每列属性必须与表字段一一对应 3.环境准备 我这里项目环境是基于Spring

  • mysql如何利用Navicat导出和导入数据库的方法

    MySql是我们经常用到的数据,无论是开发人员用来练习,还是小型私服游戏服务器,或者是个人软件使用,都十分方便.对于做一些个人辅助软件,选择mysql数据库是个明智的选择,有一个好的工具更是事半功倍,对于MySql 的IDE 我推荐Navicat for MySql,现在我就向大家介绍如何利用Navicat for MySql 导出和导入数据. 导出数据库: 打开Navicat ,在我们要到处的数据上面右击鼠标,然后弹出的快捷菜单上点击"转储SQL 文件",在再次弹出的子菜单项中选择第

  • 基于PHP读取TXT文件向数据库导入海量数据的方法

    有一个TXT文件,包含了10万条记录,格式如下: 列1       列2       列3   列4   列5a    00003131    0    0    adductive#1 adducting#1 adducent#1a    00003356    0    0    nascent#1a    00003553    0    0    emerging#2 emergent#2a    00003700    0.25    0    dissilient#1 -------

  • 图解SSIS批量导入Excel文件的实现方法

    将一个目录下(可以包括子目录)结构一样的excel文件批量导入sql2005,可以用ssis来定制任务.下面用大量图片完全说明整个过程. 1.建立测试excel文件,假设有a b c d四个字段,保存在f:/excel目录下 并复制很多个一样的文件2.打开Microsoft Visual Studio 2005或者随sql2005安装的SQL Server Business Intelligence Development Studio,新建一个商业智能项目.3.工具箱拖一个Foreach循环容

  • java批量导入导出文件的实例分享(兼容xls,xlsx)

    一.介绍 利用java实现文件的导入导出数据库,目前在大部分系统中是比较常见的功能了,今天写个小demo来理解其原理,没接触过的同学也可以看看参考下. 目前我所接触过的导入导出技术主要有POI和iReport,poi主要作为一些数据批量导入数据库,iReport做报表导出.另外还有jxl类似poi的方式,不过貌似很久没跟新了,2007之后的office好像也不支持,这里就不说了. 二.POI使用详解 2.1 什么是Apache POI? Apache POI是Apache软件基金会的开放源码函式

  • python Django批量导入不重复数据

    本文为大家分享了python Django批量导入不重复数据的实现代码,供大家参考,具体内容如下 程序如下: #coding:utf-8 import os os.environ.setdefault("DJANGO_SETTINGS_MODULE", "www.settings") ''' Django 版本大于等于1.7的时候,需要加上下面两句 import django django.setup() 否则会抛出错误 django.core.exceptions

  • java多线程批量拆分List导入数据库的实现过程

    目录 一.前言 二.直接把list怼进Mysql 三.分组把list导入Mysql中 四.多线程分批导入Mysql 五.小结 一.前言 前两天做了一个导入的功能,导入开始的时候非常慢,导入2w条数据要1分多钟,后来一点一点的优化,从直接把list怼进Mysql中,到分配把list导入Mysql中,到多线程把list导入Mysql中.时间是一点一点的变少了.非常的爽,最后变成了10s以内.下面就展示一下过程. 二.直接把list怼进Mysql 使用mybatis的批量导入操作: @Transact

随机推荐