抽取oracle数据到mysql数据库的实现过程

在oracle数据库迁移至mysql数据库,除了oracle数据库模型移到mysql外,还一个重要环节就是要将oracle数据库的数据移到mysql数据库,本人尝试用过多款数据迁移程序,性能都不是很好的,于是自己动手写一个针对于oracle数据库数据迁移到mysql数据程序,其具体过程如下:

1、要抽取mysql表、字段及过滤条件的配制文件imp_data.sql

2、建立一个目录ETL_DIR

3、运行oracle数据库程序P_ETL_ORA_DATA,生成各表的csv数据文件,同时也生成一个导入mysql的脚本文件imp_data.sql

4、导入mysql数据,文件内容如下

load data infile "alarm_hist_inc.csv" into table alarm_hist_inc fields terminated by "," enclosed by "^" lines terminated by "\r\n";
load data infile "button_authority.csv" into table button_authority fields terminated by "," enclosed by "^" lines terminated by "\r\n";
load data infile "c3_sms_hist_inc.csv" into table c3_sms_hist_inc fields terminated by "," enclosed by "^" lines terminated by "\r\n";
load data infile "datapermisson.csv" into table datapermisson fields terminated by "," enclosed by "^" lines terminated by "\r\n";

附:数据库脚本P_ETL_ORA_DATA

CREATE OR REPLACE PROCEDURE P_ETL_ORA_DATA
(
  P_ORA_DIR  VARCHAR2,
  P_DATA_PATH VARCHAR2
) IS
  TYPE T_REC IS RECORD(
    TBN VARCHAR2(40),
    WHR VARCHAR2(4000));
  TYPE T_TABS IS TABLE OF T_REC;
  V_TABS   T_TABS := T_TABS();
  V_ETL_DIR  VARCHAR2(40) := P_ORA_DIR;
  V_LOAD_FILE UTL_FILE.FILE_TYPE;
  PROCEDURE ETL_DATA
  (
    P_SQL_STMT VARCHAR2,
    P_DATA_PATH VARCHAR2,
    P_TB_NAME  VARCHAR2
  ) IS
  BEGIN
    DECLARE
      V_VAR_COL  VARCHAR2(32767);
      V_NUM_COL  NUMBER;
      V_DATE_COL DATE;
      V_TMZ    TIMESTAMP;
      V_COLS   NUMBER;
      V_COLS_DESC DBMS_SQL.DESC_TAB;
      V_ROW_STR  VARCHAR2(32767);
      V_COL_STR  VARCHAR2(32767);
      V_SQL_ID  NUMBER;
      V_SQL_REF  SYS_REFCURSOR;
      V_EXP_FILE UTL_FILE.FILE_TYPE;
      V_DATA_PATH VARCHAR2(200);
    BEGIN
      V_DATA_PATH := P_DATA_PATH;
      IF REGEXP_SUBSTR(V_DATA_PATH, '\\$') IS NULL
      THEN
        V_DATA_PATH := V_DATA_PATH || '\';
      END IF;
      V_DATA_PATH := REPLACE(V_DATA_PATH, '\', '\\');
      OPEN V_SQL_REF FOR P_SQL_STMT;
      V_SQL_ID := DBMS_SQL.TO_CURSOR_NUMBER(V_SQL_REF);
      DBMS_SQL.DESCRIBE_COLUMNS(V_SQL_ID, V_COLS, V_COLS_DESC);
      FOR I IN V_COLS_DESC.FIRST .. V_COLS_DESC.LAST
      LOOP
        CASE
          WHEN V_COLS_DESC(I).COL_TYPE IN (1, 9, 96) THEN
            DBMS_SQL.DEFINE_COLUMN(V_SQL_ID, I, V_VAR_COL, 32767);
          WHEN V_COLS_DESC(I).COL_TYPE = 2 THEN
            DBMS_SQL.DEFINE_COLUMN(V_SQL_ID, I, V_NUM_COL);
          WHEN V_COLS_DESC(I).COL_TYPE = 12 THEN
            DBMS_SQL.DEFINE_COLUMN(V_SQL_ID, I, V_DATE_COL);
          WHEN V_COLS_DESC(I).COL_TYPE = 180 THEN
            DBMS_SQL.DEFINE_COLUMN(V_SQL_ID, I, V_TMZ);
        END CASE;
      END LOOP;
      DECLARE
        V_FLUSH_OVER PLS_INTEGER := 1;
        V_FILE_OVER PLS_INTEGER := 1;
        V_FILE_NO  PLS_INTEGER := 1;
        V_FILE_NAME VARCHAR2(200);
        V_LINE    VARCHAR2(400);
      BEGIN
        WHILE DBMS_SQL.FETCH_ROWS(V_SQL_ID) > 0
        LOOP
          IF V_FILE_OVER = 1
          THEN
            V_FILE_NAME := P_TB_NAME || '_' || V_FILE_NO || '.csv';
            V_EXP_FILE := UTL_FILE.FOPEN(V_ETL_DIR, V_FILE_NAME, OPEN_MODE => 'w', MAX_LINESIZE => 32767);
          END IF;
          V_ROW_STR := '';
          FOR I IN 1 .. V_COLS
          LOOP
            V_COL_STR := '\N';
            BEGIN
              CASE
                WHEN V_COLS_DESC(I).COL_TYPE IN (1, 9, 96) THEN
                  DBMS_SQL.COLUMN_VALUE(V_SQL_ID, I, V_VAR_COL);
                  IF V_VAR_COL IS NOT NULL
                  THEN
                    V_COL_STR := '^' || V_VAR_COL || '^';
                  END IF;
                WHEN V_COLS_DESC(I).COL_TYPE = 2 THEN
                  DBMS_SQL.COLUMN_VALUE(V_SQL_ID, I, V_NUM_COL);
                  IF V_NUM_COL IS NOT NULL
                  THEN
                    V_COL_STR := V_NUM_COL;
                  END IF;
                WHEN V_COLS_DESC(I).COL_TYPE = 12 THEN
                  DBMS_SQL.COLUMN_VALUE(V_SQL_ID, I, V_DATE_COL);
                  IF V_DATE_COL IS NOT NULL
                  THEN
                    V_COL_STR := '^' || TO_CHAR(V_DATE_COL, 'yyyy-mm-dd hh24:mi:ss') || '^';
                  END IF;
                WHEN V_COLS_DESC(I).COL_TYPE IN (180, 181, 231) THEN
                  DBMS_SQL.COLUMN_VALUE(V_SQL_ID, I, V_TMZ);
                  IF V_TMZ IS NOT NULL
                  THEN
                    V_COL_STR := '^' || TO_CHAR(V_TMZ, 'yyyy-mm-dd hh24:mi:ss.ff6') || '^';
                  END IF;
              END CASE;
              IF I = 1
              THEN
                V_ROW_STR := V_COL_STR;
              ELSE
                V_ROW_STR := V_ROW_STR || ',' || V_COL_STR;
              END IF;
            END;
          END LOOP;
          UTL_FILE.PUT_LINE(V_EXP_FILE, CONVERT(V_ROW_STR, 'UTF8'));
          IF V_FILE_OVER > 200000 /*每200000条记录就产生一个新的文件*/
          THEN
            V_FILE_OVER := 1;
            V_FLUSH_OVER := 1;
            V_FILE_NO  := V_FILE_NO + 1;
            UTL_FILE.FCLOSE(V_EXP_FILE);
            V_LINE := 'load data infile "' || V_DATA_PATH || V_FILE_NAME || '" into table ' || P_TB_NAME;
            V_LINE := V_LINE || ' fields terminated by "," enclosed by "^" lines terminated by "\r\n";';
            UTL_FILE.PUT_LINE(V_LOAD_FILE, V_LINE);
            UTL_FILE.FFLUSH(V_LOAD_FILE);
            CONTINUE;
          END IF;
          V_FILE_OVER := V_FILE_OVER + 1;
          IF V_FLUSH_OVER > 2000 /*每2000条记录就刷新缓存,写到文件中 */
          THEN
            UTL_FILE.FFLUSH(V_EXP_FILE);
            V_FLUSH_OVER := 1;
          ELSE
            V_FLUSH_OVER := V_FLUSH_OVER + 1;
          END IF;
        END LOOP;
        DBMS_SQL.CLOSE_CURSOR(V_SQL_ID);
        IF UTL_FILE.IS_OPEN(V_EXP_FILE)
        THEN
          UTL_FILE.FCLOSE(V_EXP_FILE);
          V_LINE := 'load data infile "' || V_DATA_PATH || V_FILE_NAME || '" into table ' || P_TB_NAME;
          V_LINE := V_LINE || ' fields terminated by "," enclosed by "^" lines terminated by "\r\n";';
          UTL_FILE.PUT_LINE(V_LOAD_FILE, V_LINE);
          UTL_FILE.FFLUSH(V_LOAD_FILE);
        END IF;
      END;
    EXCEPTION
      WHEN OTHERS THEN
        IF DBMS_SQL.IS_OPEN(V_SQL_ID)
        THEN
          DBMS_SQL.CLOSE_CURSOR(V_SQL_ID);
        END IF;
        IF UTL_FILE.IS_OPEN(V_EXP_FILE)
        THEN
          UTL_FILE.FCLOSE(V_EXP_FILE);
        END IF;
        DBMS_OUTPUT.PUT_LINE(SQLERRM);
        DBMS_OUTPUT.PUT_LINE(P_SQL_STMT);
    END;
  END;
BEGIN
  BEGIN
    EXECUTE IMMEDIATE 'create table mysql_etl_tbs(tn varchar2(40),cn varchar2(40),ci number) ';
  EXCEPTION
    WHEN OTHERS THEN
      NULL;
  END;
  EXECUTE IMMEDIATE 'truncate table mysql_etl_tbs';
  DECLARE
    V_CI    PLS_INTEGER;
    V_CN    VARCHAR2(40);
    V_ETL_COLS VARCHAR2(32767);
    V_TBN   VARCHAR2(30);
    V_ETL_CFG VARCHAR2(32767);
    V_CNF_FILE UTL_FILE.FILE_TYPE;
    V_FROM_POS PLS_INTEGER;
  BEGIN
    V_CNF_FILE := UTL_FILE.FOPEN(V_ETL_DIR, 'ETL_TABS.CNF', 'r', 32767);
    LOOP
      UTL_FILE.GET_LINE(V_CNF_FILE, V_ETL_CFG, 32767);
      V_FROM_POS := REGEXP_INSTR(V_ETL_CFG, 'from', 1, 1, 0, 'i');
      V_ETL_COLS := SUBSTR(V_ETL_CFG, 1, V_FROM_POS - 1);
      V_ETL_COLS := REGEXP_SUBSTR(V_ETL_COLS, '(select)(.+)', 1, 1, 'i', 2);
      V_TBN   := REGEXP_SUBSTR(V_ETL_CFG, '(\s+from\s+)(\w+)(\s*)', 1, 1, 'i', 2);
      V_TBN   := UPPER(V_TBN);
      V_TABS.EXTEND();
      V_TABS(V_TABS.LAST).TBN := V_TBN;
      V_TABS(V_TABS.LAST).WHR := REGEXP_SUBSTR(V_ETL_CFG, '\s+where .+', 1, 1, 'i');
      V_CI := 1;
      LOOP
        V_CN := REGEXP_SUBSTR(V_ETL_COLS, '\S+', 1, V_CI);
        EXIT WHEN V_CN IS NULL;
        V_CN := UPPER(V_CN);
        EXECUTE IMMEDIATE 'insert into mysql_etl_tbs(tn,cn,ci) values(:1,:2,:3)'
          USING V_TBN, V_CN, V_CI;
        COMMIT;
        V_CI := V_CI + 1;
      END LOOP;
    END LOOP;
  EXCEPTION
    WHEN UTL_FILE.INVALID_PATH THEN
      DBMS_OUTPUT.PUT_LINE('指定的目录:ETL_DIR"' || '"无效!');
      RETURN;
    WHEN UTL_FILE.INVALID_FILENAME THEN
      DBMS_OUTPUT.PUT_LINE('指定的文件:" ETL_TABS.CNF' || '"无效!');
      RETURN;
    WHEN NO_DATA_FOUND THEN
      UTL_FILE.FCLOSE(V_CNF_FILE);
    WHEN OTHERS THEN
      DBMS_OUTPUT.PUT_LINE(SQLERRM);
      RETURN;
  END;
  DECLARE
    V_CUR_MATCH  SYS_REFCURSOR;
    V_SQL_SMT   VARCHAR2(32767);
    V_TN     VARCHAR2(40);
    V_CN     VARCHAR2(40);
    V_CI     PLS_INTEGER;
    V_COLUMN_NAME VARCHAR2(40);
    V_ETL_COLS  VARCHAR2(32767);
    V_LINE    VARCHAR2(4000);
    V_TBN     VARCHAR2(40);
  BEGIN
    V_LOAD_FILE := UTL_FILE.FOPEN(V_ETL_DIR, 'load_data.sql', OPEN_MODE => 'w', MAX_LINESIZE => 32767);
    FOR T_IX IN V_TABS.FIRST .. V_TABS.LAST
    LOOP
      V_SQL_SMT := 'select tn,cn,column_name,ci from ( select * from mysql_etl_tbs where tn='':tbn:'' ) l left join user_tab_columns r on l.tn = r.table_name and l.cn= r.column_name order by ci';
      V_TBN   := V_TABS(T_IX).TBN;
      V_SQL_SMT := REPLACE(V_SQL_SMT, ':tbn:', V_TBN);
      V_ETL_COLS := NULL;
      OPEN V_CUR_MATCH FOR V_SQL_SMT;
      LOOP
        FETCH V_CUR_MATCH
          INTO V_TN, V_CN, V_COLUMN_NAME, V_CI;
        EXIT WHEN V_CUR_MATCH%NOTFOUND;
        IF V_CI > 1
        THEN
          V_ETL_COLS := V_ETL_COLS || ' , ';
        END IF;
        IF V_COLUMN_NAME IS NULL
        THEN
          V_ETL_COLS := V_ETL_COLS || ' cast(null as number) ' || V_CN;
        ELSE
          V_ETL_COLS := V_ETL_COLS || V_CN;
        END IF;
      END LOOP;
      CLOSE V_CUR_MATCH;
      V_TBN   := LOWER(V_TBN);
      V_SQL_SMT := 'select ' || V_ETL_COLS || ' from ' || V_TBN || V_TABS(T_IX).WHR;
      ETL_DATA(V_SQL_SMT, P_DATA_PATH, V_TBN);
    END LOOP;
    IF UTL_FILE.IS_OPEN(V_LOAD_FILE)
    THEN
      UTL_FILE.FCLOSE(V_LOAD_FILE);
    END IF;
  END;
END P_ETL_ORA_DATA;

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对我们的支持。如果你想了解更多相关内容请查看下面相关链接

(0)

相关推荐

  • 为Plesk PHP7启用Oracle OCI8扩展方法总结

    注:本文适用于RHEL/CentOS发行版. 步骤1.安装构建自定义PHP 7模块所需的devel包 # yum install plesk-php70-devel gcc glibc-devel libmemcached-devel zlib-devel make libaio.x86_64 步骤2.下载oracle-instantclient12.1-basic-12.1.0.2.0-1.x86_64.rpm和oracle-instantclient12.1-devel-12.1.0.2.0

  • oracle如何解锁封锁的账号

    (1)我们在安装Oracle的时候最后一步有一个管理账户的,里边可以解锁所所需的账户,但是很多的人有时候没有解锁, 下边研究一下使用命令行的方式解锁账户 (2)启动SQL Plus 先登录SYSTEM账户(密码是你自己安装的时候创建的) 然后连接成功之后输入:alter user scott account unlock; (3)这时候一看到scott账户已成功解锁 (4)登陆一下试试 好的  连接成功! 总结 以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价

  • SELECT INTO 和 INSERT INTO SELECT 两种表复制语句详解(SQL数据库和Oracle数据库的区别)

    1.INSERT INTO SELECT语句 语句形式为:Insert into Table2(field1,field2,...) select value1,value2,... from Table1 或者:Insert into Table2 select * from Table1 注意:(1)要求目标表Table2必须存在,并且字段field,field2...也必须存在 (2)注意Table2的主键约束,如果Table2有主键而且不为空,则 field1, field2...中必须

  • Oracle导入导出数据的几种方式

    oracle导入导出数据 1.导出dmp格式文件 --备份某几张表 !!!! exp smsc/smsc file=/data/oracle_bak/dmp/bakup0209_2.dmp tables=\(send_msg_his,send_msg,recv_msg_his,recv_msg\) --备份整个数据库 !!!! --方式1 exp smsc/smsc file=/data/oracle_bak/dmp/bakupsmmc0209_2.dmp full=y --方式2 exp co

  • 如何利用Oracle命令解决函数运行错误

    1 问题 自定义了一个 Oracle 函数.编译正常:使用 PL/SQL Developer 的 Test 窗口模式,测试通过.但 Java 直接调用失败:使用 PL/SQL Developer 的 SQL 窗口模式,执行失败. 没有有效的错误提示信息. 2 分析 肯定是函数本身有问题,我们要使用有效的工具来定位出问题. 在 Oracle 函数中,加入异常处理. 异常处理 (EXCEPTION)  可用来处理正常执行过程中未预料的事件.如果 PL/SQL 程序块产生异常,但没有指定如何处理时 ,

  • Oracle merge合并更新函数实例详解

    前言 MERGE语句是Oracle9i新增的语法,用来合并UPDATE和INSERT语句. 通过MERGE语句,根据一张表或多表联合查询的连接条件对另外一张表进行查询,连接条件匹配上的进行UPDATE,无法匹配的执行INSERT.这个语法仅需要一次全表扫描就完成了全部工作,执行效率要高于INSERT+UPDATE.通过这个MERGE你能够在一个SQL语句中对一个表同时执行INSERT和UPDATE操作. 本博客介绍一下Oracle merge合并函数,业务场景:新增数据的时候要先查询数据库是否已

  • MySQL实现类似Oracle序列的方案

    MySQL实现类似Oracle的序列 Oracle一般使用序列(Sequence)来处理主键字段,而MySQL则提供了自增长(increment)来实现类似的目的: 但在实际使用过程中发现,MySQL的自增长有诸多的弊端:不能控制步长.开始索引.是否循环等:若需要迁移数据库,则对于主键这块,也是个头大的问题. 本文记录了一个模拟Oracle序列的方案,重点是想法,代码其次. Oracle序列的使用,无非是使用.nextval和.currval伪列,基本想法是: 1.MySQL中新建表,用于存储序

  • Oracle中sql语句(+)符号代表连接的使用讲解

    oracle中sql语句(+)符号代表连接 (+)在=前边为右连接 (+)在=后边为左连接 SELECT a.*, b.* from a(+) = b就是一个右连接,等同于select a.*, b.* from a right join b SELECT a.*, b.* from a = b(+)就是一个左连接,等同于select a.*, b.* from a left join b 内连接 常用的连接运算符=.<.> 使用比较运算符根据每个表共有的列的值匹配两个表中的行 外连接 左连接

  • Mapper批量插入Oracle数据@InsertProvider注解

    Mapper: @Mapper @OracleRepository public interface OracleRadiusMapper{ @InsertProvider(type = OracleRadiusProvider.class , method = "insertRadiusDataBatch") int insertRadiusDataBatch(@Param("list")List<AcctInfo> acctInfoList); }

  • Oracle Index索引无效的原因与解决方法

    索引无效原因 最近遇到一个Oracle SQL语句的性能问题,修改功能之前的运行时间平均为0.3s,可是添加新功能后,时间达到了4~5s.虽然几张表的数据量都比较大(都在百万级以上),但是也都有正确创建索引,不知道到底慢在了哪里,下面展开调查. 经过几次排除,把问题范围缩小在索引上,首先在确定索引本身没有问题的前提下,考虑索引有没有被使用到,那么新的问题来了,怎么知道指定索引是否被启用. 判断索引是否被执行 1. 分析索引 即将索引至于监控状态下,对索引进行分析.如下对 ID_TT_SHOHOU

随机推荐