Java之Rsync并发迁移数据并校验详解

java调用Rsync并发迁移数据并执行校验

java代码如下

RsyncFile.java

import lombok.NoArgsConstructor;
import lombok.SneakyThrows;

import java.io.*;
import java.util.ArrayList;
import java.util.Date;
import java.util.concurrent.*;

/**
 * @ClassName RsyncFile
 * @Descriptiom TODO rsync多线程同步迁移数据
 * @Author KING
 * @Date 2019/11/25 09:17
 * @Version 1.2.2
 * rsync -vzrtopg --progress --delete   //镜像同步
 **/
@NoArgsConstructor
public class RsyncFile implements  Runnable{
    private static final int availProcessors = Runtime.getRuntime().availableProcessors();
    //构造以cpu核心数为核心池,cpu线程数为最大池,超时时间为1s,线程队列为大小为无界的安全阻塞线程队列,拒绝策略为DiscardOldestPolicy()的线程池。(同步数据当然不能丢下拒绝任务)
    private ExecutorService ThreadPool = new ThreadPoolExecutor(availProcessors >> 1,
            availProcessors,
            1L,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(),Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.DiscardOldestPolicy());

    //保存扫描得到的文件列表
    private static ArrayList<String> fileNameList = new ArrayList<String>();
    private String shellname;
    private String filename;
    private String userip;
    private CountDownLatch countDownLatch;
    private static int deep = 0;

    public RsyncFile(String ShellName, String filename, String UserIP, CountDownLatch countDownLatch) {
        this.shellname = ShellName;
        this.filename = filename;
        this.userip = UserIP;
        this.countDownLatch = countDownLatch;
    }

    public static void main(String[] args) {
        try {
            new RsyncFile().Do(args[0],args[1],Integer.parseInt(args[2]));
        }catch (ArrayIndexOutOfBoundsException e){
            System.out.println(e);
            System.out.println("Error , args send fault");
            System.out.println("please send localAddress remote username @ remote IP or hostname and catalogue");
            System.out.println("like this [  /home/test/  root@node1:/test/   1 ]");
        }catch(NumberFormatException e1){
            System.out.println(e1);
            System.out.println("please input Right Directory depth, this number must be int");
            System.out.println("like this [  /home/test/  root@node1:/test/   1 ]");
        }
    }

    @SneakyThrows
    private void Do(String content,String UserIP,int setdeep){
        System.out.println("开始执行");
        System.out.println("开始时间:" + new Date());
        Long a = System.nanoTime();
        File file = new File(content);
        System.out.println("开始扫描本地指定目录");
        GetAllFile(file,setdeep);//按深度扫描非空文件夹和文件
        System.out.println("扫描本地目录完成");

        //给脚本赋予权限
        String [] cmd={"/bin/sh","-c","chmod 755 ./do*"};
        Runtime.getRuntime().exec(cmd);
        //创建远端目录操作
        System.out.println("开始创建远端目录结构");
        //一次计数锁用于保证目录创建完成
        CountDownLatch doDirLock = new CountDownLatch(1);
        ThreadPool.execute(new RsyncFile("./doDirc.sh",content,UserIP,doDirLock));
        doDirLock.await();
        System.out.println("创建远端目录结构完成");

        //开始同步工作
        System.out.println("开始执行同步工作");
        System.out.println("同步的文件夹或文件总数: " + fileNameList.size());
        System.out.println("正在同步。。。。。");
        //fileNameList.size()次计数锁用于保证数据同步完成(保证计时准确)
        CountDownLatch rsyncLock = new CountDownLatch(fileNameList.size());
        System.out.println(fileNameList.size());
        for (String fileName:fileNameList) {
            //除去文件名中与UserIP重复的文件路径
            String RemoteDir = UserIP.concat(fileName.replace(content, ""));
            System.out.println("要同步的本地目录或文件: " + fileName);
            System.out.println("要同步的远端目录或文件: " + RemoteDir);
            ThreadPool.execute(new RsyncFile("./doRync.sh",fileName, RemoteDir,rsyncLock));
        }
        rsyncLock.await();
        System.out.println("执行同步工作完成");

        //开始文件校验工作
        System.out.println("执行文件校验及重传");
        //一次计数锁用于保证校验完成
        CountDownLatch chechSumLock = new CountDownLatch(1);
        ThreadPool.execute(new RsyncFile("./doChecksum.sh",content,UserIP,chechSumLock));
        chechSumLock.await();
        System.out.println("文件校验及重传完成");
        ThreadPool.shutdown();

        Long b = System.nanoTime();
        Long aLong = (b - a)/1000000L;
        System.out.println("处理时间" + aLong + "ms");
        System.out.println("结束时间:" + new Date());

    }

    /**
     * 执行rsync脚本的线程方法,使用PrintWriter来与linux Terminal交互
     */
    @Override
    public void run() {
        try {
            String command=shellname.concat(" ").concat(filename).concat(" ").concat(userip);
            File wd = new File("/bin");
            Process process = null;
            process = Runtime.getRuntime().exec("/bin/bash", null, wd);
            if (process != null) {
                InputStream is = process.getInputStream();
                BufferedReader reader = new BufferedReader(new InputStreamReader(is));
                PrintWriter out = new PrintWriter(new BufferedWriter(new OutputStreamWriter(process.getOutputStream())),
                        true);
                //切换到当前class文件所在目录
                out.println("cd " + System.getProperty("user.dir"));
                out.println(command);
                out.println("exit");
                StringBuilder sb = new StringBuilder();
                String line;
                while ((line = reader.readLine()) != null) {
                    sb.append(line + System.lineSeparator());
                }
                process.waitFor();
                reader.close();
                out.close();
                process.destroy();
                System.out.println("result:" + sb.toString());
            }else {
                System.out.println("找不到系统bash工具,请检查系统是否异常,并为系统创建/bin/sh的bash工具软连接");
            }
        } catch (Exception e) {
            System.err.println(e.getMessage());
        }finally {
            //倒记数锁释放一次
            countDownLatch.countDown();
        }
    }

    /**遍历指定的目录并能指定深度
     * @param file 指定要遍历的目录
     * @param setDeep 设定遍历深度
     */
    @SneakyThrows
    private static void  GetAllFile(File file, int setDeep) {
        if(file != null){
            if(file.isDirectory() && deep<setDeep){
                deep++;
                File f[] = file.listFiles();
                if(f != null) {
                    int length = f.length;
                    for(int i = 0; i < length; i++)
                        GetAllFile(f[i],setDeep);
                    deep--;
                }
            } else {
                if(file.isDirectory()){
                    //如果为目录末尾添加 / 保证rsync正常处理
                    fileNameList.add(file.getAbsolutePath().concat("/"));
                }else {
                    fileNameList.add(file.getAbsolutePath());
                }
            }
        }
    }

}

doDir.sh

rsync -av --include='*/' --exclude='*' $1 $2 |tee -a /tmp/rsync.log 2>&1
echo "创建目录结构操作"

doRsync.sh

rsync -avzi --stats --progress $1 $2 |tee -a /tmp/rsync.log 2>&1

doChecksum.sh

rsync -acvzi --stats --progress $1 $2 |tee -a /tmp/checksum.log 2>&1

附录

rsync输出日志说明如下

YXcstpoguax  path/to/file
|||||||||||
||||||||||╰- x: The extended attribute information changed
|||||||||╰-- a: The ACL information changed
||||||||╰--- u: The u slot is reserved for future use
|||||||╰---- g: Group is different
||||||╰----- o: Owner is different
|||||╰------ p: Permission are different
||||╰------- t: Modification time is different
|||╰-------- s: Size is different
||╰--------- c: Different checksum (for regular files), or
||              changed value (for symlinks, devices, and special files)
|╰---------- the file type:
|            f: for a file,
|            d: for a directory,
|            L: for a symlink,
|            D: for a device,
|            S: for a special file (e.g. named sockets and fifos)
╰----------- the type of update being done::
             <: file is being transferred to the remote host (sent)
             >: file is being transferred to the local host (received)
             c: local change/creation for the item, such as:
                - the creation of a directory
                - the changing of a symlink,
                - etc.
             h: the item is a hard link to another item (requires
                --hard-links).
             .: the item is not being updated (though it might have
                attributes that are being modified)
             *: means that the rest of the itemized-output area contains
                a message (e.g. "deleting")

到此这篇关于Java之Rsync并发迁移数据并校验详解的文章就介绍到这了,更多相关Java之Rsync并发内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Java httpClient连接池支持多线程高并发的实现

    当采用HttpClient httpClient = HttpClients.createDefault() 实例化的时候.会导致Address already in use的异常. 信息: I/O exception (java.net.BindException) caught when processing request to {}->http://**.**.**.** Address already in use: connect 十一月 22, 2018 5:02:13 下午 or

  • Java并发之Condition案例详解

    目录 一.Condition接口介绍和示例 二.Condition接口常用方法 三.Condition接口原理简单解析 3.1.等待 3.2.通知 四.总结 五.利用Condition实现生产者消费者模式 在使用Lock之前,我们使用的最多的同步方式应该是synchronized关键字来实现同步方式了.配合Object的wait().notify()系列方法可以实现等待/通知模式.Condition接口也提供了类似Object的监视器方法,与Lock配合可以实现等待/通知模式,但是这两者在使用方

  • Java中常见的并发控制手段浅析

    目录 前言 1.1 同步代码块 1.2 CAS自旋方式 1.3 锁 1.4 阻塞队列 1.5 信号量Semaphore 1.6 计数器CountDownLatch 1.7 栅栏 CyclicBarrier 1.8 guava令牌桶 1.9 滑动窗口TimeWindow 1.10 小结 前言 单实例的并发控制,主要是针对JVM内,我们常规的手段即可满足需求,常见的手段大概有下面这些 同步代码块 CAS自旋 锁 阻塞队列,令牌桶等 1.1 同步代码块 通过同步代码块,来确保同一时刻只会有一个线程执行

  • Java并发编程之阻塞队列(BlockingQueue)详解

    目录 队列 阻塞队列 ArrayBlockingQueue 重要属性 构造方法 添加元素 add(e) offer(e) put(e) offer(e,time,unit) 移除元素 take() dequeue() LinkedBlockingQueue 重要属性 构造方法 添加元素 offer(e) put(e) 移除元素 poll() take() 对比 总结 大家好,我是小黑,一个在互联网苟且偷生的农民工. 队列 学过数据结构的同学应该都知道,队列是数据结构中一种特殊的线性表结构,和平时

  • Java并发编程之代码实现两玩家交换装备

    目录 1 Exchanger 是什么 2 Exchanger 详解 3 Exchanger 应用 总结 1 Exchanger 是什么 JDK 1.5 开始 JUC 包下提供的 Exchanger 类可用于两个线程之间交换信息.Exchanger 对象可理解为一个包含2个格子的容器,通过调用 exchanger 方法向其中的格子填充信息,当两个格子中的均被填充信息时,自动交换两个格子中的信息,然后将交换的信息返回给调用线程,从而实现两个线程的信息交换. 功能看似简单,但这在某些场景下是很有用处的

  • 浅谈Java高并发解决方案以及高负载优化方法

    目录 1.HTML静态化 2.图片服务器分离 3.数据库集群和库表散列 4.缓存 5.镜像 6.负载均衡 1)硬件四层交换 2)软件四层交换 一.高并发高负载类网站关注点之数据库 需要注意的是: 二.高并发高负载网站的系统架构之HTML静态化 网站HTML静态化解决方案 : 三.高并发高负载类网站关注点之缓存.负载均衡.存储 负载均衡/加速 存储 四.高并发高负载网站的系统架构之图片服务器分离 利用Apache实现图片服务器的分离,缘由: 环境介绍: 步骤: 五.高并发高负载网站的系统架构之数据

  • Java面试题冲刺第二十四天--并发编程

    目录 面试题1:说一下你对ReentrantLock的理解? CAS: AQS: 追问1:你认为 ReentrantLock 相比 synchronized 都有哪些区别? 面试题2:解释一下公平锁和非公平锁? 面试题3:能详细说一下CAS具体实现原理么? 追问1:那CAS的缺陷有哪些呢? 1.ABA: 2.自旋消耗资源: 3.多变量共享一致性问题: 追问2:讲一下什么是ABA问题?怎么解决? 总结 面试题1:说一下你对ReentrantLock的理解? ReentrantLock是JDK1.5

  • Java 处理高并发负载类优化方法案例详解

    java处理高并发高负载类网站中数据库的设计方法(java教程,java处理大量数据,java高负载数据) 一:高并发高负载类网站关注点之数据库 没错,首先是数据库,这是大多数应用所面临的首个SPOF.尤其是Web2.0的应用,数据库的响应是首先要解决的. 一般来说MySQL是最常用的,可能最初是一个mysql主机,当数据增加到100万以上,那么,MySQL的效能急剧下降.常用的优化措施是M-S(主-从)方式进行同步复制,将查询和操作和分别在不同的服务器上进行操作.我推荐的是M-M-Slaves

  • java实战案例之用户注册并发送邮件激活/发送邮件验证码

    目录 一.前期准备 1.准备两个邮箱账号(一个发邮件,一个收邮件) 1.1)登录需要发送邮件的QQ邮箱,找到设置项 1.2)然后在账户栏下,找到(POP3/SMTP)服务协议 1.3)生成授权码 二.项目 1.准备用户数据表 2.idea 创建项目 2.1)在项目的pom表中导入邮件jar包 2.2)创建user类-用户类 2.3)创建配置文件 2.4)创建EmailController类 2.5)创建EmailService 类 2.6)创建EmailServiceImpl 类 3.准备网页

  • java并发编程JUC CountDownLatch线程同步

    目录 java并发编程JUC CountDownLatch线程同步 1.CountDownLatch是什么? 2.CountDownLatch 如何工作 3.CountDownLatch 代码例子 java并发编程JUC CountDownLatch线程同步 CountDownLatch是一种线程同步辅助工具,它允许一个或多个线程等待其他线程正在执行的一组操作完成.CountDownLatch的概念在java并发编程中非常常见,面试也会经常被问到,所以一定要好好理解掌握. CountDownLa

  • Java 模拟真正的并发请求详情

    java中模拟并发请求,自然是很方便的,只要多开几个线程,发起请求就好了.但是,这种请求,一般会存在启动的先后顺序了,算不得真正的同时并发! 怎么样才能做到真正的同时并发呢? 是本文想说的点,java中提供了闭锁 CountDownLatch, 刚好就用来做这种事就最合适了. 只需要: 开启n个线程,加一个闭锁,开启所有线程: 待所有线程都准备好后,按下开启按钮,就可以真正的发起并发请求了. package com.test; import java.io.BufferedReader; imp

  • JAVA并发图解

    目录 总结 JAVA并发总览 核心问题 并不是程序的漏洞导致的,而是操作系统底层机制导致的 原子性: 可见性问题: 改的是缓存,但是缓存对另一个线程不可见 有序性问题: 正常应该先创建对象,再赋值:而编译器对指令执行顺序出于某些原因进行了优化,然后改变了执行顺序,如下: 解决方案 可见性: 有序性: 这个原则在加了volatile和锁的时候自动生效,也就是说解决了可见性和原子性,可见性顺带就解决了 原子性: 操作系统角度,监视器的名字是管程 解决了原子性问题,可见性和有序性都能解决 并发工具 C

随机推荐