Flink流处理引擎零基础速通之数据的抽取篇

目录
  • 一、CDC
  • 二、常见CDC的比较
  • 三、Flink CDC
  • 四、Flink CDC支持的数据库
  • 五、阿里实现的FlinkCDC使用示例
    • 依赖引入
    • 基于table
    • 基于sql
  • 总结

一、CDC

CDC (Change Data Capture) ,在广义的概念上,只要能捕获数据变更的技术,都可以称为 CDC 。但通常我们说的CDC 技术主要面向数据库(包括常见的mysql,Oracle, MongoDB等)的变更,是一种用于捕获数据库中数据变更的技术。

二、常见CDC的比较

常见的主要包括Flink CDC,DataX,Canal,Sqoop,Kettle,Oracle Goldengate,Debezium等。

  • DataX,Sqoop和kettle的CDC实现技术主要是基于查询的方式实现的,通过离线调度查询作业,实现批处理请求。这种作业方式无法保证数据的一致性,实时性也较差。
  • Flink CDC,Canal,Debezium和Oracle Goldengate是基于日志的CDC技术。这种技术,利用流处理的方式,实时处理日志数据,保证了数据的一致性,为其他服务提供了实时数据。

三、Flink CDC

2020年 Flink cdc 首次在 Flink forward 大会上官宣, 由 Jark Wu & Qingsheng Ren 两位大佬提出。

Flink CDC connector 可以捕获在一个或多个表中发生的所有变更。该模式通常有一个前记录和一个后记录。Flink CDC connector 可以直接在Flink中以非约束模式(流)使用,而不需要使用类似 kafka 之类的中间件中转数据。

四、Flink CDC支持的数据库

PS:

Flink CDC 2.2才新增OceanBase,PolarDB-X,SqlServer,TiDB 四种数据源接入,均支持全量和增量一体化同步。

截止到目前FlinkCDC已经支持12+数据源。

五、阿里实现的FlinkCDC使用示例

依赖引入

    <!-- flink table支持 -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
    </dependency>
    <!-- 阿里实现的flink mysql CDC -->
    <dependency>
      <groupId>com.alibaba.ververica</groupId>
      <artifactId>flink-connector-mysql-cdc</artifactId>
      <version>1.4.0</version>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>8.0.28</version>
    </dependency>
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.80</version>
    </dependency>
    <!-- jackson报错解决 -->
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-core</artifactId>
      <version>${jackson.version}</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>${jackson.version}</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.module</groupId>
      <artifactId>jackson-module-parameter-names</artifactId>
      <version>${jackson.version}</version>
    </dependency>

基于table

package spendreport.cdc;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import java.util.List;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
;
/**
 * @author zhengwen
 **/
public class TestMySqlFlinkCDC {
  public static void main(String[] args) throws Exception {
    //1.创建执行环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    //2.Flink-CDC 将读取 binlog 的位置信息以状态的方式保存在 CK,如果想要做到断点续传, 需要从 Checkpoint 或者 Savepoint 启动程序
    //2.1 开启 Checkpoint,每隔 5 秒钟做一次 CK
    env.enableCheckpointing(5000L);
    //2.2 指定 CK 的一致性语义
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    //2.3 设置任务关闭的时候保留最后一次 CK 数据
    env.getCheckpointConfig().enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    //2.4 指定从 CK 自动重启策略
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));
    DebeziumSourceFunction<String> sourceFunction = MySQLSource.<String>builder()
        .hostname("127.0.0.1")
        .serverTimeZone("GMT+8")  //时区报错增加这个设置
        .port(3306)
        .username("root")
        .password("123456")
        .databaseList("wz")
        .tableList("wz.user_info")  //注意表一定要写库名.表名这种,多个,隔开
        .startupOptions(StartupOptions.initial())
        //自定义转json格式化
        .deserializer(new MyJsonDebeziumDeserializationSchema())
        //自带string格式序列化
        //.deserializer(new StringDebeziumDeserializationSchema())
        .build();
    DataStreamSource<String> streamSource = env.addSource(sourceFunction);
    //TODO 可以keyBy,比如根据table或type,然后开窗处理
    //3.打印数据
    streamSource.print();
    //streamSource.addSink(); 输出
    //4.执行任务
    env.execute("flinkTableCDC");
  }
  private static class MyJsonDebeziumDeserializationSchema implements
      com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema<String> {
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector)
        throws Exception {
      Struct value = (Struct) sourceRecord.value();
      Struct source = value.getStruct("source");
      //获取数据库名称
      String db = source.getString("db");
      String table = source.getString("table");
      //获取数据类型
      String type = Envelope.operationFor(sourceRecord).toString().toLowerCase();
      if (type.equals("create")) {
        type = "insert";
      }
      JSONObject jsonObject = new JSONObject();
      jsonObject.put("database", db);
      jsonObject.put("table", table);
      jsonObject.put("type", type);
      //获取数据data
      Struct after = value.getStruct("after");
      JSONObject dataJson = new JSONObject();
      List<Field> fields = after.schema().fields();
      for (Field field : fields) {
        String field_name = field.name();
        Object fieldValue = after.get(field);
        dataJson.put(field_name, fieldValue);
      }
      jsonObject.put("data", dataJson);
      collector.collect(JSONObject.toJSONString(jsonObject));
    }
    @Override
    public TypeInformation<String> getProducedType() {
      return BasicTypeInfo.STRING_TYPE_INFO;
    }
  }
}

运行效果

PS:

  • 操作数据库的增删改就会立马触发
  • 这里是自定义的序列化转json格式字符串,自带的字符串序列化也是可以的(可以自己试试打印的内容)

基于sql

package spendreport.cdc;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
 * @author zhengwen
 **/
public class TestMySqlFlinkCDC2 {
  public static void main(String[] args) throws Exception {
    //1.创建执行环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    //2.创建 Flink-MySQL-CDC 的 Source
    String connectorName = "mysql-cdc";
    String dbHostName = "127.0.0.1";
    String dbPort = "3306";
    String dbUsername = "root";
    String dbPassword = "123456";
    String dbDatabaseName = "wz";
    String dbTableName = "user_info";
    String tableSql = "CREATE TABLE t_user_info ("
        + "id int,mobile varchar(20),"
        + "user_name varchar(30),"
        + "real_name varchar(60),"
        + "id_card varchar(20),"
        + "org_name varchar(100),"
        + "user_stars int,"
        + "create_by int,"
        // + "create_time datetime,"
        + "update_by int,"
        // + "update_time datetime,"
        + "is_deleted int) "
        + " WITH ("
        + " 'connector' = '" + connectorName + "',"
        + " 'hostname' = '" + dbHostName + "',"
        + " 'port' = '" + dbPort + "',"
        + " 'username' = '" + dbUsername + "',"
        + " 'password' = '" + dbPassword + "',"
        + " 'database-name' = '" + dbDatabaseName + "',"
        + " 'table-name' = '" + dbTableName + "'"
        + ")";
    tableEnv.executeSql(tableSql);
    tableEnv.executeSql("select * from t_user_info").print();
    env.execute();
  }
}

运行效果:

总结

既然是基于日志,那么数据库的配置文件肯定要开启日志功能,这里mysql需要开启内容

server-id=1
log_bin=mysql-bin
binlog_format=ROW  #目前还只能支持行
expire_logs_days=30
binlog_do_db=wz #这里binlog的库如果有多个就再写一行,千万不要写成用,隔开

  • 实时性确实高,比那些自动任务定时取体验号百倍
  • 流示的确实丝滑

最后肯定证明这种方式同步数据可行,而且实时性特高,但是就是不知道我们的目标数据库是否可以开启这些日志配置。UP!

到此这篇关于Flink流处理引擎零基础速通之数据的抽取篇的文章就介绍到这了,更多相关Flink数据的抽取内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • 浅谈Flink容错机制之作业执行和守护进程

    一.作业执行容错 Flink 的错误恢复机制分为多个级别,即 Execution 级别的 Failover 策略和 ExecutionGraph 级别的 Job Restart 策略.当出现错误时,Flink 会先尝试触发范围小的错误恢复机制,如果仍处理不了才会升级为更大范围的错误恢复机制,具体可以看下面的序列图. 当 Task 发生错误,TaskManager 会通过 RPC 通知 JobManager,后者将对应 Execution 的状态转为 failed 并触发 Failover 策略.

  • Flink入门级应用域名处理示例

    目录 概述 算子 FlatMap KeyBy Reduce 连接socket测试 连接kafka 正式 测试 打包上传服务器 概述 最近做了一个小任务,要使用Flink处理域名数据,在4GB的域名文档中求出每个域名的顶级域名,最后输出每个顶级域名下的前10个子级域名.一个比较简单的入门级Flink应用,代码很容易写,主要用到的算子有FlatMap.KeyBy.Reduce.但是由于Maven打包问题,总是提示找不到入口类,卡了好久,最后也是成功解决了. 主体代码如下: public class

  • SpringBoot超详细讲解集成Flink的部署与打包方法

    目录 一.SpringBoot集成Flink 二.FlinkTask写法调整 三.打包插件 四.Flink的上传与运行 总结 一.SpringBoot集成Flink 其实没什么特别的,就把Flink依赖的包在pom引入就行了.只是FlinkTask的写法要小调整下,把相关依赖交给spring管理就行. 然后如果放弃Flink的Dashboard端监控task执行相关信息,那也可以在SpringBoot的启动类里调用就行,但是可能出现task的相关对象没有注入,这种都是小问题(实际就是spring

  • 解析Flink内核原理与实现核心抽象

    目录 一.环境对象 1.1 执行环境 StreamExecutionEnvironment LocalStreamEnvironment RemoteStreamEnvironment StreamContextEnvironment StreamPlanEnvironment ScalaShellStreamEnvironment 1.2 运行时环境 RuntimeEnvironment SavepointEnvironment 1.3 运行时上下文 StreamingRuntimeConte

  • Flink支持哪些数据类型?

    一.支持的数据类型 Flink 对可以在 DataSet 或 DataStream 中的元素类型进行了一些限制.这样做的原因是系统会分析类型以确定有效的执行策略. 1.Java Tuple 和 Scala Case类: 2.Java POJO: 3.基本类型: 4.通用类: 5.值: 6.Hadoop Writables; 7.特殊类型 二.Flink之Tuple类型 Tuple类型  Tuple 是flink 一个很特殊的类型 (元组类型),是一个抽象类,共26个Tuple子类继承Tuple 

  • Flink流处理引擎零基础速通之数据的抽取篇

    目录 一.CDC 二.常见CDC的比较 三.Flink CDC 四.Flink CDC支持的数据库 五.阿里实现的FlinkCDC使用示例 依赖引入 基于table 基于sql 总结 一.CDC CDC (Change Data Capture) ,在广义的概念上,只要能捕获数据变更的技术,都可以称为 CDC .但通常我们说的CDC 技术主要面向数据库(包括常见的mysql,Oracle, MongoDB等)的变更,是一种用于捕获数据库中数据变更的技术. 二.常见CDC的比较 常见的主要包括Fl

  • 2019最新系统学习路线零基础如何转行大数据

    都知道大数据薪资高,前景好.而大数据又需要Java基础.对于稍微懂些Java的童鞋来说,到底如何转行大数据呢?今天小编给你一个大数据工程师具体的学习路线图.[ps:无java基础也可以学习大数据] 分享转行经验路线 对于Java程序员,大数据的主流平台hadoop是基于Java开发的,所以Java程序员往大数据开发方向转行从语言环境上更为顺畅,另外很多基于大数据的应用框架也是Java的,所以在很多大数据项目里掌握Java语言是有一定优势的. 当然,hadoop核心价值在于提供了分布式文件系统和分

  • SpringBoot2零基础到精通之JUnit 5与指标监控

    目录 1 单元测试JUnit 5 1.1JUnit 5简介以及使用 1.2 常用的测试注解 1.3 断言(assertions) 1.4 前置条件(assumptions) 1.5 嵌套测试 1.6 参数化测试 2 指标监控 2.1 使用url实现监控 2.2 可视化的监控平台 1 单元测试JUnit 5 1.1JUnit 5简介以及使用   JUnit5作为最新版本的JUnit框架与之前版本的Junit框架有很大的不同.JUnit5主要由三个不同子项目的几个不同模块组成:JUnit Platf

  • C语言文件操作零基础新手入门保姆级教程

    目录 一.前言 二.文件操作基础知识 ①什么是文件 ②数据文件类型 ③数据如何存储 ④如何读取二进制文件 ⑤什么是文件名 ⑥文件缓冲区 ⑦文件指针 三.文件操作函数 ①fopen 与 fclose ②fputc与fgetc ③fputs与fgets ④fprintf与fscanf ⑤fwrite与fread ⑥fseek与ftell与rewind ⑦ferror与feof ⑧补充函数 sscanf sprintf ⑨补充函数perror  strerror 总结 一.前言 我们如何使我们设计的程

  • SpringBoot2零基础到精通之数据与页面响应

    目录 1 数据响应 1.1 数据响应(JSON为例) 1.2 数据响应之内容协商 2 页面响应 2.1 模板引擎之Thymeleaf 2.2 拦截器 2.3 文件上传 1 数据响应   数据响应一般分为两种:页面响应和数据响应,一般来说页面响应是用来开发一些单体项目(也就是前后端都在一个开发工具中),而数据响应则是用来进行前后端分离开发的项目,前端发送过来请求后端响应相应的数据. 1.1 数据响应(JSON为例)   如果想让SpringMVC响应返回一个JSON类型的数据,首先需要在项目的po

  • SpringBoot2零基础到精通之异常处理与web原生组件注入

    目录 1 异常处理 1.1 异常处理之错误页面 1.2 异常处理之精确捕获 1.3 异常处理之自定义异常 1.4 异常处理之框架底层异常 2 web原生组件的注入 2.1 servlet组件 2.2 filter组件 2.3 listener组件 3 web实现定制化总结 1 异常处理   默认情况下,SpringBoot会提供/error处理所有的错误请求并返回相应的信息,对于浏览器客户端来说会返回一个包含时间戳.状态码.错误信息.携带的自定义异常信息.发生错误的路径等信息的错误Whitela

  • 零基础易语言入门教程(六)之逻辑型命令

    逻辑型命令,就是非真即假的. 具体方法和步骤如下所示: 1.如果(): 属于逻辑型,不是真就是假,这种时间我们基本在编写程序时,会有两个选择方向,见下图所示: 2.如上图,如果命令属于逻辑型数据,且有两条输出方向,当我们在如果命令里填写的为真,那么我们的系统将会显示输出真的一个,反之则为假. 3.如果()命令在我们编写程序时属于常用命令,他在运行时我们需要给他一个条件,然后才能输出内容,有了条件我们在运行时给他一个输出方向即可, 以上所述是小编给大家介绍的零基础易语言入门教程(六)之逻辑型命令的

  • 零基础易语言入门教程(五)之逻辑型数据类型

    在上篇文章给大家介绍了零基础易语言入门教程(四)之数据类型,上篇针对数值到文本类型知识,今天给大家介绍下逻辑型数据. 具体方法和步骤如下所示: 1.逻辑型数据非真即假: 首先申请一个局部变量(A)类型为:逻辑型,编写代码为:A=1>2,那么输出的结果应为假,因等于1是赋值与1,然后代码中写道1大于2,所以这是假的,见下图所示: 2.关系运算符: 在上图大家需注意的是,A后面的等于号是赋值符号,而后面的≥,≠,<一些符号则是关系运算符. 关系运算符不是非要设置变量给其赋值才可以使用的,同样他可以

  • 零基础易语言入门教程(四)之数据类型

    我们一起了解下易语言的数据类型,跟我们现实生活是一样的,分为文本型和数值型,即是我们所说的文科生和理科生的区别. 参考文章:详解易语言中的数据类型 方法和步骤如下所示: 1.数值型(到数值命令): 使用该命令可将文本型等一类数据更改为数值型:我们来输入一行代码看看其作用: 2.到文本()命令: 我们先输入一行代码试试,见下图 3.小结: 每一行代码前后的数据类型必须转换为同一种,方可进行相连,相加,"+"在数据为文本型时是连接作用,数值型的跟数学里的符号一样. 以上所述是小编给大家介绍

  • 零基础易语言入门教程(三)之了解控制台程序

    易语言简介: 易语言是一门以中文作为程序代码编程语言.以"易"著称.创始人为吴涛.早期版本的名字为E语言.易语言最早的版本的发布可追溯至2000年9月11日.创造易语言的初衷是进行用中文来编写程序的实践.从2000年至今,易语言已经发展到一定的规模,功能上.用户数量上都十分可观. 易语言和其它编程语言一样都有后台程序,它也不一定必须是窗口程序的了,下面小编带大家了解易语言的控制台程序. 方法和步骤如下所示: 1.延时命令: 首先学习一个第一个命令,该命令可将其脚本界面延时.1000毫秒

随机推荐