关于通过java调用datax,返回任务执行的方法

DATAX

DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各种异构数据源之间高效的数据同步功能。

datax的详细介绍

请参考 DataX-Introduction

引言

因为业务需要,需要使用到datax把数据从文本写入到数据库,原来的做法都是使用python通过datax.py去调用脚本,阿文为了能更好的管控datax的任务,阿文要求我们对datax进行改造,使用java集成的方式去调用datax,并返回任务执行的详细信息。

datax源码跟踪

从github下完源码开始改造,datax的启动类在datax-core包下Engine类的entry方法,该方法是一个静态方法。

public static void entry(final String[] args) throws Throwable {
 Options options = new Options();
 options.addOption("job", true, "Job config.");
 options.addOption("jobid", true, "Job unique id.");
 options.addOption("mode", true, "Job runtime mode.");

 BasicParser parser = new BasicParser();
 CommandLine cl = parser.parse(options, args);

 String jobPath = cl.getOptionValue("job");

 // 如果用户没有明确指定jobid, 则 datax.py 会指定 jobid 默认值为-1
 String jobIdString = cl.getOptionValue("jobid");
 RUNTIME_MODE = cl.getOptionValue("mode");

 Configuration configuration = ConfigParser.parse(jobPath);

 long jobId;
 if (!"-1".equalsIgnoreCase(jobIdString)) {
  jobId = Long.parseLong(jobIdString);
 } else {
  // only for dsc & ds & datax 3 update
  String dscJobUrlPatternString = "/instance/(\\d{1,})/config.xml";
  String dsJobUrlPatternString = "/inner/job/(\\d{1,})/config";
  String dsTaskGroupUrlPatternString = "/inner/job/(\\d{1,})/taskGroup/";
  List<String> patternStringList = Arrays.asList(dscJobUrlPatternString,
   dsJobUrlPatternString, dsTaskGroupUrlPatternString);
  jobId = parseJobIdFromUrl(patternStringList, jobPath);
 }

 boolean isStandAloneMode = "standalone".equalsIgnoreCase(RUNTIME_MODE);
 if (!isStandAloneMode && jobId == -1) {
  // 如果不是 standalone 模式,那么 jobId 一定不能为-1
  throw DataXException.asDataXException(FrameworkErrorCode.CONFIG_ERROR, "非 standalone 模式必须在 URL 中提供有效的 jobId.");
 }
 configuration.set(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID, jobId);

 //打印vmInfo
 VMInfo vmInfo = VMInfo.getVmInfo();
 if (vmInfo != null) {
  LOG.info(vmInfo.toString());
 }

 LOG.info("\n" + Engine.filterJobConfiguration(configuration) + "\n");

 LOG.debug(configuration.toJSON());

 ConfigurationValidate.doValidate(configuration);
 Engine engine = new Engine();
 engine.start(configuration);
 }

里面最后通过调用engine.start(configuration) 开始启动,我们点进去,最后会发现在里面是调用JobContainer 的start() 方法。

@Override
 public void start() {
 LOG.info("DataX jobContainer starts job.");

 boolean hasException = false;
 boolean isDryRun = false;
 try {
  this.startTimeStamp = System.currentTimeMillis();
  isDryRun = configuration.getBool(CoreConstant.DATAX_JOB_SETTING_DRYRUN, false);
  if (isDryRun) {
  LOG.info("jobContainer starts to do preCheck ...");
  this.preCheck();
  } else {
  userConf = configuration.clone();
  LOG.debug("jobContainer starts to do preHandle ...");
  this.preHandle();

  LOG.debug("jobContainer starts to do init ...");
  this.init();
  LOG.info("jobContainer starts to do prepare ...");
  this.prepare();
  LOG.info("jobContainer starts to do split ...");
  this.totalStage = this.split();
  LOG.info("jobContainer starts to do schedule ...");
  this.schedule();
  LOG.debug("jobContainer starts to do post ...");
  this.post();

  LOG.debug("jobContainer starts to do postHandle ...");
  this.postHandle();
  LOG.info("DataX jobId [{}] completed successfully.", this.jobId);

  this.invokeHooks();
  }
 } catch (Throwable e) {
  LOG.error("Exception when job run", e);

  hasException = true;

  if (e instanceof OutOfMemoryError) {
  this.destroy();
  System.gc();
  }

  if (super.getContainerCommunicator() == null) {
  // 由于 containerCollector 是在 scheduler() 中初始化的,所以当在 scheduler() 之前出现异常时,需要在此处对 containerCollector 进行初始化

  AbstractContainerCommunicator tempContainerCollector;
  // standalone
  tempContainerCollector = new StandAloneJobContainerCommunicator(configuration);

  super.setContainerCommunicator(tempContainerCollector);
  }

  Communication communication = super.getContainerCommunicator().collect();
  // 汇报前的状态,不需要手动进行设置
  // communication.setState(State.FAILED);
  communication.setThrowable(e);
  communication.setTimestamp(this.endTimeStamp);

  Communication tempComm = new Communication();
  tempComm.setTimestamp(this.startTransferTimeStamp);

  Communication reportCommunication = CommunicationTool.getReportCommunication(communication, tempComm, this.totalStage);
  super.getContainerCommunicator().report(reportCommunication);

  throw DataXException.asDataXException(
   FrameworkErrorCode.RUNTIME_ERROR, e);
 } finally {
  if (!isDryRun) {

  this.destroy();
  this.endTimeStamp = System.currentTimeMillis();
  if (!hasException) {
   //最后打印cpu的平均消耗,GC的统计
   VMInfo vmInfo = VMInfo.getVmInfo();
   if (vmInfo != null) {
   vmInfo.getDelta(false);
   LOG.info(vmInfo.totalString());
   }

   LOG.info(PerfTrace.getInstance().summarizeNoException());
   this.logStatistics();
  }
  }
 }
 }

而我们需要的任务信息就在this.logStatistics() 中

private void logStatistics() {
 long totalCosts = (this.endTimeStamp - this.startTimeStamp) / 1000;
 long transferCosts = (this.endTransferTimeStamp - this.startTransferTimeStamp) / 1000;
 if (0L == transferCosts) {
  transferCosts = 1L;
 }

 if (super.getContainerCommunicator() == null) {
  return;
 }

 Communication communication = super.getContainerCommunicator().collect();
 communication.setTimestamp(this.endTimeStamp);

 Communication tempComm = new Communication();
 tempComm.setTimestamp(this.startTransferTimeStamp);

 Communication reportCommunication = CommunicationTool.getReportCommunication(communication, tempComm, this.totalStage);

 // 字节速率
 long byteSpeedPerSecond = communication.getLongCounter(CommunicationTool.READ_SUCCEED_BYTES)
  / transferCosts;

 long recordSpeedPerSecond = communication.getLongCounter(CommunicationTool.READ_SUCCEED_RECORDS)
  / transferCosts;

 reportCommunication.setLongCounter(CommunicationTool.BYTE_SPEED, byteSpeedPerSecond);
 reportCommunication.setLongCounter(CommunicationTool.RECORD_SPEED, recordSpeedPerSecond);

 super.getContainerCommunicator().report(reportCommunication);

 LOG.info(String.format(
  "\n" + "%-26s: %-18s\n" + "%-26s: %-18s\n" + "%-26s: %19s\n"
   + "%-26s: %19s\n" + "%-26s: %19s\n" + "%-26s: %19s\n"
   + "%-26s: %19s\n",
  "任务启动时刻",
  dateFormat.format(startTimeStamp),

  "任务结束时刻",
  dateFormat.format(endTimeStamp),

  "任务总计耗时",
  String.valueOf(totalCosts) + "s",
  "任务平均流量",
  StrUtil.stringify(byteSpeedPerSecond)
   + "/s",
  "记录写入速度",
  String.valueOf(recordSpeedPerSecond)
   + "rec/s", "读出记录总数",
  String.valueOf(CommunicationTool.getTotalReadRecords(communication)),
  "读写失败总数",
  String.valueOf(CommunicationTool.getTotalErrorRecords(communication))
 ));

 LOG.info("task-total-info:" + dateFormat.format(startTimeStamp) + "|" +
  dateFormat.format(endTimeStamp) + "|" +
  String.valueOf(totalCosts) + "|" +
  StrUtil.stringify(byteSpeedPerSecond) + "|" +
  String.valueOf(recordSpeedPerSecond) + "|" +
  String.valueOf(CommunicationTool.getTotalReadRecords(communication)) + "|" +
  String.valueOf(CommunicationTool.getTotalErrorRecords(communication))
 );

 if (communication.getLongCounter(CommunicationTool.TRANSFORMER_SUCCEED_RECORDS) > 0
  || communication.getLongCounter(CommunicationTool.TRANSFORMER_FAILED_RECORDS) > 0
  || communication.getLongCounter(CommunicationTool.TRANSFORMER_FILTER_RECORDS) > 0) {
  LOG.info(String.format(
   "\n" + "%-26s: %19s\n" + "%-26s: %19s\n" + "%-26s: %19s\n",
   "Transformer成功记录总数",
   communication.getLongCounter(CommunicationTool.TRANSFORMER_SUCCEED_RECORDS),

   "Transformer失败记录总数",
   communication.getLongCounter(CommunicationTool.TRANSFORMER_FAILED_RECORDS),

   "Transformer过滤记录总数",
   communication.getLongCounter(CommunicationTool.TRANSFORMER_FILTER_RECORDS)
  ));
 }
 }

改造开始

新增返回实体DataxResult (get、set省略)

public class DataxResult {
 //任务启动时刻
 private long startTimeStamp;
 //任务结束时刻
 private long endTimeStamp;
 //任务总时耗
 private long totalCosts;
 //任务平均流量
 private long byteSpeedPerSecond;
 //记录写入速度
 private long recordSpeedPerSecond;
 //读出记录总数
 private long totalReadRecords;
 //读写失败总数
 private long totalErrorRecords;
 //成功记录总数
 private long transformerSucceedRecords;
 // 失败记录总数
 private long transformerFailedRecords;
 // 过滤记录总数
 private long transformerFilterRecords;
 //字节数
 private long readSucceedBytes;
 //转换开始时间
 private long endTransferTimeStamp;
 //转换结束时间
 private long startTransferTimeStamp;
 //转换总耗时
 private long transferCosts;

重写logStatistics方法,返回该实体。

private DataxResult logStatistics(DataxResult resultMsg) {
 long totalCosts = (this.endTimeStamp - this.startTimeStamp) / 1000;
 long transferCosts = (this.endTransferTimeStamp - this.startTransferTimeStamp) / 1000;
 if (0L == transferCosts) {
  transferCosts = 1L;
 }
 if (super.getContainerCommunicator() == null) {
  return resultMsg;
 }
 Communication communication = super.getContainerCommunicator().collect();
 long byteSpeedPerSecond = communication.getLongCounter(CommunicationTool.READ_SUCCEED_BYTES)
  / transferCosts;
 long recordSpeedPerSecond = communication.getLongCounter(CommunicationTool.READ_SUCCEED_RECORDS)
  / transferCosts;

 return resultMsg.getResultMsg(startTimeStamp,
  endTimeStamp,
  totalCosts,
  byteSpeedPerSecond,
  recordSpeedPerSecond,
  communication.getLongCounter(CommunicationTool.TRANSFORMER_SUCCEED_RECORDS),
  communication.getLongCounter(CommunicationTool.TRANSFORMER_FAILED_RECORDS),
  communication.getLongCounter(CommunicationTool.TRANSFORMER_FILTER_RECORDS),
  communication.getLongCounter(CommunicationTool.TRANSFORMER_FAILED_RECORDS),
  communication.getLongCounter(CommunicationTool.TRANSFORMER_FILTER_RECORDS),
  communication.getLongCounter(CommunicationTool.READ_SUCCEED_BYTES),
  this.endTransferTimeStamp,
  this.startTransferTimeStamp,
  transferCosts
 );

 }

还需要重写JobContainer的**start()**方法。

@Override
 public DataxResult start(DataxResult dataxResult) {
 ...
 DataxResult result = new DataxResult();
 result = logStatistics(dataxResult);
 ...
 return result;
 }

然后在Engine 类中添加模拟测试方法mockentry

 public DataxResult mockstart(Configuration allConf) {

 ...
 DataxResult dataxResult = new DataxResult();
 return container.start(dataxResult);
 }

开始测试

在com.alibaba.datax.core.util.container.CoreConstant里修改datax_home 为本地路径

该datax_home路径下有以下几个目录

public class test {

 public static void main(String[] args) {
 String[] datxArgs = {"-job", CoreConstant.DATAX_HOME + "\\job\\job2.json", "-mode", "standalone", "-jobid", "-1"};
 try {
  DataxResult dataxResult= Engine.mockentry(datxArgs);
 } catch (Throwable e) {
  e.printStackTrace();
 }

 }
}

执行结果为

3

大功告成!

以上这篇关于通过java调用datax,返回任务执行的方法就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持我们。

(0)

相关推荐

  • Java通过python命令执行DataX任务的实例

    1.安装datax 2.安装python并配置环境变量 3.把mysql2odps.json文件放在datax安装目录的job文件夹下 4.运行Test.java测试 mysql2odps.json文件: { "job": { "content": [ { "reader": { "name": "mysqlreader", "parameter": { "username&q

  • 关于通过java调用datax,返回任务执行的方法

    DATAX DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL.Oracle.SqlServer.Postgre.HDFS.Hive.ADS.HBase.TableStore(OTS).MaxCompute(ODPS).DRDS 等各种异构数据源之间高效的数据同步功能. datax的详细介绍 请参考 DataX-Introduction 引言 因为业务需要,需要使用到datax把数据从文本写入到数据库,原来的做法都是使用python通过datax.py去调用脚本

  • java调用WebService服务的四种方法总结

    目录 一.前言 二.简介   三.具体解析 第一种方式,首先得下载axis2的jar包,Axis2提供了一个wsdl2java.bat命令可以根据WSDL文件自动产生调用WebService的代码. 第二种RPC 方式,强烈推荐. 第三种:利用HttpURLConnection拼接和解析报文进行调用. 第四种,利用httpclient 总结 一.前言 本来不想写这个的,因为网上类似的是在是太多了.但是想想自己前面段时间用过,而且以后可能再也没机会用了.所以还是记录一下吧.我这儿是以C语言生成的W

  • java调用ffmpeg实现视频转换的方法

    本文实例讲述了java调用ffmpeg实现视频转换的方法.分享给大家供大家参考.具体分析如下: 这里环境我是在windows平台下测试的... 需要在e:\下有ffmpeg.exe;mencoder.exe;drv43260.dll;pncrt.dll共4个文件.   还要在e:\input下放各种文件名为a的以下各种视频文件:还要e:\output:java程序执行后能得到一个a.flv的已转换的文件. ffmpeg.exe能解析的格式:(asx,asf,mpg,wmv,3gp,mp4,mov

  • java调用Restful接口的三种方法

    目录 1,基本介绍 2,HttpURLConnection实现 3.HttpClient实现 4.Spring的RestTemplate 1,基本介绍 Restful接口的调用,前端一般使用ajax调用,后端可以使用的方法比较多, 本次介绍三种: 1.HttpURLConnection实现 2.HttpClient实现 3.Spring的RestTemplate 2,HttpURLConnection实现 @Controller public class RestfulAction { @Aut

  • Java访问WebService返回XML数据的方法

    本文实例讲述了Java访问WebService返回XML数据的方法.分享给大家供大家参考.具体如下: import java.io.IOException; import java.io.InputStream; import java.net.MalformedURLException; import java.net.URL; import java.net.URLConnection; import java.io.FileNotFoundException; import java.io

  • Windows系统中Java调用cmd命令及执行exe程序的方法

    Java调用cmd命令,并输出显示信息: package com.anxin.cmd.test; import java.io.BufferedReader; import java.io.InputStreamReader; public class Command { public static void main(String[] args) { try { Runtime rt = Runtime.getRuntime(); Process pr = rt.exec("cmd /c di

  • Java调用Windows的DOS命令的方法

    这是一个使用java代码调用dos命令的实例,在这里我就不多说,直接上代码,代码如下: 复制代码 代码如下: import java.io.*; /**  *  Java调用windows的DOS命令  *  实现调用Windows的ipconfig命令,然后将输出的信息通过IO流输出到控制台.  */ public class RunWindowsCommand{     public static void main(String[] args) {         InputStream

  • 安卓应用开发通过java调用c++ jni的图文使用方法

    首先建议一个工程 HelloJni如下图: 按照默认的配置下一步,直到完成 . 如下图操作,点击windows菜单->Prefrence菜单: 弹出如下图:选择Andriod ->NDK: 配置完成以后,点击工程属性菜单: 做这一步的目的是,增加对c++代码的支持,他会自动生成一些东西,你会看到多一个jni的文件夹. 这个名字可以默认,就用工程的名字,实际上就是产生的c++代码生成.so文件的名称(windows上的dll文件). 完成以后.可以看代码,生成一个HelloJni的.cpp文件.

  • Java调用echarts提供的地图压缩方法来压缩地图

    由于默认情况下的 GeoJSON 文件太大,ECharts 提供了方法可以压缩地图. GeoJSON 介绍:http://geojson.org/ ECharts 提供了压缩算法,但是代码是 JavaScript 的,参考代码写个 Java 的也可以,但是通过 Java 提供的脚本引擎可以很方便的直接执行 JS. 本文压缩代码参考下面的项目: https://github.com/giscafer/mapshaper-plus 这里的算法也是来源于 ECharts 从该项目提取出压缩地图的 JS

  • 解决java调用dll报Unable to load library错误的问题

    如下: 1.确保路径下有该dll文件 2.dll和jdk必须确保同样是64位或者32位. 3.地址url中不能出现"/",必须是"\". 4.路径一定要对. 5.如果有同样的文件名出现,要在路径后面加上 ".dll",比如:System.getProperty("user.dir")+"\library"+"\PlayCtrl.dll" 补充知识:Java语言调用第三方dll文件的使用方

随机推荐