Java实现多线程大批量同步数据(分页)

背景

最近遇到个功能,两个月有300w+的数据,之后还在累加,因一开始该数据就全部存储在mysql表,现需要展示在页面,还需要关联另一张表的数据,而且产品要求页面的查询条件多达20个条件,最终,这个功能卡的要死,基本查不出来数据。

最后是打算把这两张表的数据同时存储到MongoDB中去,以提高查询效率。

一开始同步的时候,采用单线程,循环以分页的模式去同步这两张表数据,结果是…一晚上,只同步了30w数据,特慢!!!

最后,改造了一番,2小时,就成功同步了300w+数据。

以下是主要逻辑。

线程的个数请根据你自己的服务器性能酌情设置。

思路

先通过count查出结果集的总条数,设置每个线程分页查询的条数,通过总条数和单次条数得到线程数量,通过改变limit的下标实现分批查询。

代码实现

package com.github.admin.controller.loans;

import com.baomidou.mybatisplus.mapper.EntityWrapper;
import com.github.admin.model.entity.CaseCheckCallRecord;
import com.github.admin.model.entity.duyan.DuyanCallRecordDetail;
import com.github.admin.model.entity.loans.CaseCallRemarkRecord;
import com.github.admin.service.duyan.DuyanCallRecordDetailService;
import com.github.admin.service.loans.CaseCallRemarkRecordService;
import com.github.common.constant.MongodbConstant;
import com.github.common.util.DingDingMsgSendUtils;
import com.github.common.util.ListUtils;
import com.github.common.util.Response;
import com.github.common.util.concurrent.Executors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.query.Criteria;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

/**
 * 多线程同步历史数据
 * @author songfayuan
 * @date 2019-09-26 15:38
 */
@Slf4j
@RestController
@RequestMapping("/demo")
public class SynchronizeHistoricalDataController implements DisposableBean {

    private ExecutorService executor = Executors.newFixedThreadPool(10, "SynchronizeHistoricalDataController");  //newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。

    @Value("${spring.profiles.active}")
    private String profile;
    @Autowired
    private DuyanCallRecordDetailService duyanCallRecordDetailService;
    @Autowired
    private MongoTemplate mongoTemplate;
    @Autowired
    private CaseCallRemarkRecordService caseCallRemarkRecordService;

    /**
     * 多线程同步通话记录历史数据
     * @param params
     * @return
     * @throws Exception
     */
    @GetMapping("/syncHistoryData")
    public Response syncHistoryData(Map<String, Object> params) throws Exception {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    logicHandler(params);
                } catch (Exception e) {
                    log.warn("多线程同步稽查通话记录历史数据才处理异常,errMsg = {}", e);
                    DingDingMsgSendUtils.sendDingDingGroupMsg("【系统消息】" + profile + "环境,多线程同步稽查通话记录历史数据才处理异常,errMsg = "+e);
                }
            }
        });
        return Response.success("请求成功");
    }

    /**
     * 处理数据逻辑
     * @param params
     * @throws Exception
     */
    private void logicHandler(Map<String, Object> params) throws Exception {
        /******返回结果:多线程处理完的最终数据******/
        List<DuyanCallRecordDetail> result = new ArrayList<>();

        /******查询数据库总的数据条数******/
        int count = this.duyanCallRecordDetailService.selectCount(new EntityWrapper<DuyanCallRecordDetail>()
                .eq("is_delete", 0)
                .eq("platform_type", 1));
        DingDingMsgSendUtils.sendDingDingGroupMsg("【系统消息】" + profile + "环境,本次需要同步" + count + "条历史稽查通话记录数据。");

//        int count = 2620266;
        /******限制每次查询的条数******/
        int num = 1000;

        /******计算需要查询的次数******/
        int times = count / num;
        if (count % num != 0) {
            times = times + 1;
        }

        /******每个线程开始查询的行数******/
        int offset = 0;

        /******添加任务******/
        List<Callable<List<DuyanCallRecordDetail>>> tasks = new ArrayList<>();
        for (int i = 0; i < times; i++) {
            Callable<List<DuyanCallRecordDetail>> qfe = new ThredQuery(duyanCallRecordDetailService, params, offset, num);
            tasks.add(qfe);
            offset = offset + num;
        }

        /******为避免太多任务的最终数据全部存在list导致内存溢出,故将任务再次拆分单独处理******/
        List<List<Callable<List<DuyanCallRecordDetail>>>> smallList = ListUtils.partition(tasks, 10);
        for (List<Callable<List<DuyanCallRecordDetail>>> callableList : smallList) {
            if (CollectionUtils.isNotEmpty(callableList)) {
//                executor.execute(new Runnable() {
//                    @Override
//                    public void run() {
//                        log.info("任务拆分执行开始:线程{}拆分处理开始...", Thread.currentThread().getName());
//
//                        log.info("任务拆分执行结束:线程{}拆分处理开始...", Thread.currentThread().getName());
//                    }
//                });

                try {
                    List<Future<List<DuyanCallRecordDetail>>> futures = executor.invokeAll(callableList);
                    /******处理线程返回结果******/
                    if (futures != null && futures.size() > 0) {
                        for (Future<List<DuyanCallRecordDetail>> future : futures) {
                            List<DuyanCallRecordDetail> duyanCallRecordDetailList = future.get();
                            if (CollectionUtils.isNotEmpty(duyanCallRecordDetailList)){
                                executor.execute(new Runnable() {
                                    @Override
                                    public void run() {
                                        /******异步存储******/
                                        log.info("异步存储MongoDB开始:线程{}拆分处理开始...", Thread.currentThread().getName());
                                        saveMongoDB(duyanCallRecordDetailList);
                                        log.info("异步存储MongoDB结束:线程{}拆分处理开始...", Thread.currentThread().getName());
                                    }
                                });
                            }
                            //result.addAll(future.get());
                        }
                    }
                } catch (Exception e) {
                    log.warn("任务拆分执行异常,errMsg = {}", e);
                    DingDingMsgSendUtils.sendDingDingGroupMsg("【系统消息】" + profile + "环境,任务拆分执行异常,errMsg = "+e);
                }
            }
        }
    }

    /**
     * 数据存储MongoDB
     * @param duyanCallRecordDetailList
     */
    private void saveMongoDB(List<DuyanCallRecordDetail> duyanCallRecordDetailList) {
        for (DuyanCallRecordDetail duyanCallRecordDetail : duyanCallRecordDetailList) {
            /******重复数据不同步MongoDB******/
            org.springframework.data.mongodb.core.query.Query query = new org.springframework.data.mongodb.core.query.Query();
            query.addCriteria(Criteria.where("callUuid").is(duyanCallRecordDetail.getCallUuid()));
            List<CaseCheckCallRecord> caseCheckCallRecordList = mongoTemplate.find(query, CaseCheckCallRecord.class, MongodbConstant.CASE_CHECK_CALL_RECORD);
            if (CollectionUtils.isNotEmpty(caseCheckCallRecordList)) {
                log.warn("call_uuid = {}在MongoDB已经存在数据,后面数据将被舍弃...", duyanCallRecordDetail.getCallUuid());
                continue;
            }

            /******关联填写的记录******/
            CaseCallRemarkRecord caseCallRemarkRecord = this.caseCallRemarkRecordService.selectOne(new EntityWrapper<CaseCallRemarkRecord>()
                    .eq("is_delete", 0)
                    .eq("call_uuid", duyanCallRecordDetail.getCallUuid()));

            CaseCheckCallRecord caseCheckCallRecord = new CaseCheckCallRecord();
            BeanUtils.copyProperties(duyanCallRecordDetail, caseCheckCallRecord);
            //补充
            caseCheckCallRecord.setCollectorUserId(duyanCallRecordDetail.getUserId());
            
            if (caseCallRemarkRecord != null) {
                //补充
                caseCheckCallRecord.setCalleeName(caseCallRemarkRecord.getContactName());            
            }
            log.info("正在存储数据到MongoDB:{}", caseCheckCallRecord.toString());
            this.mongoTemplate.save(caseCheckCallRecord, MongodbConstant.CASE_CHECK_CALL_RECORD);
        }
    }

    @Override
    public void destroy() throws Exception {
        executor.shutdown();
    }
}

class ThredQuery implements Callable<List<DuyanCallRecordDetail>> {
    /******需要通过构造方法把对应的业务service传进来 实际用的时候把类型变为对应的类型******/
    private DuyanCallRecordDetailService myService;
    /******查询条件 根据条件来定义该类的属性******/
    private Map<String, Object> params;

    /******分页index******/
    private int offset;
    /******数量******/
    private int num;

    public ThredQuery(DuyanCallRecordDetailService myService, Map<String, Object> params, int offset, int num) {
        this.myService = myService;
        this.params = params;
        this.offset = offset;
        this.num = num;
    }

    @Override
    public List<DuyanCallRecordDetail> call() throws Exception {
        /******通过service查询得到对应结果******/
        List<DuyanCallRecordDetail> duyanCallRecordDetailList = myService.selectList(new EntityWrapper<DuyanCallRecordDetail>()
                .eq("is_delete", 0)
                .eq("platform_type", 1)
                .last("limit "+offset+", "+num));
        return duyanCallRecordDetailList;
    }
}

ListUtils工具

package com.github.common.util;

import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;

import java.io.*;
import java.util.ArrayList;
import java.util.List;

/**
 * 描述:List工具类
 * @author songfayuan
 * 2018年7月22日下午2:23:22
 */
@Slf4j
public class ListUtils {
    
    /**
     * 描述:list集合深拷贝
     * @param src
     * @return
     * @throws IOException
     * @throws ClassNotFoundException
     * @author songfayuan
     * 2018年7月22日下午2:35:23
     */
    public static <T> List<T> deepCopy(List<T> src) {
        try {
            ByteArrayOutputStream byteout = new ByteArrayOutputStream();
            ObjectOutputStream out = new ObjectOutputStream(byteout);
            out.writeObject(src);
            ByteArrayInputStream bytein = new ByteArrayInputStream(byteout.toByteArray());
            ObjectInputStream in = new ObjectInputStream(bytein);
            @SuppressWarnings("unchecked")
            List<T> dest = (List<T>) in.readObject();
            return dest;
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
            return null;
        } catch (IOException e) {
            e.printStackTrace();
            return null;
        }
    }
    /**
     * 描述:对象深拷贝
     * @param src
     * @return
     * @throws IOException
     * @throws ClassNotFoundException
     * @author songfayuan
     * 2018年12月14日
     */
    public static <T> T objDeepCopy(T src) {
        try {
            ByteArrayOutputStream byteout = new ByteArrayOutputStream();
            ObjectOutputStream out = new ObjectOutputStream(byteout);
            out.writeObject(src);
            ByteArrayInputStream bytein = new ByteArrayInputStream(byteout.toByteArray());
            ObjectInputStream in = new ObjectInputStream(bytein);
            @SuppressWarnings("unchecked")
            T dest = (T) in.readObject();
            return dest;
        } catch (ClassNotFoundException e) {
            log.error("errMsg = {}", e);
            return null;
        } catch (IOException e) {
            log.error("errMsg = {}", e);
            return null;
        }
    }

    /**
     * 将一个list均分成n个list,主要通过偏移量来实现的
     * @author songfayuan
     * 2018年12月14日
     */
    public static <T> List<List<T>> averageAssign(List<T> source, int n) {
        List<List<T>> result = new ArrayList<List<T>>();
        int remaider = source.size() % n;  //(先计算出余数)
        int number = source.size() / n;  //然后是商
        int offset = 0;//偏移量
        for (int i = 0; i < n; i++) {
            List<T> value = null;
            if (remaider > 0) {
                value = source.subList(i * number + offset, (i + 1) * number + offset + 1);
                remaider--;
                offset++;
            } else {
                value = source.subList(i * number + offset, (i + 1) * number + offset);
            }
            result.add(value);
        }
        return result;
    }

    /**
     * List按指定长度分割
     * @param list the list to return consecutive sublists of (需要分隔的list)
     * @param size the desired size of each sublist (the last may be smaller) (分隔的长度)
     * @author songfayuan
     * @date 2019-07-07 21:37
     */
    public static <T> List<List<T>> partition(List<T> list, int size){
        return  Lists.partition(list, size); // 使用guava
    }

    /**
     * 测试
     * @param args
     */
    public static void main(String[] args) {
        List<Integer> bigList = new ArrayList<>();
        for (int i = 0; i < 101; i++){
            bigList.add(i);
        }
        log.info("bigList长度为:{}", bigList.size());
        log.info("bigList为:{}", bigList);
        List<List<Integer>> smallists = partition(bigList, 20);
        log.info("smallists长度为:{}", smallists.size());
        for (List<Integer> smallist : smallists) {
            log.info("拆分结果:{},长度为:{}", smallist, smallist.size());
        }
    }

}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • Java多线程编程实战之模拟大量数据同步

    背景 最近对于 Java 多线程做了一段时间的学习,笔者一直认为,学习东西就是要应用到实际的业务需求中的.否则要么无法深入理解,要么硬生生地套用技术只是达到炫技的效果. 不过笔者仍旧认为自己对于多线程掌握不够熟练,不敢轻易应用到生产代码中.这就按照平时工作中遇到的实际问题,脑补了一个很可能存在的业务场景: 已知某公司管理着 1000 个微信服务号,每个服务号有 1w ~ 50w 粉丝不等.假设该公司每天都需要将所有微信服务号的粉丝数据通过调用微信 API 的方式更新到本地数据库. 需求分析 对此

  • java多线程编程之为什么要进行数据同步

    Java中的变量分为两类:局部变量和类变量.局部变量是指在方法内定义的变量,如在run方法中定义的变量.对于这些变量来说,并不存在线程之间共享的问题.因此,它们不需要进行数据同步.类变量是在类中定义的变量,作用域是整个类.这类变量可以被多个线程共享.因此,我们需要对这类变量进行数据同步.数据同步就是指在同一时间,只能由一个线程来访问被同步的类变量,当前线程访问完这些变量后,其他线程才能继续访问.这里说的访问是指有写操作的访问,如果所有访问类变量的线程都是读操作,一般是不需要数据同步的.那么如果不

  • 基于Java方式实现数据同步

    本文实例为大家分享了Java方式实现数据同步的具体代码,供大家参考,具体内容如下 使用java方式实现两个系统之间数据的同步. 业务背景 在新系统中设置定时任务需要实时把客户系统中的数据及时同步过来,保持数据的一致性. 实现逻辑 1.根据客户提供的接口,本系统中采用Http的Post请求方式获取接口数据.2.由于客户提供的接口必带页码和页面容量,因此会涉及到多次请求接口才能拿到全量数据,因此相同的操作可以采用递归的方式进行.3.每次请求一次接口根据页面容量(pageSize)可获取多条数据,此时

  • java 定时同步数据的任务优化

    前言 定时任务在系统中并不少见,主要目的是用于需要定时处理数据或者执行某个操作的情况下,如定时关闭订单,或者定时备份.而常见的定时任务分为2种,第一种:固定时间执行,如:每分钟执行一次,每天执行一次.第二种:延时多久执行,就是当发生一件事情后,根据这件时间发生的时间定时多久后执行任务,如:15分钟后关闭订单付款状态,24小时候后关闭订单并且释放库存,而由于第二种一般都是单一数据的处理(主要是指数据量不大,一般情况下只有一个主体处理对象,如:一个订单以及订单中的N个商品),所以一般情况下第二种出现

  • Java实现同步枚举类数据到数据库

    本文实例为大家分享了Java同步枚举类数据到数据库的具体实现代码,供大家参考,具体内容如下 1.需求说明: 我们在开发中常常会用到数据字典,后端程序中也会经常用到(一般是用枚举类来存储),然而我们数据库中也会维护一个数据字典的数据,便于前端做数据显示时的处理,有一个问题就是,如果字典项发生变化后,我们需要修改枚举类和数据库的字典数据,要修改两次,还要面临二者不一致的风险. 所以这里的一个决绝方案就是自动读取枚举类的数据并更新到数据库,本文只讲枚举类数据的提取. 2.首先创建一个描述枚举类型的注解

  • Java实现多线程大批量同步数据(分页)

    背景 最近遇到个功能,两个月有300w+的数据,之后还在累加,因一开始该数据就全部存储在mysql表,现需要展示在页面,还需要关联另一张表的数据,而且产品要求页面的查询条件多达20个条件,最终,这个功能卡的要死,基本查不出来数据. 最后是打算把这两张表的数据同时存储到MongoDB中去,以提高查询效率. 一开始同步的时候,采用单线程,循环以分页的模式去同步这两张表数据,结果是…一晚上,只同步了30w数据,特慢!!! 最后,改造了一番,2小时,就成功同步了300w+数据. 以下是主要逻辑. 线程的

  • Java编程多线程之共享数据代码详解

    本文主要总结线程共享数据的相关知识,主要包括两方面:一是某个线程内如何共享数据,保证各个线程的数据不交叉:一是多个线程间如何共享数据,保证数据的一致性. 线程范围内共享数据 自己实现的话,是定义一个Map,线程为键,数据为值,表中的每一项即是为每个线程准备的数据,这样在一个线程中数据是一致的. 例子 package com.iot.thread; import java.util.HashMap; import java.util.Map; import java.util.Random; /*

  • Java多线程实现第三方数据同步

    本文实例为大家分享了Java多线程实现第三方数据同步的具体代码,供大家参考,具体内容如下 一.场景 最近的一项开发任务是同步第三方数据,而第三方数据一般有存量数据和增量数据,存量数据有100w+.在得知此需求时,进行了一定的信息检索和工具学习,提前获取存量数据到目标库,再使用kettle进行存量数据转换:增量数据则根据业务方规定的请求时间,通过定时任务去获取增量数据并进行数据转换.在数据获取和转换时,我们应该要记录每一次的请求信息,便于溯源和数据对账!!! 二.获取数据的方式 2.1 递归方式

  • java多线程数据分页处理实例讲解

    在数据的最终结果上,我们能够通过分类的方法,准备的筛选出不同类别结果的信息.这里我们发散一下思维,在Java中对于数据大量处理的,多线程是一个非常常见的代表,我们可以用分页来处理多线程的数据问题.下面我们对分类的类型进行了解,然后带来两种分页在多线程的逻辑. 1.常见的分页类型 传统的:采用传统的分页方式,可以明确的获取数据信息,如有多少条数据,分多少页显示等. 下拉式:采用下拉式的分页方式,一般无法获取明确的数据数量相关的信息,但在分页操作以后,仍然可以看到之前查询的数据. 2.分页式查询逻辑

  • Java多线程 线程同步与死锁

     Java多线程 线程同步与死锁 1.线程同步 多线程引发的安全问题 一个非常经典的案例,银行取钱的问题.假如你有一张银行卡,里面有5000块钱,然后你去银行取款2000块钱.正在你取钱的时候,取款机正要从你的5000余额中减去2000的时候,你的老婆正巧也在用银行卡对应的存折取钱,由于取款机还没有把你的2000块钱扣除,银行查到存折里的余额还剩5000块钱,准备减去2000.这时,有趣的事情发生了,你和你的老婆从同一个账户共取走了4000元,但是账户最后还剩下3000元. 使用代码模拟下取款过

  • Java多线程的同步优化的6种方案

    概述 处理器上的寄存器的读写的速度比内存快几个数量级,为了解决这种速度矛盾,在它们之间加入了高速缓存. 加入高速缓存带来了一个新的问题:缓存一致性.如果多个缓存共享同一块主内存区域,那么多个缓存的数据可能会不一致,需要一些协议来解决这个问题. 在Java内存模型中,分为主内存和线程工作内存,线程使用共享数据时,先从主内存中拷贝数据到工作内存,使用完成之后再写入主内存中. 在Java中,有多线程并发时,我们可以使用多线程同步的方式来解决内存一致性的问题.通常我们可以在程序中添加同步锁来保障数据的安

  • Java多线程下解决数据安全问题

    目录 同步代码块 同步方法 lock锁 同步代码块 基本语句 synchronized (任意对象) { 操作共享代码 } 代码示例 public class SellTicket implements Runnable { private int tickets = 100; private Object object = new Object(); @Override public void run() { while (true) { synchronized (object) { if

  • Java 多线程之间共享数据

    目录 1.线程范围的共享变量 2.使用Map实现线程范围内数据的共享 3.ThreadLocal实现线程范围内数据的共享 4.优化 5.实例 1.线程范围的共享变量 多个业务模块针对同一个static变量的操作 要保证在不同线程中 各模块操作的是自身对应的变量对象 public class ThreadScopeSharaData { private static int data = 0 ; public static void main(String[] args) { for(int i

  • Java多线程之同步工具类CyclicBarrier

    目录 1 CyclicBarrier方法说明 2 CyclicBarrier实例 3 CyclicBarrier源码解析 CyclicBarrier构造函数 await方法 nextGeneration的源码 breakBarrier源码 isBroken方法 reset方法 getNumberWaiting方法 前言: CyclicBarrier是一个同步工具类,它允许一组线程互相等待,直到达到某个公共屏障点.与CountDownLatch不同的是该barrier在释放线程等待后可以重用,所以

随机推荐