基于Java方式实现数据同步

本文实例为大家分享了Java方式实现数据同步的具体代码,供大家参考,具体内容如下

使用java方式实现两个系统之间数据的同步。

业务背景

在新系统中设置定时任务需要实时把客户系统中的数据及时同步过来,保持数据的一致性。

实现逻辑

1.根据客户提供的接口,本系统中采用Http的Post请求方式获取接口数据。
2.由于客户提供的接口必带页码和页面容量,因此会涉及到多次请求接口才能拿到全量数据,因此相同的操作可以采用递归的方式进行。
3.每次请求一次接口根据页面容量(pageSize)可获取多条数据,此时可以采用批量添加数据库的操作,使用批量SQL添加语句。
4.由于数据同步需要保持两个系统数据的一致性,因此需要使用定时任务并规定同步频率,例如:一天一次或者一天两次。
5.定时任务的使用会产生数据重复的问题,因此根据某个唯一字段建立唯一索引来避免数据重复添加的问题。
6.唯一索引的建立可以避免同一记录重复添加的问题,但是避免不了同一条记录除唯一索引字段之外其它字段对应数据发生变化问题,因此使用replace into添加SQL语句可以解决此问题。

提示: a. 如果发现表中已经有此行数据(根据主键或者唯一索引判断)则先删除此行数据,然后插入新的数据。 b. 不然的话,直接插入新的数据。

使用技术

1.设置定时任务。
2.采用Http的Post方式获取接口数据。
3.涉及多页数据采用递归方式循环调用。
4.批量操作数据库(replace into)。
5.建立唯一索引避免重复插入数据。

代码详情

1.StudentMapper.java

/**
     * 批量添加数据同步接口学生数据
     * @param studentList
     * @return
     */
    int addBatchStudent(@Param(value = "studentList") List<Student> studentList);

2.SyncStudentServiceImpl.java代码如下:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import ***.common.utils.HttpUtils;
import ***.common.utils.StringUtils;
import ***.entity.sync.Student;
import ***.mapper.sync.StudentMapper;
import ***.service.sync.SyncStudentService;
import ***.vo.StudentVO;
import lombok.extern.slf4j.Slf4j;

/**
 * 数据同步的学生实现类
 * @author hc
 * @create 2021/03/25 11:20
 * @version 1.0
 * @since 1.0
 */
@Service
@Slf4j
public class SyncStudentServiceImpl implements SyncStudentService {
    @Autowired
    private StudentMapper studentMapper;
    @Autowired
    private HttpUtils httpUtils;

    @Override
    public void bulkAddStudent(StudentVO studentVO) {
        log.info("调取学生接口传的参数对象studentVO:"+studentVO);
        log.info("开始调取学生接口获取第" + studentVO.getPageIndex() + "页数据");
        //如何页面容量小于100,则按100计算
        if(studentVO.getPageSize() < 100) {
            studentVO.setPageSize(100);
        }
        //根据当前页码和页面容量调取获取学生数据接口
        JSONObject jsonObject = this.sendStudentHttpPost(studentVO);
        //判断返回JSONObject是否为null
        if (StringUtils.isNotNull(jsonObject) && jsonObject.containsKey("errcode") && jsonObject.getInteger("errcode") == 0) {
            if(jsonObject.containsKey("data")){
                JSONArray jsonArray = jsonObject.getJSONArray("data");
                //通过判断获取的jsonObject对象中key值为data是否为null和其 jsonArray的长度来判断是否进行递归
                //提示:此判断方法好于通过计算总页码的方式来递归拿数据(对获取的total依赖过大,因此放弃此方式)
                if(jsonObject.getString("data") != null && jsonArray.size() > 0) {
                   log.info("当前数据加载到几页》》》》:{}", studentVO.getPageIndex());
                   //调取批量添加数据
                   this.addStudentCycleData(jsonObject, studentVO);
                   //页码加1,继续调取下一页数据
                      studentVO.setPageIndex(studentVO.getPageIndex() + 1);
                     //采用递归方式直至循环结束
                   this.bulkAddStudent(studentVO);
                }else {
                    log.info("学生数据同步结束》》》");
                }
            }
        }
    }

    /**
     * 批量添加学生数据
     * @param jsonObject
     * @param areaVO
     * @return
     */
    public void addStudentCycleData(JSONObject jsonObject, StudentVO studentVO){
        List<Student> studentList = null;
        //判断jsonArray时候为空
        if (jsonObject != null && StringUtils.isNotBlank(jsonObject.getString("data"))) {
            //把JsonArray转成对应实体类集合
            studentList = JSONObject.parseArray(jsonObject.getString("data"), Student.class);
        }
        try {
            log.info("学生接口第" + studentVO.getPageIndex() + "页数据开始入库...");
            //调取方法批量进行添加学生数据
            studentMapper.addBatchStudent(studentList);
            log.info("学生接口第" + studentVO.getPageIndex() + "页数据入库成功...");
        } catch (Exception e) {
            log.error("学生批量添加数据库异常:{}", e.getMessage());
        }
    }

    /**
     * 根据studentVO(当前页码和页面容量)发送获取学生数据的请求
     * @param studentVO
     * @return
     */
    public JSONObject sendStudentHttpPost(StudentVO studentVO){
        JSONObject jsonObject = null;
        String studentUrl = "http://*****/async-api/jc/student";
        try {
            if (StringUtils.isNotEmpty(studentUrl)) {
                Map<String, Object> param = new HashMap<>();
                param.put("pageIndex", studentVO.getPageIndex());
                param.put("pageSize", studentVO.getPageSize());
                log.info("开始发起http请求...");
                jsonObject = httpUtils.sendHttpPost(param, studentUrl);
            }
        } catch (Exception e) {
            log.error("调取客户学生同步接口出现异常:{},页面容量为:{},页码:{}", e.getMessage(), 
            studentVO.getPageSize(), studentVO.getPageIndex());
        }
        return jsonObject;
    }
}

3.StudentVO.java代码如下:

import lombok.Data;
/**
 * 数据同步接口获取学生数据传的参数VO类
 * @author hc
 * @create 2021/3/11 10:35
 * @version 1.0
 * @since 1.0
 */
@Data
public class StudentVO{
    //当前页码(初始值为0)
    private Integer pageIndex = 0;
    //页码容量
    private Integer pageSize;
}

4.HttpUtils.java代码如下:

import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
import org.springframework.web.client.HttpClientErrorException;
import org.springframework.web.client.RestTemplate;
import java.util.Map;

/**
 * Http请求工具类
 * @author hc
 * @create 2021/3/4
 * @version 1.0
 */
@Component
@Slf4j
public class HttpUtils {
    @Autowired
    private RestTemplate restTemplate;
    
    /**
     * 发送http的post请求方法
     * @param param
     * @return
     */
    public JSONObject sendHttpPost(Integer type, Map<String, Object> param, String url){
        log.info("调取同步接口Url:{}", url);
        JSONObject jsonObject = null;
        //发起http的post准备工作
        HttpHeaders httpHeaders = new HttpHeaders();
        httpHeaders.add("Content-Type", "application/json");
        HttpEntity<Map<String, Object>> httpEntity = new HttpEntity<>(param, httpHeaders);
        ResponseEntity<String> response = null;
        try {
            log.info("param参数为:{}",param.toString());
            response = restTemplate.postForEntity(url, httpEntity, String.class);
        } catch (HttpClientErrorException e) {
            log.error("发起http请求报错信息:{}",e.getResponseBodyAsString());
        }
        String bodyData = response.getBody();
        if (StringUtils.isNotEmpty(bodyData)) {
            jsonObject = JSONObject.parseObject(bodyData);
        }
        return jsonObject;
    }
}

5.StudentMapper.xml的SQL语句如下:

<!-- 批量添加数据同步接口中获取的学生数据 -->
<insert id="addBatchStudent" parameterType="***.entity.sync.Student">
    replace into xf_clue_sync_student(id, student_code, student_name, status, create_date, update_date)
    <foreach collection="studentList" item="student" open="values" separator="," >
       (#{student.id,jdbcType=BIGINT}, #{student.studentCode,jdbcType=INTEGER}, #{student.studentName,jdbcType=VARCHAR}, 
        #{student.status,jdbcType=INTEGER}, #{student.createDate,jdbcType=VARCHAR}, #{student.updateDate,jdbcType=VARCHAR})
    </foreach>
</insert>

功能小结

1.定时任务配置相关代码此处不再展示,SpringBoot框架使用注解的方式即可设置定时任务以及调取频率。
2.数据同步接口开发需要根据具体应用场景采用不同的方法,需视情况而定,例如:也可以使用kettle工具等等。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

(0)

相关推荐

  • Java多线程编程实战之模拟大量数据同步

    背景 最近对于 Java 多线程做了一段时间的学习,笔者一直认为,学习东西就是要应用到实际的业务需求中的.否则要么无法深入理解,要么硬生生地套用技术只是达到炫技的效果. 不过笔者仍旧认为自己对于多线程掌握不够熟练,不敢轻易应用到生产代码中.这就按照平时工作中遇到的实际问题,脑补了一个很可能存在的业务场景: 已知某公司管理着 1000 个微信服务号,每个服务号有 1w ~ 50w 粉丝不等.假设该公司每天都需要将所有微信服务号的粉丝数据通过调用微信 API 的方式更新到本地数据库. 需求分析 对此

  • java 定时同步数据的任务优化

    前言 定时任务在系统中并不少见,主要目的是用于需要定时处理数据或者执行某个操作的情况下,如定时关闭订单,或者定时备份.而常见的定时任务分为2种,第一种:固定时间执行,如:每分钟执行一次,每天执行一次.第二种:延时多久执行,就是当发生一件事情后,根据这件时间发生的时间定时多久后执行任务,如:15分钟后关闭订单付款状态,24小时候后关闭订单并且释放库存,而由于第二种一般都是单一数据的处理(主要是指数据量不大,一般情况下只有一个主体处理对象,如:一个订单以及订单中的N个商品),所以一般情况下第二种出现

  • java多线程编程之为什么要进行数据同步

    Java中的变量分为两类:局部变量和类变量.局部变量是指在方法内定义的变量,如在run方法中定义的变量.对于这些变量来说,并不存在线程之间共享的问题.因此,它们不需要进行数据同步.类变量是在类中定义的变量,作用域是整个类.这类变量可以被多个线程共享.因此,我们需要对这类变量进行数据同步.数据同步就是指在同一时间,只能由一个线程来访问被同步的类变量,当前线程访问完这些变量后,其他线程才能继续访问.这里说的访问是指有写操作的访问,如果所有访问类变量的线程都是读操作,一般是不需要数据同步的.那么如果不

  • Java实现同步枚举类数据到数据库

    本文实例为大家分享了Java同步枚举类数据到数据库的具体实现代码,供大家参考,具体内容如下 1.需求说明: 我们在开发中常常会用到数据字典,后端程序中也会经常用到(一般是用枚举类来存储),然而我们数据库中也会维护一个数据字典的数据,便于前端做数据显示时的处理,有一个问题就是,如果字典项发生变化后,我们需要修改枚举类和数据库的字典数据,要修改两次,还要面临二者不一致的风险. 所以这里的一个决绝方案就是自动读取枚举类的数据并更新到数据库,本文只讲枚举类数据的提取. 2.首先创建一个描述枚举类型的注解

  • Java实现多线程大批量同步数据(分页)

    背景 最近遇到个功能,两个月有300w+的数据,之后还在累加,因一开始该数据就全部存储在mysql表,现需要展示在页面,还需要关联另一张表的数据,而且产品要求页面的查询条件多达20个条件,最终,这个功能卡的要死,基本查不出来数据. 最后是打算把这两张表的数据同时存储到MongoDB中去,以提高查询效率. 一开始同步的时候,采用单线程,循环以分页的模式去同步这两张表数据,结果是…一晚上,只同步了30w数据,特慢!!! 最后,改造了一番,2小时,就成功同步了300w+数据. 以下是主要逻辑. 线程的

  • 基于Java方式实现数据同步

    本文实例为大家分享了Java方式实现数据同步的具体代码,供大家参考,具体内容如下 使用java方式实现两个系统之间数据的同步. 业务背景 在新系统中设置定时任务需要实时把客户系统中的数据及时同步过来,保持数据的一致性. 实现逻辑 1.根据客户提供的接口,本系统中采用Http的Post请求方式获取接口数据.2.由于客户提供的接口必带页码和页面容量,因此会涉及到多次请求接口才能拿到全量数据,因此相同的操作可以采用递归的方式进行.3.每次请求一次接口根据页面容量(pageSize)可获取多条数据,此时

  • 基于Java回顾之多线程同步的使用详解

    首先阐述什么是同步,不同步有什么问题,然后讨论可以采取哪些措施控制同步,接下来我们会仿照回顾网络通信时那样,构建一个服务器端的"线程池",JDK为我们提供了一个很大的concurrent工具包,最后我们会对里面的内容进行探索. 为什么要线程同步? 说到线程同步,大部分情况下, 我们是在针对"单对象多线程"的情况进行讨论,一般会将其分成两部分,一部分是关于"共享变量",一部分关于"执行步骤". 共享变量 当我们在线程对象(Run

  • MySQL高性能实现Canal数据同步神器

    目录 简介 配置MySQL centos7 安装 canal java客户端 简介 Canal是阿里巴巴基于Java开源的数据同步工具.平时业务场景使用比较多的如下: 同步数据到ES.Redis缓存中. 数据同步. 业务需要监听数据. 图片来源阿里巴巴github仓库:https://github.com/alibaba/canal 配置MySQL 跟配置主从复制类似. 数据执行如下命令: -- 给canal单独创建用户 用户名:canal 密码:123456 create user 'cana

  • Python操作ES的方式及与Mysql数据同步过程示例

    目录 Python操作Elasticsearch的两种方式 mysql和Elasticsearch同步数据 haystack的使用 Redis补充 Python操作Elasticsearch的两种方式 # 官方提供的:Elasticsearch # pip install elasticsearch # GUI:pyhon能做图形化界面编程吗? -Tkinter -pyqt # 使用(查询是重点) # pip3 install elasticsearch https://github.com/e

  • java多线程之线程同步七种方式代码示例

    为何要使用同步?  java允许多线程并发控制,当多个线程同时操作一个可共享的资源变量时(如数据的增删改查),     将会导致数据不准确,相互之间产生冲突,因此加入同步锁以避免在该线程没有完成操作之前,被其他线程的调用,     从而保证了该变量的唯一性和准确性. 1.同步方法  即有synchronized关键字修饰的方法.     由于java的每个对象都有一个内置锁,当用此关键字修饰方法时,     内置锁会保护整个方法.在调用该方法前,需要获得内置锁,否则就处于阻塞状态.     代码

  • linux下实现web数据同步的四种方式(性能比较)

    实现web数据同步的四种方式 ======================================= 1.nfs实现web数据共享2.rsync +inotify实现web数据同步3.rsync+sersync更快更节约资源实现web数据同步4.unison+inotify实现web数据双向同步 ======================================= 一.nfs实现web数据共享 nfs能实现数据同步是通过NAS(网络附加存储),在服务器上共享一个文件,且服务器需

  • 基于java解析JSON的三种方式详解

    本文实例分析了基于java解析JSON的三种方式.分享给大家供大家参考,具体如下: 一.什么是JSON? JSON是一种取代XML的数据结构,和xml相比,它更小巧但描述能力却不差,由于它的小巧所以网络传输数据将减少更多流量从而加快速度. JSON就是一串字符串 只不过元素会使用特定的符号标注. {} 双括号表示对象 [] 中括号表示数组 "" 双引号内是属性或值 : 冒号表示后者是前者的值(这个值可以是字符串.数字.也可以是另一个数组或对象) 所以 {"name"

  • 基于java文件上传-原始的Servlet方式

    前言:干了这几个项目,也做过几次文件上传下载,要么是copy项目以前的代码,要么是百度的,虽然做出来了,但学习一下原理弄透彻还是很有必要的.刚出去转了一圈看周围有没有租房的,在北京出去找房子是心里感觉最不爽的时候,没有归属感,房租还不便宜,RT,不能好高骛远,还是脚踏实地一点一点学技术吧,终将有一日,工资会涨的. java文件上传 传统的文件上传,不用jquery插件的话,就是用form表单提交,项目里用过uploadify,可以异步上传文件,原理我也没研究.现在说传统的form表单上传文件.

  • 基于C# 写一个 Redis 数据同步小工具

    概念 Redis是一个开源的使用ANSI C语言编写.支持网络.可基于内存亦可持久化的日志型.Key-Value数据库,和Memcached类似,它支持存储的value类型相对更多,包括string(字符串).list(链表).set(集合).zset(sorted set --有序集合)和hash(哈希类型).在此基础上,redis支持各种不同方式的排序.与memcached一样,为了保证效率,数据都是缓存在内存中.区别的是redis会周期性的把更新的数据写入磁盘或者把修改操作写入追加的记录文

随机推荐