SpringBoot整合dataworks的实现过程

目录
  • 注意事项
  • 整合实现
    • 依赖引入
    • 请求参数类编写
    • 工具类编写
    • 初始化操作
  • 测试代码
  • 测试结果
  • 项目地址

注意事项

阿里云的dataworks提供了OpenApi, 需要是企业版或旗舰版才能够调用,也就是付费项目。

这里测试主要是调用拉取dataworks上拉取的脚本,并存储到本地。
脚本包含两部分

1、开发的odps脚本(通过OpenApi获取)2、建表语句脚本(通过dataworks信息去连接maxCompute获取建立语句)

阿里云Dataworks的openApi分页查询限制,一次最多查询100条。我们拉取脚本需要分多页查询

该项目使用到了MaxCompute的SDK/JDBC方式连接,SpringBoot操作MaxCompute SDK/JDBC连接

整合实现

实现主要是编写工具类,如果需要则可以配置成SpringBean,注入容器即可使用

依赖引入

<properties>
    <java.version>1.8</java.version>
    <!--maxCompute-sdk-版本号-->
    <max-compute-sdk.version>0.40.8-public</max-compute-sdk.version>
    <!--maxCompute-jdbc-版本号-->
    <max-compute-jdbc.version>3.0.1</max-compute-jdbc.version>
    <!--dataworks版本号-->
    <dataworks-sdk.version>3.4.2</dataworks-sdk.version>
    <aliyun-java-sdk.version>4.5.20</aliyun-java-sdk.version>
</properties>
<dependencies>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-configuration-processor</artifactId>
    <optional>true</optional>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>
<!--max compute sdk-->
<dependency>
    <groupId>com.aliyun.odps</groupId>
    <artifactId>odps-sdk-core</artifactId>
    <version>${max-compute-sdk.version}</version>
</dependency>
<!--max compute jdbc-->
<dependency>
    <groupId>com.aliyun.odps</groupId>
    <artifactId>odps-jdbc</artifactId>
    <version>${max-compute-jdbc.version}</version>
    <classifier>jar-with-dependencies</classifier>
</dependency>
<!--dataworks需要引入aliyun-sdk和dataworks本身-->
<dependency>
    <groupId>com.aliyun</groupId>
    <artifactId>aliyun-java-sdk-core</artifactId>
    <version>${aliyun-java-sdk.version}</version>
</dependency>
<dependency>
    <groupId>com.aliyun</groupId>
    <artifactId>aliyun-java-sdk-dataworks-public</artifactId>
    <version>${dataworks-sdk.version}</version>
</dependency>
</dependencies>

请求参数类编写

/**
 * @Description
 * @Author itdl
 * @Date 2022/08/09 15:12
 */
@Data
public class DataWorksOpenApiConnParam {
    /**
     * 区域 eg. cn-shanghai
     */
    private String region;

    /**
     * 访问keyId
     */
    private String aliyunAccessId;
    /**
     * 密钥
     */
    private String aliyunAccessKey;

    /**
     * 访问端点  就是API的URL前缀
     */
    private String endPoint;

    /**
     * 数据库类型 如odps
     */
    private String datasourceType;

    /**
     * 所属项目
     */
    private String project;

    /**
     * 项目环境 dev  prod
     */
    private String projectEnv;
}

工具类编写

基础类准备,拉取脚本之后的回调函数

为什么需要回调函数,因为拉取的是所有脚本,如果合并每次分页结果的话,会导致内存溢出,而使用回调函数只是每次循环增加处理函数

/**
 * @Description
 * @Author itdl
 * @Date 2022/08/09 15:12
 */
@Data
public class DataWorksOpenApiConnParam {
    /**
     * 区域 eg. cn-shanghai
     */
    private String region;

    /**
     * 访问keyId
     */
    private String aliyunAccessId;
    /**
     * 密钥
     */
    private String aliyunAccessKey;

    /**
     * 访问端点  就是API的URL前缀
     */
    private String endPoint;

    /**
     * 数据库类型 如odps
     */
    private String datasourceType;

    /**
     * 所属项目
     */
    private String project;

    /**
     * 项目环境 dev  prod
     */
    private String projectEnv;
}

初始化操作

主要是实例化dataworks openApi接口的客户端信息,maxCompute连接的工具类初始化(包括JDBC,SDK方式)

private static final String MAX_COMPUTE_JDBC_URL_FORMAT = "http://service.%s.maxcompute.aliyun.com/api";
/**默认的odps接口地址 在Odps中也可以看到该变量*/
private static final String defaultEndpoint = "http://service.odps.aliyun.com/api";
/**
 * dataworks连接参数
 *
 */
private final DataWorksOpenApiConnParam connParam;

/**
 * 可以使用dataworks去连接maxCompute 如果连接的引擎是maxCompute的话
 */
private final MaxComputeJdbcUtil maxComputeJdbcUtil;

private final MaxComputeSdkUtil maxComputeSdkUtil;

private final boolean odpsSdk;

/**
 * 客户端
 */
private final IAcsClient client;

public DataWorksOpenApiUtil(DataWorksOpenApiConnParam connParam, boolean odpsSdk) {
    this.connParam = connParam;
    this.client = buildClient();
    this.odpsSdk = odpsSdk;
    if (odpsSdk){
        this.maxComputeJdbcUtil = null;
        this.maxComputeSdkUtil = buildMaxComputeSdkUtil();
    }else {
        this.maxComputeJdbcUtil = buildMaxComputeJdbcUtil();
        this.maxComputeSdkUtil = null;
    }
}

private MaxComputeSdkUtil buildMaxComputeSdkUtil() {
    final MaxComputeSdkConnParam param = new MaxComputeSdkConnParam();

    // 设置账号密码
    param.setAliyunAccessId(connParam.getAliyunAccessId());
    param.setAliyunAccessKey(connParam.getAliyunAccessKey());

    // 设置endpoint
    param.setMaxComputeEndpoint(defaultEndpoint);

    // 目前只处理odps的引擎
    final String datasourceType = connParam.getDatasourceType();
    if (!"odps".equals(datasourceType)){
        throw new BizException(ResultCode.DATA_WORKS_ENGINE_SUPPORT_ERR);
    }

    // 获取项目环境,根据项目环境连接不同的maxCompute
    final String projectEnv = connParam.getProjectEnv();

    if ("dev".equals(projectEnv)){
        // 开发环境dataworks + _dev就是maxCompute的项目名
        param.setProjectName(String.join("_", connParam.getProject(), projectEnv));
    }else {
        // 生产环境dataworks的项目名和maxCompute一致
        param.setProjectName(connParam.getProject());
    }

    return new MaxComputeSdkUtil(param);
}

private MaxComputeJdbcUtil buildMaxComputeJdbcUtil() {
    final MaxComputeJdbcConnParam param = new MaxComputeJdbcConnParam();

    // 设置账号密码
    param.setAliyunAccessId(connParam.getAliyunAccessId());
    param.setAliyunAccessKey(connParam.getAliyunAccessKey());

    // 设置endpoint
    param.setEndpoint(String.format(MAX_COMPUTE_JDBC_URL_FORMAT, connParam.getRegion()));

    // 目前只处理odps的引擎
    final String datasourceType = connParam.getDatasourceType();
    if (!"odps".equals(datasourceType)){
        throw new BizException(ResultCode.DATA_WORKS_ENGINE_SUPPORT_ERR);
    }

    // 获取项目环境,根据项目环境连接不同的maxCompute
    final String projectEnv = connParam.getProjectEnv();

    if ("dev".equals(projectEnv)){
        // 开发环境dataworks + _dev就是maxCompute的项目名
        param.setProjectName(String.join("_", connParam.getProject(), projectEnv));
    }else {
        // 生产环境dataworks的项目名和maxCompute一致
        param.setProjectName(connParam.getProject());
    }

    return new MaxComputeJdbcUtil(param);
}

调用OpenApi拉取所有脚本

/**
 * 根据文件夹路径分页查询该路径下的文件(脚本)
 * @param pageSize 每页查询多少数据
 * @param folderPath 文件所在目录
 * @param userType 文件所属功能模块 可不传
 * @param fileTypes 设置文件代码类型 逗号分割 可不传
 */
public void listAllFiles(Integer pageSize, String folderPath, String userType, String fileTypes, CallBack.FileCallBack callBack) throws ClientException {
    pageSize = setPageSize(pageSize);
    // 创建请求
    final ListFilesRequest request = new ListFilesRequest();

    // 设置分页参数
    request.setPageNumber(1);
    request.setPageSize(pageSize);

    // 设置上级文件夹
    request.setFileFolderPath(folderPath);

    // 设置区域和项目名称
    request.setSysRegionId(connParam.getRegion());
    request.setProjectIdentifier(connParam.getProject());

    // 设置文件所属功能模块
    if (!ObjectUtils.isEmpty(userType)){
        request.setUseType(userType);
    }
    // 设置文件代码类型
    if (!ObjectUtils.isEmpty(fileTypes)){
        request.setFileTypes(fileTypes);
    }

    // 发起请求
    ListFilesResponse res = client.getAcsResponse(request);

    // 获取分页总数
    final Integer totalCount = res.getData().getTotalCount();
    // 返回结果
    final List<ListFilesResponse.Data.File> resultList = res.getData().getFiles();
    // 计算能分几页
    long pages = totalCount % pageSize == 0 ? (totalCount / pageSize) : (totalCount / pageSize) + 1;
    // 只有1页 直接返回
    if (pages <= 1){
        callBack.handle(resultList);
        return;
    }

    // 第一页执行回调
    callBack.handle(resultList);

    // 分页数据 从第二页开始查询 同步拉取,可以优化为多线程拉取
    for (int i = 2; i <= pages; i++) {
        //第1页
        request.setPageNumber(i);
        //每页大小
        request.setPageSize(pageSize);
        // 发起请求
        res = client.getAcsResponse(request);
        final List<ListFilesResponse.Data.File> tableEntityList = res.getData().getFiles();
        if (!ObjectUtils.isEmpty(tableEntityList)){
            // 执行回调函数
            callBack.handle(tableEntityList);
        }
    }
}

内部连接MaxCompute拉取所有DDL脚本内容

DataWorks工具类代码,通过回调函数处理

    /**
     * 获取所有的DDL脚本
     * @param callBack 回调处理函数
     */
    public void listAllDdl(CallBack.DdlCallBack callBack){
        if (odpsSdk){
            final List<TableMetaInfo> tableInfos = maxComputeSdkUtil.getTableInfos();
            for (TableMetaInfo tableInfo : tableInfos) {
                final String tableName = tableInfo.getTableName();
                final String sqlCreateDesc = maxComputeSdkUtil.getSqlCreateDesc(tableName);
                callBack.handle(tableName, sqlCreateDesc);
            }
        }
    }

MaxCompute工具类代码,根据表名获取建表语句, 以SDK为例, JDBC直接执行show create table即可拿到建表语句

/**
 * 根据表名获取建表语句
 * @param tableName 表名
 * @return
 */
public String getSqlCreateDesc(String tableName) {
    final Table table = odps.tables().get(tableName);
    // 建表语句
    StringBuilder mssqlDDL = new StringBuilder();

    // 获取表结构
    TableSchema tableSchema = table.getSchema();
    // 获取表名表注释
    String tableComment = table.getComment();

    //获取列名列注释
    List<Column> columns = tableSchema.getColumns();
    /*组装成mssql的DDL*/
    // 表名
    mssqlDDL.append("CREATE TABLE IF NOT EXISTS ");
    mssqlDDL.append(tableName).append("\n");
    mssqlDDL.append(" (\n");
    //列字段
    int index = 1;
    for (Column column : columns) {
        mssqlDDL.append("  ").append(column.getName()).append("\t\t").append(column.getTypeInfo().getTypeName());
        if (!ObjectUtils.isEmpty(column.getComment())) {
            mssqlDDL.append(" COMMENT '").append(column.getComment()).append("'");
        }
        if (index == columns.size()) {
            mssqlDDL.append("\n");
        } else {
            mssqlDDL.append(",\n");
        }
        index++;
    }
    mssqlDDL.append(" )\n");
    //获取分区
    List<Column> partitionColumns = tableSchema.getPartitionColumns();
    int partitionIndex = 1;
    if (!ObjectUtils.isEmpty(partitionColumns)) {
        mssqlDDL.append("PARTITIONED BY (");
    }
    for (Column partitionColumn : partitionColumns) {
        final String format = String.format("%s %s COMMENT '%s'", partitionColumn.getName(), partitionColumn.getTypeInfo().getTypeName(), partitionColumn.getComment());
        mssqlDDL.append(format);
        if (partitionIndex == partitionColumns.size()) {
            mssqlDDL.append("\n");
        } else {
            mssqlDDL.append(",\n");
        }
        partitionIndex++;
    }

    if (!ObjectUtils.isEmpty(partitionColumns)) {
        mssqlDDL.append(")\n");
    }
//        mssqlDDL.append("STORED AS ALIORC  \n");
//        mssqlDDL.append("TBLPROPERTIES ('comment'='").append(tableComment).append("');");
    mssqlDDL.append(";");
    return mssqlDDL.toString();
}

测试代码

public static void main(String[] args) throws ClientException {
    final DataWorksOpenApiConnParam connParam = new DataWorksOpenApiConnParam();
    connParam.setAliyunAccessId("您的阿里云账号accessId");
    connParam.setAliyunAccessKey("您的阿里云账号accessKey");
    // dataworks所在区域
    connParam.setRegion("cn-chengdu");
    // dataworks所属项目
    connParam.setProject("dataworks所属项目");
    // dataworks所属项目环境 如果不分环境的话设置为生产即可
    connParam.setProjectEnv("dev");
    // 数据引擎类型 odps
    connParam.setDatasourceType("odps");
    // ddataworks接口地址
    connParam.setEndPoint("dataworks.cn-chengdu.aliyuncs.com");
    final DataWorksOpenApiUtil dataWorksOpenApiUtil = new DataWorksOpenApiUtil(connParam, true);

    // 拉取所有ODPS脚本
    dataWorksOpenApiUtil.listAllFiles(100, "", "", "10", files -> {
        // 处理文件
        for (ListFilesResponse.Data.File file : files) {
            final String fileName = file.getFileName();
            System.out.println(fileName);
        }
    });

    // 拉取所有表的建表语句
    dataWorksOpenApiUtil.listAllDdl((tableName, tableDdlContent) -> {
        System.out.println("=======================================");
        System.out.println("表名:" + tableName + "内容如下:\n");
        System.out.println(tableDdlContent);
        System.out.println("=======================================");
    });
}

测试结果

test_001脚本
test_002脚本
test_003脚本
test_004脚本
test_005脚本
=======================================
表名:test_abc_info内容如下:

CREATE TABLE IF NOT EXISTS test_abc_info
 (
    test_abc1        STRING COMMENT '字段1',
    test_abc2        STRING COMMENT '字段2',
    test_abc3        STRING COMMENT '字段3',
    test_abc4        STRING COMMENT '字段4',
    test_abc5        STRING COMMENT '字段5',
    test_abc6        STRING COMMENT '字段6',
    test_abc7        STRING COMMENT '字段7',
    test_abc8        STRING COMMENT '字段8'
 )
PARTITIONED BY (p_date STRING COMMENT '数据日期'
)
;
=======================================
Disconnected from the target VM, address: '127.0.0.1:59509', transport: 'socket'

项目地址

https://github.com/HedongLin123/dataworks_odps_demo

到此这篇关于SpringBoot整合dataworks的实现过程的文章就介绍到这了,更多相关SpringBoot整合dataworks内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • springboot多模块化整合mybatis,mapper自动注入失败问题及解决

    目录 springboot多模块化整合mybatis,mapper自动注入失败 问题 解决 springboot mapper注入失败的一种原因 具体情况是 解决办法 springboot多模块化整合mybatis,mapper自动注入失败 问题 启动类添加@MapperScan或@ComponentScan,mapper类添加@Mapper或@Repository ==> Consider defining a bean of type 'com.ten.mapper.UserMapper'

  • Springboot整合mqtt服务的示例代码

    首先在pom文件里引入mqtt的依赖配置 <!--mqtt--> <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.4</version> </dependency> 其次在springboot 的配置yml文件,配

  • SpringBoot整合mybatis的方法详解

    目录 1依赖配置 2使用 2.1SpringBoot配置整合mybatis: 2.2SpringBoot注解整合mybatis: 2.3在配置类上增加@MapperScan注解,扫描某个包下的全部Mapper文件: 总结 1 依赖配置 <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> &l

  • SpringBoot整合第三方技术的详细步骤

    目录 SpringBoot整合第三方技术 一.整合Junit 二.整合Mybatis 三.整合Mybatis-Plus 四.整合Druid 五.总结 SpringBoot整合第三方技术 一.整合Junit 新建一个SpringBoot项目 使用@SpringBootTest标签在test测试包内整合Junit @SpringBootTest class Springboot03JunitApplicationTests { @Autowired private BookService bookS

  • SpringBoot整合dataworks的实现过程

    目录 注意事项 整合实现 依赖引入 请求参数类编写 工具类编写 初始化操作 测试代码 测试结果 项目地址 注意事项 阿里云的dataworks提供了OpenApi, 需要是企业版或旗舰版才能够调用,也就是付费项目. 这里测试主要是调用拉取dataworks上拉取的脚本,并存储到本地.脚本包含两部分 1.开发的odps脚本(通过OpenApi获取)2.建表语句脚本(通过dataworks信息去连接maxCompute获取建立语句) 阿里云Dataworks的openApi分页查询限制,一次最多查询

  • Springboot整合MybatisPlus的实现过程解析

    这篇文章主要介绍了Springboot整合MybatisPlus的实现过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 1.pom文件 <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3

  • SpringBoot整合mybatis简单案例过程解析

    这篇文章主要介绍了SpringBoot整合mybatis简单案例过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 1.在springboot项目中的pom.xml中添加mybatis的依赖 <dependency> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifac

  • Springboot整合thymleaf模板引擎过程解析

    这篇文章主要介绍了Springboot整合thymleaf模板引擎过程解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 thymeleaf作为springboot官方推荐使用的模板引擎,简单易上手,功能强大,thymeleaf的功能和jsp有许多相似之处,两者都属于服务器端渲染技术,但thymeleaf比jsp的功能更强大. 1. thymeleaf入门 1.1 引入坐标 <!--springBoot整合thymeleaf--> <d

  • springboot 整合fluent mybatis的过程,看这篇够了

    1.导入pom依赖 <!-- mybatis--> <dependency> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> <version>1.3.1</version> </dependency> <!--mysql依赖--> <de

  • SpringBoot整合Mybatis-plus的具体过程使用

    目录 1.MyBatisX插件 2.引入依赖 3.编写配置 4.编写接口 5.运行测试 6.完整代码 1.MyBatisX插件 在使用mybatis或者mybatis-plus时,我们可以安装IDEA的MyBatis的插件 - MyBatisX, 这样我们就可以实现点击接口跳转到sql文件, 点击sql文件可以跳转到接口的功能, 很方便.这个插件的功能还有很多, 可以查看MyBatis-Plus官网 安装方法:打开 IDEA,进入 File -> Settings -> Plugins -&g

  • SpringBoot整合JPA框架实现过程讲解

    目录 一. Spring Boot数据访问概述 二. Spring Data JPA简介 2.1 编写ORM实体类 2.2 编写Repository接口 2.2.1 继承XXRepository<T, ID>接口 2.2.2 操作数据的多种方式 2.2.3 @Transactional事务管理 2.2.4 @Moditying注解 2.3.5 复杂条件查询 三. 使用Spring Boot整合JPA 3.1 添加Spring Data JPA依赖启动器 3.2 编写ORM实体类 3.3 编写R

  • SpringBoot整合Swagger2的完整过程记录

    目录 前言 一.Spring Boot Web 整合 Swagger2 过程 1.1.添加 Swagger2 相关依赖 1.2.配置 Swagger2 配置类 二.配置 Swagger2 接口常用注解 2.1.@Api 请求类说明 2.2.@ApiOperation 方法的说明 2.3.@ApiImplicitParams 和 @ApiImplicitParam 方法参数说明 2.4.@ApiResponses 和 @ApiResponse 方法返回值的说明 2.5.@ApiModel 和 @A

  • SpringBoot 整合 Docker的详细过程

    目录 1. Demo Project 1.1 接口准备 1.2 配置准备 2. Docker 开启远程连接 1.1 修改配置文件 1.2 刷新配置.重启 1.3 认证登录 3. IDEA 安装 Docker 插件 4. Maven 添加 Docker 插件 5. 编写Dockerfile 6. 打包项目 7. 创建容器 8. 校验部署 最近备忘录新加的东西倒是挺多的,但到了新环境水土不服没动力去整理笔记 1. Demo Project 首先准备一个简单的项目,用来部署到 Docker 主机上,并

  • springboot整合微信支付sdk过程解析

    前言 之前做的几个微信小程序项目,大部分客户都有要在微信小程序前端提现的需求.提现功能的实现,自然使用企业付款接口,不过这个功能开通比较麻烦,要满足3个条件; 之前实现过几个微信支付的接口,不过都是自己码的代码,从网上找找拼凑,觉得看起来不舒服~_~ 于是乎找到了微信官方提供的支付sdk.这里用的是java版本,springboot整合java 下载sdk,引入项目 这里可以直接下载官方提供的sdk,然后将几个java类拷贝到你的项目,也可以直接引入maven依赖,这里是直接将Java类拷贝到我

随机推荐