Java定时调用.ktr文件的示例代码(解决方案)

1.Maven依赖

<!-- Kettle -->
<dependency>
    <groupId>pentaho-kettle</groupId>
    <artifactId>kettle-core</artifactId>
    <version>7.1.0.0-12</version>
</dependency>
<dependency>
    <groupId>pentaho-kettle</groupId>
    <artifactId>kettle-engine</artifactId>
    <version>7.1.0.0-12</version>
</dependency>
<dependency>
    <groupId>pentaho-kettle</groupId>
    <artifactId>metastore</artifactId>
    <version>7.1.0.0-12</version>
</dependency>
<dependency>
    <groupId>commons-io</groupId>
    <artifactId>commons-io</artifactId>
    <version>1.4</version>
</dependency>

<!-- connector -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.49</version>
</dependency>

注意:kettle的jar包依赖会拉不下来,需要将jar包install到本地,命令:

创建 0_install.bat 文件

:: 本地 install kettle-core.jar包
CALL mvn install:install-file -Dfile=kettle-core-7.1.0.0-12.jar  -DgroupId=pentaho-kettle  -DartifactId=kettle-core  -Dversion=7.1.0.0-12  -Dpackaging=jar 

:: 本地 install kettle-engine.jar包
CALL mvn install:install-file -Dfile=kettle-engine-7.1.0.0-12.jar  -DgroupId=pentaho-kettle  -DartifactId=kettle-engine  -Dversion=7.1.0.0-12  -Dpackaging=jar 

:: 本地 install metastore.jar包
CALL mvn install:install-file -Dfile=metastore-7.1.0.0-12.jar  -DgroupId=pentaho-kettle  -DartifactId=metastore  -Dversion=7.1.0.0-12  -Dpackaging=jar 

pause

或者deploy到内网私服上,命令:

创建 1_deploy.bat 文件

:: 私服 deploy kettle-core.jar包
CALL mvn deploy:deploy-file -Dfile=kettle-core-7.1.0.0-12.jar  -DgroupId=pentaho-kettle  -DartifactId=kettle-core  -Dversion=7.1.0.0-12  -Dpackaging=jar -Durl=http://192.168.1.132/nexus/content/repositories/Third/ -DrepositoryId=服务ID

:: 私服 deploy kettle-engine.jar包
CALL mvn deploy:deploy-file -Dfile=kettle-engine-7.1.0.0-12.jar  -DgroupId=pentaho-kettle  -DartifactId=kettle-engine  -Dversion=7.1.0.0-12  -Dpackaging=jar  -Durl=http://192.168.1.132/nexus/content/repositories/Third/ -DrepositoryId=服务ID

:: 私服 deploy metastore.jar包
CALL mvn deploy:deploy-file -Dfile=metastore-7.1.0.0-12.jar  -DgroupId=pentaho-kettle  -DartifactId=metastore  -Dversion=7.1.0.0-12  -Dpackaging=jar  -Durl=http://192.168.1.132/nexus/content/repositories/Third/ -DrepositoryId=服务ID

pause

(脚本创建在jar包目录下,创建好之后双击运行即可)

jar包、脚本文件下载地址

https://share.weiyun.com/eaOSjqP7

2.执行.ktr/.kjb工具类

KettleReadUtils.java

import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.util.EnvUtil;
import org.pentaho.di.job.Job;
import org.pentaho.di.job.JobMeta;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;

import java.io.InputStream;

/**
 * <p> @Title KettleReadUtils
 * <p> @Description Kettle工具包
 *
 * @author zhj
 * @date 2021/4/8 10:50
 */
public class KettleReadUtils {

    /**
     * 调用 kettle ktr
     *
     * @param path 文件路径
     */
    public static void runKtr(String path) {
        try {
            KettleEnvironment.init();
            EnvUtil.environmentInit();
            TransMeta transMeta = new TransMeta(path);
            Trans trans = new Trans(transMeta);
            trans.execute(null);
            trans.waitUntilFinished();
            if (trans.getErrors() > 0) {
                throw new Exception("Errors during transformation execution!");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 以流的方式调用 kettle ktr
     *
     * @param in 文件流
     */
    public static void runKtrByStream(InputStream in) {
        try {
            KettleEnvironment.init();
            TransMeta transMeta = new TransMeta(in, null, true, null, null);
            Trans trans = new Trans(transMeta);
            trans.execute(null);
            trans.waitUntilFinished();
            if (trans.getErrors() > 0) {
                throw new Exception("Errors during transformation execution!");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 调用 kettle job
     *
     * @param paraNames  多个参数名
     * @param paraValues 多个参数值
     * @param jobPath    如: String fName= "D:\\kettle\\aaa.kjb";
     */
    public static void runJob(String[] paraNames, String[] paraValues, String jobPath) {
        try {
            KettleEnvironment.init();
            JobMeta jobMeta = new JobMeta(jobPath, null);
            Job job = new Job(null, jobMeta);
            // 向Job 脚本传递参数,脚本中获取参数值:${参数名}
            if (paraNames != null && paraValues != null) {
                for (int i = 0; i < paraNames.length && i < paraValues.length; i++) {
                    job.setVariable(paraNames[i], paraValues[i]);
                }
            }
            job.start();
            job.waitUntilFinished();
            if (job.getErrors() > 0) {
                throw new Exception("Errors during job execution!");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

3.创建.ktr/.kjb工具类

(此处只是提供java创建途径,可以直接使用Spoon.bat创建好的文件)

import org.apache.commons.io.FileUtils;
import org.pentaho.di.core.KettleEnvironment;
import org.pentaho.di.core.database.DatabaseMeta;
import org.pentaho.di.core.exception.KettleXMLException;
import org.pentaho.di.core.plugins.PluginRegistry;
import org.pentaho.di.core.plugins.StepPluginType;
import org.pentaho.di.trans.TransHopMeta;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.step.StepMeta;
import org.pentaho.di.trans.steps.insertupdate.InsertUpdateMeta;
import org.pentaho.di.trans.steps.tableinput.TableInputMeta;

import java.io.File;

/**
 * <p> @Title KettleReadUtils
 * <p> @Description Kettle工具包
 *
 * @author zhj
 * @date 2021/4/8 10:50
 */
public class KettleWriteUtils {

    /**
     * 数据库连接信息,适用于DatabaseMeta其中 一个构造器DatabaseMeta(String xml)
     */
    private static final String DATABASE_XML_1 =
            "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" +
                    "<connection>" +
                    "<name>db1</name>" +
                    "<server>127.0.0.1</server>" +
                    "<type>MYSQL</type>" +
                    "<access>Native</access>" +
                    "<database>test</database>" +
                    "<port>3306</port>" +
                    "<username>root</username>" +
                    "<password>root</password>" +
                    "</connection>";
    private static final String DATABASE_XML_2 =
            "<?xml version=\"1.0\" encoding=\"UTF-8\"?>" +
                    "<connection>" +
                    "<name>db2</name>" +
                    "<server>127.0.0.1</server>" +
                    "<type>MYSQL</type>" +
                    "<access>Native</access>" +
                    "<database>test</database>" +
                    "<port>3306</port>" +
                    "<username>root</username>" +
                    "<password>root</password>" +
                    "</connection>";
    /**
     * 创建ktr文件
     *
     * @param args args
     */
    public static void main(String[] args) {
        try {
            KettleEnvironment.init();
            KettleWriteUtils kettleWriteUtils = new KettleWriteUtils();
            TransMeta transMeta = kettleWriteUtils.generateMyOwnTrans();
            String transXml = transMeta.getXML();
            String transName = "update_insert_Trans.ktr";
            File file = new File(transName);
            FileUtils.writeStringToFile(file, transXml, "UTF-8");
        } catch (Exception e) {
            e.printStackTrace();
            return;
        }

    }

    /**
     * 生成一个转化,把一个数据库中的数据转移到另一个数据库中,只有两个步骤,第一个是表输入,第二个是表插入与更新操作
     * @return 元数据
     * @throws KettleXMLException 生成XML异常
     */
    private TransMeta generateMyOwnTrans() throws KettleXMLException {
        TransMeta transMeta = new TransMeta();
        //设置转化的名称
        transMeta.setName("insert_update");
        //添加转换的数据库连接
        DatabaseMeta databaseMeta1 = new DatabaseMeta(DATABASE_XML_1);
        transMeta.addDatabase(databaseMeta1);
        DatabaseMeta databaseMeta2 = new DatabaseMeta(DATABASE_XML_2);
        transMeta.addDatabase(databaseMeta2);

        //registry是给每个步骤生成一个标识Id用
        PluginRegistry registry = PluginRegistry.getInstance();
        //第一个表输入步骤(TableInputMeta)
        TableInputMeta tableInput = new TableInputMeta();
        String tableInputPluginId = registry.getPluginId(StepPluginType.class, tableInput);
        //给表输入添加一个DatabaseMeta连接数据库
        DatabaseMeta db1 = transMeta.findDatabase("db1");
        tableInput.setDatabaseMeta(db1);
        String sql = "SELECT USER_ID,USER_NAME FROM t_manager_user";
        tableInput.setSQL(sql);
        //添加TableInputMeta到转换中
        StepMeta tableInputMetaStep = new StepMeta(tableInputPluginId, "table input", tableInput);
        //给步骤添加在spoon工具中的显示位置
        tableInputMetaStep.setDraw(true);
        tableInputMetaStep.setLocation(100, 100);
        transMeta.addStep(tableInputMetaStep);

        //第二个步骤插入与更新
        InsertUpdateMeta insertUpdateMeta = new InsertUpdateMeta();
        String insertUpdateMetaPluginId = registry.getPluginId(StepPluginType.class, insertUpdateMeta);
        //添加数据库连接
        DatabaseMeta db2 = transMeta.findDatabase("db2");
        insertUpdateMeta.setDatabaseMeta(db2);
        //设置操作的表
        insertUpdateMeta.setTableName("t_stat_user_info");
        //设置用来查询的关键字
        insertUpdateMeta.setKeyLookup(new String[]{"USER_ID"});
        insertUpdateMeta.setKeyStream(new String[]{"USER_ID"});
        insertUpdateMeta.setKeyStream2(new String[]{""});
        insertUpdateMeta.setKeyCondition(new String[]{"="});
        //设置要更新的字段
        String[] updatelookup = {"USER_ID","USER_NAME"} ;
        String[] updateStream = {"USER_ID","USER_NAME"} ;
        Boolean[] updateOrNot = {false,true};
        insertUpdateMeta.setUpdateLookup(updatelookup);
        insertUpdateMeta.setUpdateStream(updateStream);
        insertUpdateMeta.setUpdate(updateOrNot);
        //添加步骤到转换中
        StepMeta insertUpdateStep = new StepMeta(insertUpdateMetaPluginId, "insert_update", insertUpdateMeta);
        insertUpdateStep.setDraw(true);
        insertUpdateStep.setLocation(250, 100);
        transMeta.addStep(insertUpdateStep);
        //添加hop把两个步骤关联起来
        transMeta.addTransHop(new TransHopMeta(tableInputMetaStep, insertUpdateStep));
        return transMeta;
    }
}

4.测试执行.ktr文件

执行用例:

public static void main(String[] args) {
    InputStream inputStream = KettleReadUtils.class.getResourceAsStream("/etl/test.ktr");
    runKtrByStream(inputStream);
}

.ktr文件位置:

执行结果:

5.Kettle所使用的mysql-connector 5.1.49 和 8 版本不兼容问题

  • mysql-connector-java 5.1.49 版本中,支持连接驱动,org.gjt.mm.mysql.Driver
  • mysql-connector-java 8.* 版本中,连接驱动,com.mysql.cj.jdbc.Driver
  • 如果直接使用 8.* 版本 去连接 MySQL 数据库的话会出现"错误连接数据库"问题:

Driver class ‘org.gjt.mm.mysql.Driver' could not be found, make sure the ‘MySQL' driver (jar file) is installed.
org.gjt.mm.mysql.Driver

解决方案:

1.关闭Kettle;

2.将/data-integration/lib/ 下面的 mysql-connector-java-5.1.49.jar 替换为 mysql-connector-java-8.*.jar

3.打开Kettle,修改连接类型为 Generic database ,配置驱动名称为 com.mysql.cj.jdbc.Driver;

4.重新导出为.ktr/.kjb文件;

5.再用java调用即可解决问题。

整理完毕,完结撒花~

到此这篇关于Java定时调用.ktr文件的示例代码的文章就介绍到这了,更多相关Java定时调用.ktr文件内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • java不解压直接读取压缩包中文件的实现方法

    前言 最近写了个上传压缩包,将压缩包中的图片保存的接口,所以翻了翻网上文件流操作的博客,总结了一个不用解压,直接读取文件的方法 上代码 @RequestMapping(value = "packageUpload") public void packageUpload(HttpServletRequest request, HttpServletResponse response) { File file = null; try { MultipartHttpServletReques

  • Java基础之文件概述

    一.基本概念和常识 下面,我们先介绍一些基本概念和常识,包括二进制思维.文件类型.文本文件的编码.文件系统和文件读写等. 1.1 二进制思维 为了透彻理解文件,我们首先要有一个二进制思维. 所有文件, 不论是可执行文件.图片文件.视频文件.Word文件.压缩文件.txt 文件,都没什么可神秘的,它们都是以0和1的二进制形式保存的.我们 所看到的图片.视频.文本,都是应用程序对这些二进制的解析结果. 作为程序员,我们应该有一个编辑器,能查看文件的二进制形式, 比如UltraEdit,它支持以十六进

  • Java实战之简单的文件管理器

    示例图 可以在指定目录下实现文件的创建.文件夹的创建.文件的复制.粘贴.删除.重命名.返回上一级目录.以及不同设备之间文件的发送 完整代码 package com.atguitu.java; public class FileDemo { public static void main(String[] args) { FileSystem fs = new FileSystem(); fs.start(); } } package com.atguitu.java; import java.a

  • IntelliJ IDEA创建普通的Java 项目及创建 Java 文件并运行的教程

    最近突然看到这篇几年前随手记录的文章,居然浏览量那么高.看来很多小伙伴也开始从 Eclipse 转到 IDEA,这里为了让大家更好的掌握 IDEA 的使用,我建议大家可以看看下面这个 IDEA 教程. IDEA 教程:IntelliJ-IDEA-Tutorial 首先,确保 IDEA 软件正确安装完成,Java 开发工具包 JDK 安装完成. IntelliJ IDEA下载地址:https://www.jetbrains.com/idea/download/#section=windows JD

  • JavaWeb实现文件的上传与下载

    JavaWeb实现文件的上传与下载,供大家参考,具体内容如下 第一步:导包 导入commons-fileupload-1.3.3.jar和commons-io-2.4.jar两个依赖包 第二步:编写前端页面 1.提交页面 index.jsp <%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%> <%@ tagl

  • Java(TM) Platform SE binary 打开jar文件的操作

    直接用javaw.exe想打开aspectj-1.9.4.jar安装aspectJ 选Java™ Platform SE binary提示JVM虚拟机打不开 可能是java的配置出了点问题,这里不想重新去配置java,直接用cmd用指令打开 成功打开AspectJ安装程序开始安装 补充:Java(TM) pPlatform SE binary已停止工作(解决办法) 问题描述: Java™ pPlatform SE binary已停止工作解决办法 问题事件名称:APPCRASH 早起清理了一下电脑

  • java自定义ClassLoader加载指定的class文件操作

    继承ClassLoader并且重写findClass方法就可以自定义一个类加载器,具体什么是类加载器以及类加载器的加载过程与顺序下次再说,下面给出一个小demo 首先定义一个类,比如MyTest,并且将其编译成class文件,然后放到一个指定的文件夹下面,其中文件夹的最后几层就是它的包名,这里我将这个编译好的类放到 : /Users/allen/Desktop/cn/lijie/MyTest.class package cn.lijie; public class MyTest { public

  • Java 如何实现解压缩文件和文件夹

    一 前言 项目开发中,总会遇到解压缩文件的时候.比如,用户下载多个文件时,服务端可以将多个文件压缩成一个文件(例如xx.zip或xx.rar).用户上传资料时,允许上传压缩文件,服务端进行解压读取每一个文件. 基于通用性,以下介绍几种解压缩文件的方式,包装成工具类,供平时开发使用. 二 压缩文件 压缩文件,顾名思义,即把一个或多个文件压缩成一个文件.压缩也有2种形式,一种是将所有文件压缩到同一目录下,此种方式要注意文件重名覆盖的问题.另一种是按原有文件树结构进行压缩,即压缩后的文件树结构保持不变

  • 浅谈javap命令拆解字节码文件

    目的拆解分析反编译字节码 解析成人能够理解的结构 ,然后再对字节码文件进一步分析 源代码 public class test { private static int classV =2; public static void main(String[] args) { classV =200; int localV =4; localV =400; } } 二进制 idea bin_ed插件查看. 看不懂 那就使用人能看的懂的汇编语言查看类文件结构和代码指令. javap 指令和选项 0:无选

  • Java IO流学习总结之文件传输基础

    一.Java IO流总览 二.File类 2.1 常用API package pkg1; import java.io.File; import java.io.IOException; /** * @author Administrator * @date 2021/4/2 */ public class FileDemo { public static void main(String[] args) { // 了解构造函数,可查看API File file = new File("d:\\

  • java中Servlet程序下载文件实例详解

    对于一些普通的文件下载,想必大家都会去点击默认的链接进行资料获取.效率慢是一个方面,有时候下载的过程并不顺序.在学习了python中的一些程序后,我们可以选择使用Servlet进行文件的下载.下面我们先就Servlet进行简单的说明,然后带来有关的下载文件代码实例. 1.说明 Servlet是Sun公司开发的用于交互式地浏览和生成数据,生成动态Web的技术.狭义的Servlet是指Java语言实现的一个接口.但一般情况下,我们把实现了Servlet接口的Java程序叫做Servlet 2.使用s

  • java 如何读取远程主机文件

    我就废话不多说了,大家还是直接看代码吧~ package com.cloudtech.web.util; import java.io.BufferedReader; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStreamReader; import com.cloudtech.web.entity.Role; public class

随机推荐