基于Apache Hudi在Google云构建数据湖平台的思路详解

自从计算机出现以来,我们一直在尝试寻找计算机存储一些信息的方法,存储在计算机上的信息(也称为数据)有多种形式,数据变得如此重要,以至于信息现在已成为触手可及的商品。多年来数据以多种方式存储在计算机中,包括数据库、blob存储和其他方法,为了进行有效的业务分析,必须对现代应用程序创建的数据进行处理和分析,并且产生的数据量非常巨大!有效地存储数PB数据并拥有必要的工具来查询它以便使用它至关重要,只有这样对该数据的分析才能产生有意义的结果。
大数据是一门处理分析方法、有条不紊地从中提取信息或以其他方式处理对于典型数据处理应用程序软件而言过于庞大或复杂的数据量的方法的学科。为了处理现代应用程序产生的数据,大数据的应用是非常必要的,考虑到这一点,本博客旨在提供一个关于如何创建数据湖的小教程,该数据湖从应用程序的数据库中读取任何更改并将其写入数据湖中的相关位置,我们将为此使用的工具如下:

  • Debezium
  • MySQL
  • Apache Kafka
  • Apache Hudi
  • Apache Spark

我们将要构建的数据湖架构如下:

第一步是使用 Debezium 读取关系数据库中发生的所有更改,并将所有更改推送到 Kafka 集群。

Debezium 是一个用于变更数据捕获的开源分布式平台,Debezium 可以指向任何关系数据库,并且它可以开始实时捕获任何数据更改,它非常快速且实用,由红帽维护。

首先,我们将使用 docker-compose 在我们的机器上设置 Debezium、MySQL 和 Kafka,您也可以使用这些的独立安装,我们将使用 Debezium 提供给我们的 mysql 镜像,因为其中已经包含数据,在任何生产环境中都可以使用适当的 Kafka、MySQL 和 Debezium 集群,docker compose 文件如下:

version: '2'
services:
  zookeeper:
    image: debezium/zookeeper:${DEBEZIUM_VERSION}
    ports:
     - 2181:2181
     - 2888:2888
     - 3888:3888
  kafka:
    image: debezium/kafka:${DEBEZIUM_VERSION}
    ports:
     - 9092:9092
    links:
     - zookeeper
    environment:
     - ZOOKEEPER_CONNECT=zookeeper:2181
  mysql:
    image: debezium/example-mysql:${DEBEZIUM_VERSION}
    ports:
     - 3307:3306
    environment:
     - MYSQL_ROOT_PASSWORD=${MYSQL_ROOT_PASS}
     - MYSQL_USER=${MYSQL_USER}
     - MYSQL_PASSWORD=${MYSQL_USER_PASS}
  schema-registry:
    image: confluentinc/cp-schema-registry
    ports:
     - 8181:8181
     - 8081:8081
    environment:
     - SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=kafka:9092
     - SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181
     - SCHEMA_REGISTRY_HOST_NAME=schema-registry
     - SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081
    links:
     - zookeeper
  connect:
    image: debezium/connect:${DEBEZIUM_VERSION}
    ports:
     - 8083:8083
    links:
     - kafka
     - mysql
     - schema-registry
    environment:
     - BOOTSTRAP_SERVERS=kafka:9092
     - GROUP_ID=1
     - CONFIG_STORAGE_TOPIC=my_connect_configs
     - OFFSET_STORAGE_TOPIC=my_connect_offsets
     - STATUS_STORAGE_TOPIC=my_connect_statuses
     - KEY_CONVERTER=io.confluent.connect.avro.AvroConverter
     - VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter
     - INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
     - INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
     - CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081
     - CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081

DEBEZIUM_VERSION 可以设置为 1.8。 此外请确保设置 MYSQL_ROOT_PASS、MYSQL_USER 和 MYSQL_PASSWORD。

在我们继续之前,我们将查看 debezium 镜像提供给我们的数据库 inventory 的结构,进入数据库的命令行:

docker-compose -f docker-compose-avro-mysql.yaml exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory'

在 shell 内部,我们可以使用 show tables 命令。 输出应该是这样的:

我们可以通过 select * from customers 命令来查看客户表的内容。 输出应该是这样的:

现在在创建容器后,我们将能够为 Kafka Connect 激活 Debezium 源连接器,我们将使用的数据格式是 Avro 数据格式,Avro 是在 Apache 的 Hadoop 项目中开发的面向行的远程过程调用和数据序列化框架。它使用 JSON 来定义数据类型和协议,并以紧凑的二进制格式序列化数据。

让我们用我们的 Debezium 连接器的配置创建另一个文件。

{
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "MYSQL_USER",
        "database.password": "MYSQL_PASSWORD",
        "database.server.id": "184054",
        "database.server.name": "dbserver1",
        "database.include.list": "inventory",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "schema-changes.inventory",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://schema-registry:8081",
        "value.converter.schema.registry.url": "http://schema-registry:8081"
    }
}

正如我们所看到的,我们已经在其中配置了数据库的详细信息以及要从中读取更改的数据库,确保将 MYSQL_USER 和 MYSQL_PASSWORD 的值更改为您之前配置的值,现在我们将运行一个命令在 Kafka Connect 中注册它,命令如下:

curl -i -X POST -H "Accept:application/json" -H "Content-type:application/json" http://localhost:8083/connectors/ -d @register-mysql.json

现在,Debezium 应该能够从 Kafka 读取数据库更改。
下一步涉及使用 Spark 和 Hudi 从 Kafka 读取数据,并将它们以 Hudi 文件格式放入 Google Cloud Storage Bucket。 在我们开始使用它们之前,让我们了解一下 Hudi 和 Spark 是什么。

Apache Hudi 是一个开源数据管理框架,用于简化增量数据处理和数据管道开发。 该框架更有效地管理数据生命周期等业务需求并提高数据质量。 Hudi 使您能够在基于云的数据湖上管理记录级别的数据,以简化更改数据捕获 (CDC) 和流式数据摄取,并帮助处理需要记录级别更新和删除的数据隐私用例。 Hudi 管理的数据集使用开放存储格式存储在云存储桶中,而与 Presto、Apache Hive 和/或 Apache Spark 的集成使用熟悉的工具提供近乎实时的更新数据访问

Apache Spark 是用于大规模数据处理的开源统一分析引擎。 Spark 为具有隐式数据并行性和容错性的集群编程提供了一个接口。 Spark 代码库最初是在加州大学伯克利分校的 AMPLab 开发的,后来被捐赠给了 Apache 软件基金会,该基金会一直在维护它。

现在,由于我们正在 Google Cloud 上构建解决方案,因此最好的方法是使用 Google Cloud Dataproc。 Google Cloud Dataproc 是一种托管服务,用于处理大型数据集,例如大数据计划中使用的数据集。 Dataproc 是 Google 的公共云产品 Google Cloud Platform 的一部分。 Dataproc 帮助用户处理、转换和理解大量数据。

在 Google Dataproc 实例中,预装了 Spark 和所有必需的库。 创建实例后,我们可以在其中运行以下 Spark 作业来完成我们的管道:

spark-submit \
  --packages org.apache.hudi:hudi-spark3.1.2-bundle_2.12:0.10.1,org.apache.spark:spark-avro_2.12:3.1.2 \
  --master yarn --deploy-mode client \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /usr/lib/hadoop/hudi-packages/hudi-utilities-bundle_2.12-0.10.1.jar \
  --table-type COPY_ON_WRITE --op UPSERT \
  --target-base-path gs://your-data-lake-bucket/hudi/customers \
  --target-table hudi_customers --continuous \
  --min-sync-interval-seconds 60 \
  --source-class org.apache.hudi.utilities.sources.debezium.MysqlDebeziumSource \
  --source-ordering-field _event_origin_ts_ms \
  --hoodie-conf schema.registry.url=http://localhost:8081 \
  --hoodie-conf hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8081/subjects/dbserver1.inventory.customers-value/versions/latest \
  --hoodie-conf hoodie.deltastreamer.source.kafka.topic=dbserver1.inventory.customers \
  --hoodie-conf bootstrap.servers=localhost:9092 \
  --hoodie-conf auto.offset.reset=earliest \
  --hoodie-conf hoodie.datasource.write.recordkey.field=id \
  --hoodie-conf hoodie.datasource.write.partitionpath.field=id \

这将运行一个 spark 作业,该作业从我们之前推送到的 Kafka 中获取数据并将其写入 Google Cloud Storage Bucket。 我们必须指定 Kafka 主题、Schema Registry URL 和其他相关配置。

结论

可以通过多种方式构建数据湖。 我试图展示如何使用 Debezium、Kafka、Hudi、Spark 和 Google Cloud 构建数据湖。 使用这样的设置,可以轻松扩展管道以管理大量数据工作负载! 有关每种技术的更多详细信息,可以访问文档。 可以自定义 Spark 作业以获得更细粒度的控制。 这里显示的 Hudi 也可以与 Presto、Hive 或 Trino 集成。 定制的数量是无穷无尽的。 本文提供了有关如何使用上述工具构建基本数据管道的基本介绍!

到此这篇关于基于Apache Hudi在Google云构建数据湖平台的文章就介绍到这了,更多相关Apache Hudi构建数据湖内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Apache Pulsar结合Hudi构建Lakehouse方案分析

    目录 1. 动机 2. 分析 3. 当前方案 4. 新的Lakehouse存储方案 4.1 新的存储布局 4.2 支持高效Upserts 4.3 将Hudi表当做Pulsar Topic 4.4 可扩展的元数据管理 5. 引用 1. 动机 Lakehouse最早由Databricks公司提出,其可作为低成本.直接访问云存储并提供传统DBMS管系统性能和ACID事务.版本.审计.索引.缓存.查询优化的数据管理系统,Lakehouse结合数据湖和数据仓库的优点:包括数据湖的低成本存储和开放数据格式访

  • Apache Hudi集成Spark SQL操作hide表

    目录 1. 摘要 2. 环境准备 2.1 启动spark-sql 2.2 设置并发度 3. Create Table 4. Insert Into 4.1 Insert 4.2 Select 5. Update 5.1 Update 5.2 Select 6. Delete 6.1 Delete 6.2 Select 7. Merge Into 7.1 Merge Into Insert 7.2 Select 7.4 Merge Into Update 7.5 Select 7.6 Merge

  • Apache Hudi的多版本清理服务彻底讲解

    目录 1. 回收空间以控制存储成本 2. 问题描述 3. 深入了解 Hudi清理服务 4. 清理服务 5. 例子 6. 配置 7. 运行命令 8. 未来计划 Apache Hudi提供了MVCC并发模型,保证写入端和读取端之间快照级别隔离.在本篇博客中我们将介绍如何配置来管理多个文件版本,此外还将讨论用户可使用的清理机制,以了解如何维护所需数量的旧文件版本,以使长时间运行的读取端不会失败. 1. 回收空间以控制存储成本 Hudi 提供不同的表管理服务来管理数据湖上表的数据,其中一项服务称为Cle

  • 基于Apache Hudi在Google云构建数据湖平台的思路详解

    自从计算机出现以来,我们一直在尝试寻找计算机存储一些信息的方法,存储在计算机上的信息(也称为数据)有多种形式,数据变得如此重要,以至于信息现在已成为触手可及的商品.多年来数据以多种方式存储在计算机中,包括数据库.blob存储和其他方法,为了进行有效的业务分析,必须对现代应用程序创建的数据进行处理和分析,并且产生的数据量非常巨大!有效地存储数PB数据并拥有必要的工具来查询它以便使用它至关重要,只有这样对该数据的分析才能产生有意义的结果.大数据是一门处理分析方法.有条不紊地从中提取信息或以其他方式处

  • webpack构建换肤功能的思路详解

    最近项目中要实现一个换肤的功能,大体想了下,记录一下思路 要实现换肤功能,目标就是打包生成多份皮肤文件,需要哪个就用哪个 打包生成多份皮肤文件因为项目是使用webpack构建的,要想生成多份css文件,就要在入口中配置多个入口文件,每个入口文件会提取出一个css文件 config.entry={ app: ['./src/app.js'], defaultTheme: ['./src/theme.default.color.js'], orangeTheme:['./src/theme.oran

  • 基于JS正则表达式实现模板数据动态渲染(实现思路详解)

    最近业务上需要动态渲染模板数据,好久没写前端代码了,有点生疏,将思路简单写下来,防老: 一.业务需求: 1.前端后端定义好模板以及变量名,保存数据库 2.订单数据是前端根据支付结果获取的,最终渲染完的数据模板需要调用打印机打印出来 3.模板相对商家来说比较固定,但是每个商家需要的模板都有可能不一样,所以需要每次登录后,查询一次模板缓存前端,后续每次支付后,动态渲染数据即可 二.考点: 1.正则表达式 2.精简代码量,尽量减少前端的工作量​ 三.实现思路: 1.需要渲染数据的模板,以${变量名}区

  • 基于C++浮点数(float、double)类型数据比较与转换的详解

    浮点数在内存中的存储机制和整型数不同,其有舍入误差,在计算机中用近似表示任意某个实数.具体的说,这个实数由一个整数或定点数(即尾数)乘以某个基数(计算机中通常是2)的整数次幂得到,这种表示方法类似于基数为10的科学记数法.所以浮点数在运算过程中通常伴随着因为无法精确表示而进行的近似或舍入.但是这种设计的好处是可以在固定的长度上存储更大范围的数.1.将字符串转换为float.double过程存在精度损失,只是float.double各自损失的精度不相同而已std::string str="8.2&

  • OnZoom基于Apache Hudi的一体架构实践解析

    1. 背景 OnZoom是Zoom新产品,是基于Zoom Meeting的一个独一无二的在线活动平台和市场.作为Zoom统一通信平台的延伸,OnZoom是一个综合性解决方案,为付费的Zoom用户提供创建.主持和盈利的活动,如健身课.音乐会.站立表演或即兴表演,以及Zoom会议平台上的音乐课程. 在OnZoom data platform中,source数据主要分为MySQL DB数据和Log数据. 其中Kafka数据通过Spark Streaming job实时消费,MySQL数据通过Spark

  • 基于MyBatis的数据持久化框架的使用详解

    目录 一.MyBatis是什么 1.1.概述 1.2.什么是持久化 1.3.什么是ORM 1.4.MyBatis主要内容 1.5.优点 1.6.缺点 二.MyBatis架构 2.1.mybatis所依赖的jar包 2.2.MyBatis准备工作 三.MyBatis 核心对象 一.MyBatis是什么 1.1.概述 Mybatis是一个优秀的开源.轻量级持久层框架,它对JDBC操作数据库的过程进行封装,简化了加载驱动.创建连接.创建 statement 等繁杂的过程,使开发者只需要关注sql本身.

  • 基于Python的Post请求数据爬取的方法详解

    为什么做这个 和同学聊天,他想爬取一个网站的post请求 观察 该网站的post请求参数有两种类型:(1)参数体放在了query中,即url拼接参数(2)body中要加入一个空的json对象,关于为什么要加入空的json对象,猜测原因为反爬虫.既有query参数又有空对象体的body参数是一件脑洞很大的事情. 一开始先在apizza网站 上了做了相关实验才发现上面这个规律的,并发现该网站的请求参数要为raw形式,要是直接写代码找规律不是一件容易的事情. 源码 import requests im

  • 基于Python对数据shape的常见操作详解

    这一阵在用python做DRL建模的时候,尤其是在配合使用tensorflow的时候,加上tensorflow是先搭框架再跑数据,所以调试起来很不方便,经常遇到输入数据或者中间数据shape的类型不统一,导致一些op老是报错.而且由于水平菜,所以一些常用的数据shape转换操作也经常百度了还是忘,所以想再整理一下. 一.数据的基本属性 求一组数据的长度 a = [1,2,3,4,5,6,7,8,9,10,11,12] print(len(a)) print(np.size(a)) 求一组数据的s

  • 基于DataFrame筛选数据与loc的用法详解

    DataFrame筛选数据与loc用法 python中pandas下的DataFrame是一个很不错的数据结构,附带了许多操作.运算.统计等功能. 如何从一个DataFrame中筛选中出一个元素呢. 以tushare返回的交易日信息为例. df = ts.trade_cal() 数据如下: calendarDate isOpen 0 1990/12/19 1 1 1990/12/20 1 2 1990/12/21 1 3 1990/12/22 0 4 1990/12/23 0 5 1990/12

  • docker在win10家庭版下构建laravel开发环境的教程详解

    操作系统: win10 家庭版 安装docker: 官网下载的docker无法安装成功,提示操作系统版本问题~~~~ 所以直接下载了阿里提供的docker安装包: http://mirrors.aliyun.com/doc ... 社区版是-ce后缀的 阿里镜像加速 首先登录阿里云 查找容器镜像服务 win10找到C:Users用户.dockermachinemachinesdefault底下有个config.json文件,在属性RegistryMirror添加加速器地址,docker虚拟机重启

随机推荐