Java多线程实现第三方数据同步

本文实例为大家分享了Java多线程实现第三方数据同步的具体代码,供大家参考,具体内容如下

一、场景

最近的一项开发任务是同步第三方数据,而第三方数据一般有存量数据和增量数据,存量数据有100w+。在得知此需求时,进行了一定的信息检索和工具学习,提前获取存量数据到目标库,再使用kettle进行存量数据转换;增量数据则根据业务方规定的请求时间,通过定时任务去获取增量数据并进行数据转换。在数据获取和转换时,我们应该要记录每一次的请求信息,便于溯源和数据对账!!!

二、获取数据的方式

2.1 递归方式

使用递归方式时,要求数据量少,否则会出现栈溢出或堆溢出!!!并且递归方式是单线程,所以会导致同步速度很慢!!!

/**
     * 数据同步 - 递归方式
     * 此处存量数据只需要请求到数据并保存数据库即可,后期通过kettle进行转换。
     * Data为自定义实体类,这里仅做示例!!!
*/
    private void fetchAndSaveDB(int pageIndex, int pageSize) throws Exception {
        log.info("【数据同步 - 存量】,第{}次同步,", pageIndex);
        List<Data> datas= getDataByPage(pageIndex,pageSize);
        if (CollectionUtils.isNotEmpty(datas)) {
            dataService.saveOrUpdateBatch(datas);
            log.info("【数据同步 - 存量】,第{}次同步,同步成功", pageIndex);
            if (datas.size() < pageSize) {
                log.info("【数据同步 - 存量】,第{}次同步,获取数据小于每页获取条数,证明已全部同步完毕!!!", pageIndex);
                return;
            }
            // 递归操作-直到数据同步完毕
            fetchAndSaveDB(pageIndex + 1, pageSize);
        } else {
            log.info("【数据同步 - 存量】,第{}次同步,获取数据为空,证明已全部同步完毕!!!", pageIndex);
            return;
        }
    }
    /** 
     * 获取分页数据,Data为自定义实体类,这里仅做示例!!!
     */
    private List<Data> getDataByPage(int pageIndex, int pageSize) throws Exception {
        //通过feign调用第三方接口获取数据
        String data = dataFeignService.fetchAllData(pageSize, pageIndex);
        JSONObject jsonObject = JSONObject.parseObject(data);
        JSONArray datalist = jsonObject.getJSONArray("datalist");
        List<Data> datas = datalist.toJavaList(Data.class);
        return datas;
    }

2.2 多线程方式

由于递归方式是单线程,考虑到数据的庞大,且易造成内存溢出,因此将递归更换成多线程方式,不仅避免了内存溢出的情况,且速度大大的提升!!!

public void synAllData() {
         // 定义原子变量 - 页数
        AtomicInteger pageIndex = new AtomicInteger(0);
         // 创建线程池
         ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10);

        // 100万数据
        int total = 1000000;//数据总量
        int times = total / 1000;
        if (total % 1000!= 0) {
            times = times + 1;
        }
        LocalDateTime beginLocalDateTime = LocalDateTime.now();
        log.info("【数据同步 - 存量】开始同步时间:{}", beginLocalDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
        for (int index = 1; index <= times; index++) {
            fixedThreadPool.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        multiFetchAndSaveDB(pageIndex.incrementAndGet(), 1000);
                    } catch (Exception e) {
                        log.error("并发获取并保存数据异常:{}", e);
                    }
                }
            });
        }
        LocalDateTime endLocalDateTime = LocalDateTime.now();
        log.info("【数据同步 - 存量】同步结束时间:{},总共耗时:{}分钟",
                endLocalDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")),
                Duration.between(beginLocalDateTime, endLocalDateTime).toMinutes());
    }
    /**
     * 数据同步 - 【多线程方式】
     *
     * @throws Exception
     */
    private void multiFetchAndSaveDB(int pageIndex, int pageSize) throws Exception {
        log.info("【数据同步 - 存量】,第{}次同步,", pageIndex);
        List<Data> datas= getDataByPage(pageIndex, pageSize);//getDataByPage()同上2.1
        if (CollectionUtils.isNotEmpty(datas)) {
            log.info("【数据同步 - 存量】,第{}次同步,同步成功", pageIndex);
            if (datas.size() < pageSize) {
                log.info("【数据同步 - 存量】,第{}次同步,获取数据小于每页获取条数,证明已全部同步完毕!!!", pageIndex);
                return;
            }
        } else {
            log.info("【数据同步 - 存量】,第{}次同步,获取数据为空,证明已全部同步完毕!!!", pageIndex);
            return;
        }

    }

三、增量数据如何对接

增量数据需要写定时任务,可使用Scheduled注解,并需要将增量数据存放到目标库中且进行数据转换!!!此处就不再提供代码,可以参考上面的存量数据的方式编写!!!

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • java多线程编程之为什么要进行数据同步

    Java中的变量分为两类:局部变量和类变量.局部变量是指在方法内定义的变量,如在run方法中定义的变量.对于这些变量来说,并不存在线程之间共享的问题.因此,它们不需要进行数据同步.类变量是在类中定义的变量,作用域是整个类.这类变量可以被多个线程共享.因此,我们需要对这类变量进行数据同步.数据同步就是指在同一时间,只能由一个线程来访问被同步的类变量,当前线程访问完这些变量后,其他线程才能继续访问.这里说的访问是指有写操作的访问,如果所有访问类变量的线程都是读操作,一般是不需要数据同步的.那么如果不

  • Java多线程编程实战之模拟大量数据同步

    背景 最近对于 Java 多线程做了一段时间的学习,笔者一直认为,学习东西就是要应用到实际的业务需求中的.否则要么无法深入理解,要么硬生生地套用技术只是达到炫技的效果. 不过笔者仍旧认为自己对于多线程掌握不够熟练,不敢轻易应用到生产代码中.这就按照平时工作中遇到的实际问题,脑补了一个很可能存在的业务场景: 已知某公司管理着 1000 个微信服务号,每个服务号有 1w ~ 50w 粉丝不等.假设该公司每天都需要将所有微信服务号的粉丝数据通过调用微信 API 的方式更新到本地数据库. 需求分析 对此

  • Java实现多线程大批量同步数据(分页)

    背景 最近遇到个功能,两个月有300w+的数据,之后还在累加,因一开始该数据就全部存储在mysql表,现需要展示在页面,还需要关联另一张表的数据,而且产品要求页面的查询条件多达20个条件,最终,这个功能卡的要死,基本查不出来数据. 最后是打算把这两张表的数据同时存储到MongoDB中去,以提高查询效率. 一开始同步的时候,采用单线程,循环以分页的模式去同步这两张表数据,结果是…一晚上,只同步了30w数据,特慢!!! 最后,改造了一番,2小时,就成功同步了300w+数据. 以下是主要逻辑. 线程的

  • Java多线程实现第三方数据同步

    本文实例为大家分享了Java多线程实现第三方数据同步的具体代码,供大家参考,具体内容如下 一.场景 最近的一项开发任务是同步第三方数据,而第三方数据一般有存量数据和增量数据,存量数据有100w+.在得知此需求时,进行了一定的信息检索和工具学习,提前获取存量数据到目标库,再使用kettle进行存量数据转换:增量数据则根据业务方规定的请求时间,通过定时任务去获取增量数据并进行数据转换.在数据获取和转换时,我们应该要记录每一次的请求信息,便于溯源和数据对账!!! 二.获取数据的方式 2.1 递归方式

  • Java多线程下解决数据安全问题

    目录 同步代码块 同步方法 lock锁 同步代码块 基本语句 synchronized (任意对象) { 操作共享代码 } 代码示例 public class SellTicket implements Runnable { private int tickets = 100; private Object object = new Object(); @Override public void run() { while (true) { synchronized (object) { if

  • Java 多线程之间共享数据

    目录 1.线程范围的共享变量 2.使用Map实现线程范围内数据的共享 3.ThreadLocal实现线程范围内数据的共享 4.优化 5.实例 1.线程范围的共享变量 多个业务模块针对同一个static变量的操作 要保证在不同线程中 各模块操作的是自身对应的变量对象 public class ThreadScopeSharaData { private static int data = 0 ; public static void main(String[] args) { for(int i

  • 总结java多线程之互斥与同步解决方案

    一.线程互斥与同步 互斥:指的是多个线程不能同时访问共享变量 同步:指的是多个线程按指定的顺序执行操作 在同时有多个线程运行过程中,如何达到互斥和同步呢? 加锁即可 在此使用黑马笔记中room例子来说明锁.(ps: 以前就了解锁,但总会记乱,发现使用形象化记忆后就很清楚) 解决互斥 锁就相当于上图的房子,里面放着会被并发访问的共享变量 此时绿色区域(owner)无线程,此时多个线程想并发访问房子里的共享变量,那么只允许其中一个线程进入房子访问,并把房门锁上. 剩下的没有拿到锁的线程只能在entr

  • Java多线程编程之CountDownLatch同步工具使用实例

    好像倒计时计数器,调用CountDownLatch对象的countDown方法就将计数器减1,当到达0时,所有等待者就开始执行. java.util.concurrent.CountDownLatch 一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待.用给定的计数初始化CountDownLatch.由于调用了countDown()方法,所以在当前计数到达零之前,await方法会一直受阻塞.之后,会释放所有等待的线程,await的所有后续调用都将立即返回.这种现

  • 利用Java多线程技术导入数据到Elasticsearch的方法步骤

    前言 近期接到一个任务,需要改造现有从mysql往Elasticsearch导入数据MTE(mysqlToEs)小工具,由于之前采用单线程导入,千亿数据需要两周左右的时间才能导入完成,导入效率非常低.所以楼主花了3天的时间,利用java线程池框架Executors中的FixedThreadPool线程池重写了MTE导入工具,单台服务器导入效率提高十几倍(合理调整线程数据,效率更高). 关键技术栈 Elasticsearch jdbc ExecutorService\Thread sql 工具说明

  • Java多线程按指定顺序同步执行

    笔者今天看到一个有趣的面试题,如何让多个线程按照既定的顺序依次执行?比如每个线程输出一个整数, 那么期望就是这样的:0,1,2,3,4,5,6,7,8,9. 而不是0,2,4,1,3,5,8,7,9,6 乍一看,这不是反人性的考题吗?多线程本来就以乱序执行出名的.稍加思索,想到3种解决方案,分别用代码实现之. 方法1 使用newSingleThreadExecutor newSingleThreadExecutor返回仅仅包含一个线程的线程池,将多个任务交给此Executor时,这个线程池处理完

  • java多线程抓取铃声多多官网的铃声数据

    一直想练习下java多线程抓取数据. 有天被我发现,铃声多多的官网(http://www.shoujiduoduo.com/main/)有大量的数据. 通过观察他们前端获取铃声数据的ajax http://www.shoujiduoduo.com/ringweb/ringweb.php?type=getlist&listid={类别ID}&page={分页页码} 很容易就能发现通过改变 listId和page就能从服务器获取铃声的json数据, 通过解析json数据, 可以看到都带有{&q

  • JAVA多线程CountDownLatch使用详解

    前序: 上周测试给开发的同事所开发的模块提出了一个bug,并且还是偶现. 经过仔细查看代码,发现是在业务中启用了多线程,2个线程同时跑,但是新启动的2个线程必须保证一个完成之后另一个再继续运行,才能消除bug. 什么时候用? 多线程是在很多地方都会用到的,但是我们如果想要实现在某个特定的线程运行完之后,再启动另外一个线程呢,这个时候CountDownLatch就可以派上用场了 怎么用? 先看看普通的多线程代码: package code; public class MyThread extend

随机推荐