Apache SeaTunnel实现 非CDC数据抽取实践

目录
  • 01 SeaTunnel简介
    • 1. Apache SeaTunnel整体介绍
    • 2. Apache SeaTunnel技术特性
    • 3. Apache SeaTunnel工作流程
    • 4. Apache SeaTunnel环境依赖
    • 5. Apache SeaTunnel用户使用情况
  • 02 SeaTunnel应用场景
    • 1. 交管行业数据简介
    • 2. 交管行业数据特点
  • 03 相关业务痛点
    • 1. 数据抽取限制较多
  • 04 选择SeaTunnel的原因
    • 1. SeaTunnel的优势
    • 2. SeaTunnel的安装部署
    • 3. SeaTunnel配置文件
    • 4. SeaTunnel插件支持
  • 05 具体实现方案
    • 1. 数据增量更新具体实现
    • 2. 具体方法
  • 06 具体实现流程
    • 1. 确定运算资源
    • 2. 确定数据来源
    • 3. 数据转换
    • 4. 数据输出
    • 5. 脚本和调度执行

导读: 随着全球数据量的不断增长,越来越多的业务需要支撑高并发、高可用、可扩展、以及海量的数据存储,在这种情况下,适应各种场景的数据存储技术也不断的产生和发展。与此同时,各种数据库之间的同步与转化的需求也不断增多,数据集成成为大数据领域的热门方向,于是SeaTunnel应运而生。SeaTunnel是一个分布式、高性能、易扩展、易使用、用于海量数据(支持实时流式和离线批处理)同步和转化的数据集成平台,架构于Apache Spark和Apache Flink之上。本文主要介绍SeaTunnel 1.X在交管行业中的应用,以及其中如何实现从Oracle数据库把数据增量导入数仓这样一个具体的场景。

今天的介绍会围绕下面六点展开:

  • SeaTunnel简介
  • SeaTunnel应用场景
  • 相关业务痛点
  • 选择SeaTunnel的原因
  • 具体实现方案
  • 具体实现流程

01 SeaTunnel简介

下面对SeaTunnel从产品功能,技术特性、工作流程、环境依赖、用户使用等方面做一个总体的介绍。

1. Apache SeaTunnel整体介绍

互联网行业数据量非常大,对性能还有其他各方面的技术要求都非常高,在笔者所在的交管行业中,情况就不太一样,各方面的要求也没有互联网行业那么高,在具体的数据集成应用中,主要是使用SeaTunnel1.X版本。

上图所示内容引用了Apache SeaTunnel官网中的介绍。

Apache Spark对于分布式数据处理来说是一个伟大的进步,但是直接使用Spark框架还是有一定门槛的,SeaTunnel这个产品把业界使用Spark的优质经验固化到了其中,明显降低了学习成本,加快分布式数据处理能力在生产环境中落地。在SeaTunnel2.X版本中,除了Spark,也增加了对Flink的支持。

除此之外,SeaTunnel还可以较好的解决实际业务场景中碰到的下列问题:

  • 数据丢失与重复
  • 数据集成中任务堆积与延迟
  • 数据同步较低的吞吐量
  • Spark/Flink应用到生产环境周期较长、复杂度较高
  • 缺少应用运行状态的监控

2. Apache SeaTunnel技术特性

SeaTunnel具备如上图所示的技术特性:

  • 简单易用,开发配置简单、灵活,无需编码开发,支持通过SQL进行数据处理和聚合,使用成本低
  • 分布式,高性能,经历大规模生产环境使用和海量数据检验,成熟稳定
  • 模块化和插件化,内置丰富插件,并且可以开发定制个性化插件,支持热插拔,具备高扩展性
  • 使用Spark/Flink作为底层数据同步引擎使其具备分布式执行能力

3. Apache SeaTunnel工作流程

SeaTunnel的架构和整个工作流程如下图所示,Input/Source [数据源输入] -> Filter/Transform [数据处理] -> Output/Sink [结果输出],数据处理流水线由多个过滤器构成,以满足多种数据处理需求。如果用户习惯了SQL,也可以直接使用SQL构建数据处理管道,更加简单高效。目前,SeaTunnel支持的过滤器列表也在扩展中。

在插件方面,SeaTunnel已支持多种Input/Sink插件,同时也支持多种Filter/Transform处理插件,整体上基于系统非常易于扩展,用户还可以自行开发数据处理插件,具体如下:

  • Input/Source 插件

Fake, File, Hive/Hdfs, Kafka, Jdbc, ClickHouse, TiDB, HBase, Kudu, S3, Socket, 自行开发的Input插件

  • Filter/Transform 插件

Add, Checksum, Convert, Date, Drop, Grok, Json, Kv, Lowercase, Remove, Rename, Repartition, Replace, Sample, Split, Sql, Table, Truncate, Uppercase, Uuid, 自行开发的Filter/Transform插件

  • Output/Sink 插件

Elasticsearch, File, Hdfs, Jdbc, Kafka, Mysql, ClickHouse, Stdout, 自行开发的Output 插件

4. Apache SeaTunnel环境依赖

SeaTunnel1.X支持Spark计算引擎,SeaTunnel2.X目前支持Spark/Flink两种计算引擎,在笔者的实际项目中使用的是SeaTunnel1.X版本。

5. Apache SeaTunnel用户使用情况

目前有很多公司都在使用SeaTunnel,其中不乏大型公司,例如:中国移动、腾讯云、今日头条、还有笔者所在的中电科。

02 SeaTunnel应用场景

SeaTunnel特别适合以下场景使用:

  • 海量数据集成和ETL
  • 海量数据聚合
  • 多源数据处理

下面主要介绍SeaTunnel在交管行业中的应用。

1. 交管行业数据简介

在交管行业中,数据主要包括驾驶人、车辆相关的数据,平时在道路上发生的一些交通警情数据,交通违法数据,机动车登记信息,执勤执法的数据,交通事故以及其他一些互联网数据,这些数据的量不是很大,另外还有卡口过车、车辆GPS数据,这两种数据的数据量都比较大,例如一些省会城市,每秒钟至少有几千条过车数据,这些数据都是属于交管行业内的数据。

2. 交管行业数据特点

交管行业数据,跟互联网行业的数据还是有很大区别的,首先这些数据的体量大小不一,并且分布在内部的公安网以及智能专网,这两个网之间是物理隔离的,我们需要把这些数据在两个网络之间转移,在这个过程中,还要做一些数据处理。其次,在数据处理实时性方面的要求,并不是非常高,数据的更新频率也不是很高。然后,在数据安全方面,要求比较高,数据是不能丢的,同时对保密性要求也比较高,所以具体的数据也不能展示出来。

03 相关业务痛点

1. 数据抽取限制较多

在做业务的过程中,会有一些业务痛点,首先因为交管行业是政府行业,基本各个子平台的数据都是存储在Oracle数据库中的,我们需要把数据从Oracle数据库中抽取到我们的数仓里面,出于安全性的考虑,无法得到用户级别的权限,我们只能通过一些视图级别的用户权限去处理数据,对于数据源表结构的变更也无法及时知晓。其次,会话数是受到限制的,多线程抽取数据的话,如果会话数达到上限,连接就会受到影响,而且这个分配的用户也同时会用于其他用途。最后,我们在处理一些增量数据的时候,一般情况下需要一个增量列,用于保持一个增量更新,很多时候,是没办法确定哪些列可以作为增量列的。以上就是在做业务的过程中,经常会遇到的一些问题,下图也把这些问题列举了出来。

04 选择SeaTunnel的原因

最初的时候,做数据处理、数据抽取的时候,并没有使用SeaTunnel,而是使用Apache NiFi,这个工具功能比较强大而且全面,但是NiFi中用于数据处理的处理器比较多,而且数据处理链路中要做很多转换,所以需要对NiFi里面的各种组件要非常熟悉,对使用者的要求也比较高。

1. SeaTunnel的优势

我们一开始也用Spark程序做数据处理,对大数据相关人员的要求比较高,我们这边大数据人员比较少,有时处理一些新的需求的时候,会比较繁忙。如果不需要通过编码,而是直接使用工具,进行简单的配置就能实现的话,会带来较大的便利和效率的提高。

前面在SeaTunnel的介绍中,已经讲到SeaTunnel是比较易于使用的,安装部署方便,开箱即用,执行效率很高,因为它是分布式的,可以应用整个集群资源来做数据处理工作。

SeaTunnel无需编程,只要做简单的配置,并且它的Source和Sink都比较丰富,并且可以自己根据接口开发需要的插件,对数据源的权限要求也不高。

更加重要的是,SeaTunnel是首个进入Apache孵化的国人开源数据集成平台。

2. SeaTunnel的安装部署

如上图所示是SeaTunnel官方部署文档,只需要简单几步,就可以把SeaTunnel安装到我们的环境之中,然后就可以使用了。

3. SeaTunnel配置文件

下图所示是一个配置文件的示例,这个配置文件是SeaTunnel1.X版本的一个配置,一个完整的SeaTunnel配置包含spark, input, filter, output四部分,其中spark是spark相关的配置,例如,启动多少个executor,每个 executor使用多少核数的CPU,多少内存等,input可配置任意的input插件及其参数,具体参数随不同的input插件而变化,filter可配置任意的filter插件及其参数,具体参数随不同的filter插件而变化,filter中的多个插件按配置顺序形成了数据处理的pipeline, 上一个filter的输出是下一个filter的输入,通过input插件把数据取出,成为了spark里面的一个数据集,然后filter插件会对这个数据集做一些转换操作,output可配置任意的output插件及其参数,具体参数随不同的output插件而变化,filter处理完的数据,会发送给output中配置的每个插件

4. SeaTunnel插件支持

如下图所示,SeaTunnel支持的插件非常丰富,日常所能用到的基本都有。

这里面着重介绍一下filter插件中的sql插件,这个插件非常灵活,在用sql插件做转换操作时,只要是sparksql里面支持的函数等内容,都可以在这里使用,然后再output到目标数据存储,例如HDFS、Kafka、ES、Clickhouse等。

05 具体实现方案

接下来讲一下具体的实现方案,在我们具体的业务中,如何把这些行业数据从智能专网直接抽取到公安网中,这里会涉及到数据的增量更新。

1. 数据增量更新具体实现

当需要实现一个增量更新的时候,首先就是增量列的选择,之前提到原先是用NiFi来做增量更新,但是对增量列的支持不是特别好,尤其是对日期类型的支持不是很好。但是SeaTunnel对增量列的支持不受列的类型限制,可以比较灵活的进行选择。

2. 具体方法

实际业务当中,选取了记录的更新时间列作为增量列,每次数据抽取过来,会记录增量列的最大值,下次数据抽取时,可以从这个位置继续抽取数据,这个也是受以前写spark程序的启发,把checkpoint存储在HDFS里面。当然,增量列的选择,在实际应用中,除了更新时间,增量ID以外,还有其他业务字段可以做为增量列,增量列的选择一定是根据真正的业务需求,实时的程度和粒度来决定的。

06 具体实现流程

做数据增量更新,最重要的是实现的思路,接下来详细描述一下具体实现过程。

1. 确定运算资源

首先,如下图所示,先要确定计算资源,这里使用了spark,并且针对spark做了相关的配置。

2. 确定数据来源

选择一个增量列,对增量列每次产生的最大值(checkpoint),保存在HDFS一个具体的目录下。这里input插件选择HDFS,每次产生的那个增量数据,指向HDFS的一个具体路径下面,input插件有个通用参数叫做result_table_name,当指定result_table_name时,处理后的数据,会被注册为一个可供其他插件直接访问的数据集,或者被称为临时表。当增量列的最大值保存到HDFS之后,需要取出时,会保存在result_table_name指定的表中。接下来因为是从Oracle数据库中取数据,所以设置相应的Jdbc。当数据量比较大的时候,还可以指定分区列,这样的话,数据处理的效率会提高,详细配置,如下图所示。

3. 数据转换

下图所示是必要的数据转换,在实际业务中,需要做一个过滤操作,取出大于最大更新时间的数据,convert插件里面做的是中间的一些数据类型转换操作,最后使用了一个sql插件,用于记录本次取到的数据的一个最大值,用于下次取数的比较。

4. 数据输出

下图所示的是数据处理后的输出,也就是output插件对应的配置,具体是把数据抽取到Clickhouse里面。然后数据集里面,那个更新列的最大值,通过追加模式,写回到HDFS中,供下次使用。

5. 脚本和调度执行

整个过程是通过下图所示的shell脚本来做的,通过nohup后台执行的方式,利用Crontab进行调度执行,因为在我们实际的业务中,对定时调度的要求不是很高,所以可以采用Crontab或者开源的Dolphin Scheduler都是可以满足的。

下面的截图,是实际运行过程中,产生在HDFS上的增量文件,Crontab调度脚本,以及执行过程中产生的一些Yarn任务列表。

在上述整体数据处理过程中,由于实际情况的限制,尤其我们的数据源是高度受限的Oracle数据库。但是对于很多传统公司,如果老系统是以Oracle为主,并且掌控力度比较大的话,现在想做数据架构升级,需要迁移Oracle中的数据,那么可以采用CDC读取日志或者触发器的方式,把数据变化写入到消息队列里面,通过SeaTunnel就可以很容易的把数据实时写入到其他异构的数据库。

到此这篇关于Apache SeaTunnel实现非CDC数据抽取实践的文章就介绍到这了,更多相关Apache SeaTunnel数据抽取内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • 使用 Apache Superset 可视化 ClickHouse 数据的两种方法

    Apache Superset是一个强大的BI工具,它提供了查看和探索数据的方法.它在 ClickHouse 用户中也越来越受欢迎. 我们将介绍安装 Superset 的 2 种方法,然后展示如何从 Superset 连接到您的第一个 ClickHouse 数据库.代码示例基于 Ubuntu 18.04.Superset 1.1.0 和 clickhouse-sqlalchemy 0.1.6. 方法一:Python虚拟环境 第一种方法直接在您的主机上安装 Superset.我们将首先创建一个 P

  • 一文解析Apache Avro数据

    摘要:本文将演示如果序列化生成avro数据,并使用FlinkSQL进行解析.  Avro官方文档所写,http://avro.apache.org/docs/current/index.html. Avro简介 avro是一个数据序列化系统 提供了: 丰富的数据结构 紧凑的,快速的,二进制的数据格式 一种文件格式,用于存储持久化数据 远程过程调用系统(RPC) 和动态语言的简单交互.并不需要为数据文件读写产生代码,也不需要使用或实现RPC协议.代码生成是一种优化方式,但是只对于静态语言有意义.

  • Apache Hudi数据布局黑科技降低一半查询时间

    目录 1. 背景 2. Clustering架构 2.1 调度Clustering 2.2 运行Clustering 2.3 Clustering配置 3. 表查询性能 3.1 进行Clustering之前 3.2 进行Clustering之后 4. 总结 1. 背景 Apache Hudi将流处理带到大数据,相比传统批处理效率高一个数量级,提供了更新鲜的数据.在数据湖/仓库中,需要在摄取速度和查询性能之间进行权衡,数据摄取通常更喜欢小文件以改善并行性并使数据尽快可用于查询,但很多小文件会导致查

  • 基于Apache Hudi在Google云构建数据湖平台的思路详解

    自从计算机出现以来,我们一直在尝试寻找计算机存储一些信息的方法,存储在计算机上的信息(也称为数据)有多种形式,数据变得如此重要,以至于信息现在已成为触手可及的商品.多年来数据以多种方式存储在计算机中,包括数据库.blob存储和其他方法,为了进行有效的业务分析,必须对现代应用程序创建的数据进行处理和分析,并且产生的数据量非常巨大!有效地存储数PB数据并拥有必要的工具来查询它以便使用它至关重要,只有这样对该数据的分析才能产生有意义的结果.大数据是一门处理分析方法.有条不紊地从中提取信息或以其他方式处

  • Apache Hudi结合Flink的亿级数据入湖实践解析

    目录 1. 实时数据落地需求演进 2. 基于Spark+Hudi的实时数据落地应用实践 3. 基于Flink自定义实时数据落地实践 4. 基于Flink + Hudi的落地数据实践 5. 后续应用规划及展望 5.1 取代离线报表,提高报表实时性及稳定性 5.2 完善监控体系,提升落数据任务稳定性 5.3 落数据中间过程可视化探索 本次分享分为5个部分介绍Apache Hudi的应用与实践 1. 实时数据落地需求演进 实时平台上线后,主要需求是开发实时报表,即抽取各类数据源做实时etl后,吐出实时

  • Apache SeaTunnel实现 非CDC数据抽取实践

    目录 01 SeaTunnel简介 1. Apache SeaTunnel整体介绍 2. Apache SeaTunnel技术特性 3. Apache SeaTunnel工作流程 4. Apache SeaTunnel环境依赖 5. Apache SeaTunnel用户使用情况 02 SeaTunnel应用场景 1. 交管行业数据简介 2. 交管行业数据特点 03 相关业务痛点 1. 数据抽取限制较多 04 选择SeaTunnel的原因 1. SeaTunnel的优势 2. SeaTunnel的

  • Python使用正则表达式实现爬虫数据抽取

    1. 为什么要使用正则表达式? 首先,大家来看一个例子.一个文本文件里面存储了一些市场职位信息,格式如下所示: Python3 高级开发工程师 上海互教教育科技有限公司上海-浦东新区2万/月02-18满员 测试开发工程师(C++/python) 上海墨鹍数码科技有限公司上海-浦东新区2.5万/每月02-18未满员 Python3 开发工程师 上海德拓信息技术股份有限公司上海-徐汇区1.3万/每月02-18剩余11人 测试开发工程师(Python) 赫里普(上海)信息科技有限公司上海-浦东新区1.

  • Vue项目数据动态过滤实践及实现思路

    这个问题是在下在做一个Vue项目中遇到的实际场景,这里记录一下我遇到问题之后的思考和最后怎么解决的(老年程序员记性不好 -.-),过程中会涉及到一些Vue源码的概念比如 $mount . render watcher 等 问题是这样的:页面从后台拿到的数据是由 0 . 1 之类的key,而这个key代表的value比如 0-女 . 1-男 的对应关系是要从另外一个数据字典接口拿到的:类似于这样的Api: { "SEX_TYPE": [ { "paramValue":

  • 基于Java8 Stream API实现数据抽取收集

    目标&背景 我们以"处理订单数据"为例,假设我们的应用是一个分布式应用,有"订单应用","物流应用","商品应用"等都是独立的服务.本次我们的目的需要展示订单列表完整数据: 1.查询订单列表. 2.批量查询物流信息. 3.将物流信息填充到订单主信息中. 假设我们定义了一个订单类,具有几个关键的属性:订单号,状态,订单价,快递信息.如下所示: class Order{ String orderSeq; String st

  • Apache中使非伪静态url跳转到伪静态url的方法

    如何使用.htaccess使非伪静态页面301跳转到伪静态页面然后再跳转?这是一个比较蛋疼的跳转.因为不小心就会搞成死循环导致页面无法打开.好在问题最终解决了.记录并分享出来,给需要的朋友借鉴借鉴. 规则如下: RewriteRule ^article-([0-9]+).html$ /article.php?id=$1&r=1 [L] RewriteCond %{QUERY_STRING} ^id=([0-9]+)$ [NC] RewriteRule ^article.php$ /article

  • 解析SQL Server CDC配合Kafka Connect监听数据变化的问题

    写在前面 好久没更新Blog了,从CRUD Boy转型大数据开发,拉宽了不少的知识面,从今年年初开始筹备.组建.招兵买马,到现在稳定开搞中,期间踏过无数的火坑,也许除了这篇还很写上三四篇. 进入主题,通常企业为了实现数据统计.数据分析.数据挖掘.解决信息孤岛等全局数据的系统化运作管理 ,为BI.经营分析.决策支持系统等深度开发应用奠定基础,挖掘数据价值 ,企业会开始着手建立数据仓库,数据中台.而这些数据来源则来自于企业的各个业务系统的数据或爬取外部的数据,从业务系统数据到数据仓库的过程就是一个E

  • OnZoom基于Apache Hudi的一体架构实践解析

    1. 背景 OnZoom是Zoom新产品,是基于Zoom Meeting的一个独一无二的在线活动平台和市场.作为Zoom统一通信平台的延伸,OnZoom是一个综合性解决方案,为付费的Zoom用户提供创建.主持和盈利的活动,如健身课.音乐会.站立表演或即兴表演,以及Zoom会议平台上的音乐课程. 在OnZoom data platform中,source数据主要分为MySQL DB数据和Log数据. 其中Kafka数据通过Spark Streaming job实时消费,MySQL数据通过Spark

  • SQLServer 2008 CDC功能实现数据变更捕获脚本

    CDC:Change Data Capture 复制代码 代码如下: --步骤:本文中以GPOSDB为例 --第一步.对目标库显式启用CDC:--在当前库使用sys.sp_cdc_enable_db.返回0(成功)或1(失败).--注意,无法对系统数据库和分发数据库启用该功能.且执行者需要用sysadmin角色权限.--该存储过程的作用域是整个目标库.包含元数据.DDL触发器.cdc架构和cdc用户.--使用以下代码启用:USE GPOSDB  --要启用CDC的数据库GOEXECUTE sys

  • 详解pandas中利用DataFrame对象的.loc[]、.iloc[]方法抽取数据

    pandas的DataFrame对象,本质上是二维矩阵,跟常规二维矩阵的差别在于前者额外指定了每一行和每一列的名称.这样内部数据抽取既可以用"行列名称(对应.loc[]方法)",也可以用"矩阵下标(对应.iloc[]方法)"两种方式进行. 下面具体说明: (以下程序均在Jupyter notebook中进行,部分语句的print()函数省略) 首先生成一个DataFrame对象: import pandas as pd score = [[34,67,87],[68

随机推荐