Z-Order加速Hudi大规模数据集方案分析

目录
  • 1. 背景
  • 2. Z-Order介绍
  • 3. 具体实现
    • 3.1 z-value的生成和排序
    • 3.1.1 基于映射策略的z值生成方法
    • 3.1.2 基于RangeBounds的z-value生成策略
    • 3.2 与Hudi结合
    • 3.2.1 表数据的Z排序重组
    • 3.2.2 收集保存统计信息
    • 3.2.3 应用到Spark查询
  • 4. 测试结果

1. 背景

多维分析是大数据分析的一个典型场景,这种分析一般带有过滤条件。对于此类查询,尤其是在高基字段的过滤查询,理论上只我们对原始数据做合理的布局,结合相关过滤条件,查询引擎可以过滤掉大量不相关数据,只需读取很少部分需要的数据。例如我们在入库之前对相关字段做排序,这样生成的每个文件相关字段的min-max值是不存在交叉的,查询引擎下推过滤条件给数据源结合每个文件的min-max统计信息,即可过滤掉大量不相干数据。 上述技术即我们通常所说的data clustering 和 data skip。直接排序可以在单个字段上产生很好的效果,如果多字段直接排序那么效果会大大折扣的,Z-Order可以较好的解决多字段排序问题。

本文基于Apache Spark 以及 Apache Hudi 结合Z-order技术介绍如何更好的对原始数据做布局, 减少不必要的I/O,进而提升查询速度。具体提案可参考Hudi RFC-28:Support Z-order curve

2. Z-Order介绍

Z-Order是一种可以将多维数据压缩到一维的技术,在时空索引以及图像方面使用较广。Z曲线可以以一条无限长的一维曲线填充任意维度的空间,对于数据库的一条数据来说,我们可以将其多个要排序的字段看作是数据的多个维度,z曲线可以通过一定的规则将多维数据映射到一维数据上,构建z-value 进而可以基于该一维数据进行排序。z-value的映射规则保证了排序后那些在多维维度临近的数据在一维曲线上仍然可以彼此临近。

wiki定义:假设存在一个二维坐标对(x, y),这些坐标对于于一个二维平面上,使用Z排序,我们可以将这些坐标对压缩到一维。

当前在delta lake的商业版本实现了基于Z-Order的data Clustering技术,开源方面Spark/Hive/Presto 均未有对Z-Order的支持。

3. 具体实现

我们接下来分2部分介绍如何在Hudi中使用Z-Order:

  • z-value的生成和排序
  • 与Hudi结合

3.1 z-value的生成和排序

这部分是Z-Order策略的核心,这部分逻辑是公用的,同样适用其他框架。

Z-Order的关键在于z-value的映射规则。wiki上给出了基于位交叉的技术,每个维度值的比特位交叉出现在最终的z-value里。例如假设我们想计算二维坐标(x=97, y=214)的z-value,我们可以按如下步骤进行

第一步:将每一维数据用bits表示

x value:01100001
y value:11010110

第二步:从y的最左侧bit开始,我们将x和y按位做交叉,即可得到z 值,如下所示

z-value: 1011011000101001

对于多维数据,我们可以采用同样的方法对每个维度的bit位做按位交叉形成 z-value,一旦我们生成z-values 我们即可用该值做排序,基于z值的排序自然形成z阶曲线对多个参与生成z值的维度都有良好的聚合效果。

上述生成z-value的方法看起来非常好,但在实际生产环境上我们要使用位交叉技术产生z-value 还需解决如下问题:

  • 上述介绍是基于多个unsigned int类型的递增数据,通过位交叉生成z-value的。实际上的数据类型多种多样,如何处理其他类型数据
  • 不同类型的维度值转成bit位表示,长度不一致如何处理
  • 如何选择数据类型合理的保存z-value,以及相应的z值排序策略

针对上述问题,我们采用两种策略生成z值。

3.1.1 基于映射策略的z值生成方法

第一个问题:对不同的数据类型采用不同的转换策略

  • 无符号类型整数: 直接转换成bits位表示
  • Int类型的数据: 直接转成二进制表示会有问题,因为java里面负数的二进制表示最高位(符号位)为1,而正整数的二进制表示最高位为0(如下图所示), 直接转换后会出现负数大于正数的现象。
十进制 二进制
0 0000 0000
1 0000 0001
2 0000 0010
126 0111 1110
127 0111 1111
-128 1000 0000
-127 1000 0001
-126 1000 0010
-2 1111 1110
-1 1111 1111

对于这个问题,我们可以直接将二进制的最高位反转,就可以保证转换后的词典顺序和原值相同。如下图

十进制 二进制 最高位反转 最高位反转后十进制
0 0000 0000 1000 0000 128
1 0000 0001 1000 0001 129
2 0000 0010 1000 0010 130
126 0111 1110 1111 1110 254
127 0111 1111 1111 1111 255
-128 1000 0000 0000 0000 0
-127 1000 0001 0000 0001 1
-126 1000 0010 0000 0010 2
-2 1111 1110 0111 1110 126
-1 1111 1111 0111 1111 127
  • Long类型的数据:转换方式和Int类型一样,转成二进制形式并将最高位反转
  • Double、Float类型的数据: 转成Long类型,之后转成二进制形式并将最高位反转
  • Decimal/Date/TimeStamp类型数据:转换成long类型,然后直接用二进制表示。
  • UTF-8 String类型的数据:String类型的数据 直接用二进制表示即可保持原来的自然序, 但是字符串是不定长的无法直接用来做位交叉。 我们采用如下策略处理string类型大于8bytes的字符串截断成8bytes, 不足8bytes的string 填充成8bytes。
  • null值处理:
    • 数值类型的null直接变成该数值类型的最大值,之后按上述步骤转换;
    • String类型null 直接变成空字符串之后再做转换;

第二个问题:生成的二进制值统一按64位对齐即可

第三个问题:可以用Array[Byte]来保存z值(参考Amazon的DynamoDB 可以限制该数组的长度位1024)。对于 Array[Byte]类型的数据排序,hbase的rowkey 排序器可以直接拿来解决这个问题

基于映射策略的z值生成方法,方便快捷很容易理解,但是有一定缺陷:

参与生成z-value的字段理论上需要是从0开始的正整数,这样才能生成很好的z曲线。 真实的数据集中 是不可能有这么完美的情况出现的, zorder的效果将会打折扣。比如x 字段取值(0, 1, 2), y字段取值(100, 200, 300), 用x, y生成的z-value只是完整z曲线的一部分,对其做z值排序的效果和直接用x排序的效果是一样的; 再比如x的基数值远远低于y的基数值时采用上述策略排序效果基本和按y值排序是一样的,真实效果还不如先按x排序再按y排序。

String类型的处理, 上述策略对string类型是取前8个字节的参与z值计算, 这将导致精度丢失。 当出现字符串都是相同字符串前缀的情况就无法处理了,比如"https://www.baidu.com" , "https://www.google.com" 这两个字符串前8个字节完全一样, 对这样的数据截取前8个字节参与z值计算没有任何意义。

上述策略出现缺陷的主要原因是数据的分布并不总是那么好导致。有一种简单的方案可以解决上述问题: 对参与z值计算的所有维度值做全局Rank,用Rank值代替其原始值参与到z值计算中,由于Rank值一定是从0开始的正整数,完全符合z值构建条件,较好的解决上述问题。 在实验中我们发现这种用Rank值的方法确实很有效,但是z值生成效率极低,计算引擎做全局Rank的代价是非常高的,基于Rank的方法效率瓶颈在于要做全局Rank计算,那么我们可不可以对原始数据做采样减少数据量,用采样后的数据计算z值呢,答案是肯定的。

/** Generates z-value*/
val newRDD = df.rdd.map { row =>
  val values = zFields.map { case (index, field) =>
    field.dataType match {
      case LongType =>
        ZOrderingUtil.longTo8Byte(row.getLong(index))
      case DoubleType =>
        ZOrderingUtil.doubleTo8Byte(row.getDouble(index))
      case IntegerType =>
        ZOrderingUtil.intTo8Byte(row.getInt(index))
      case FloatType =>
        ZOrderingUtil.doubleTo8Byte(row.getFloat(index).toDouble)
      case StringType =>
        ZOrderingUtil.utf8To8Byte(row.getString(index))
      case DateType =>
        ZOrderingUtil.longTo8Byte(row.getDate(index).getTime)
      case TimestampType =>
        ZOrderingUtil.longTo8Byte(row.getTimestamp(index).getTime)
      case ByteType =>
        ZOrderingUtil.byteTo8Byte(row.getByte(index))
      case ShortType =>
        ZOrderingUtil.intTo8Byte(row.getShort(index).toInt)
      case d: DecimalType =>
        ZOrderingUtil.longTo8Byte(row.getDecimal(index).longValue())
      case _ =>
        null
    }
  }.filter(v => v != null).toArray
  val zValues = ZOrderingUtil.interleaveMulti8Byte(values)
  Row.fromSeq(row.toSeq ++ Seq(zValues))
}.sortBy(x => ZorderingBinarySort(x.getAs[Array[Byte]](fieldNum)))

3.1.2 基于RangeBounds的z-value生成策略

在介绍基于RangeBounds的z-value生成策略之前先看看Spark的排序过程,Spark排序大致分为2步

  • 对输入数据的key做sampling来估计key的分布,按指定的分区数切分成range并排序。计算出来的rangeBounds是一个长度为numPartition - 1 的数组,该数组里面每个元素表示一个分区内key值的上界/下界。
  • shuffle write 过程中,每个输入的key应该分到哪个分区内,由第一步计算出来的rangeBounds来确定。每个分区内的数据虽然没有排序,但是注意rangeBounds是有序的因此分区之间宏观上看是有序的,故只需对每个分区内数据做好排序即可保证数据全局有序。

参考Spark的排序过程,我们可以这样做

  • 对每个参与Z-Order的字段筛选规定个数(类比分区数)的Range并对进行排序,并计算出每个字段的RangeBounds;
  • 实际映射过程中每个字段映射为该数据所在rangeBounds的中的下标,然后参与z-value的计算。可以看出由于区间下标是从0开始递增的正整数,完全满足z值生成条件;并且String类型的字段映射问题也被一并解决了。基于RangeBounds的z值生成方法,很好的解决了第一种方法所面临的缺陷。由于多了一步采样生成RangeBounds的过程,其效率显然不如第一种方案,我们实现了上述两种z值生成方法以供选择。
/** Generates z-value */
val indexRdd = internalRdd.mapPartitionsInternal { iter =>
  val bounds = boundBroadCast.value
  val origin_Projections = sortingExpressions.map { se =>
    UnsafeProjection.create(Seq(se), outputAttributes)
  }
  iter.map { unsafeRow =>
    val interleaveValues = origin_Projections.zip(origin_lazyGeneratedOrderings).zipWithIndex.map { case ((rowProject, lazyOrdering), index) =>
      val row = rowProject(unsafeRow)
      val decisionBound = new DecisionBound(sampleRdd, lazyOrdering)
      if (row.isNullAt(0)) {
        bounds(index).length + 1
      } else {
        decisionBound.getBound(row, bounds(index).asInstanceOf[Array[InternalRow]])
      }
    }.toArray.map(ZOrderingUtil.toBytes(_))
    val zValues = ZOrderingUtil.interleaveMulti4Byte(interleaveValues)
    val mutablePair = new MutablePair[InternalRow, Array[Byte]]()
    mutablePair.update(unsafeRow, zValues)
  }
}.sortBy(x => ZorderingBinarySort(x._2), numPartitions = fileNum).map(_._1)

3.2 与Hudi结合

与Hudi的结合大致分为两部分

3.2.1 表数据的Z排序重组

这块相对比较简单,借助Hudi内部的Clustering机制结合上述z值的生成排序策略我们可以直接完成Hudi表数据的数据重组,这里不再详细介绍。

3.2.2 收集保存统计信息

这块其实RFC27已经在做了,感觉有点重复工作我们简单介绍下我们的实现,数据完成z重组后,我们需要对重组后的每个文件都收集参与z值计算的各个字段的min/max/nullCount 的统计信息。对于统计信息收集,可以通过读取Parquet文件或者通过SparkSQL收集

  • 读取Parquet文件收集统计信息
/** collect statistic info*/
val sc = df.sparkSession.sparkContext
val serializableConfiguration = new SerializableConfiguration(conf)
val numParallelism = inputFiles.size/3
val previousJobDescription = sc.getLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION)
try {
  val description = s"Listing parquet column statistics"
  sc.setJobDescription(description)
  sc.parallelize(inputFiles, numParallelism).mapPartitions { paths =>
    val hadoopConf = serializableConfiguration.value
    paths.map(new Path(_)).flatMap { filePath =>
      val blocks = ParquetFileReader.readFooter(hadoopConf, filePath).getBlocks().asScala
      blocks.flatMap(b => b.getColumns().asScala.
        map(col => (col.getPath().toDotString(),
          FileStats(col.getStatistics().minAsString(), col.getStatistics().maxAsString(), col.getStatistics.getNumNulls.toInt))))
        .groupBy(x => x._1).mapValues(v => v.map(vv => vv._2)).
        mapValues(value => FileStats(value.map(_.minVal).min, value.map(_.maxVal).max, value.map(_.num_nulls).max)).toSeq.
        map(x => ColumnFileStats(filePath.getName(), x._1, x._2.minVal, x._2.maxVal, x._2.num_nulls))
    }.filter(p => cols.contains(p.colName))
  }.collect()
} finally {
  sc.setJobDescription(previousJobDescription)
}
  • 通过SparkSQL方式收集统计信息
/** collect statistic info*/
val inputFiles = df.inputFiles
val conf = df.sparkSession.sparkContext.hadoopConfiguration
val values = cols.flatMap(c => Seq( min(col(c)).as(c + "_minValue"), max(col(c)).as(c + "_maxValue"), count(c).as(c + "_noNullCount")))
val valueCounts = count("*").as("totalNum")
val projectValues = Seq(col("file")) ++ cols.flatMap(c =>
  Seq(col(c + "_minValue"), col(c + "_maxValue"), expr(s"totalNum - ${c + "_noNullCount"}").as(c + "_num_nulls")))
val result = df.select(input_file_name() as "file", col("*"))
  .groupBy($"file")
  .agg(valueCounts,  values: _*).select(projectValues:_*)
result

之后将这些信息保存在Hudi表里面的hoodie目录下的index目录下,然后供Spark查询使用。

3.2.3 应用到Spark查询

为将统计信息应用Spark查询,需修改HudiIndex的文件过滤逻辑,将DataFilter转成对Index表的过滤,选出候选要读取的文件,返回给查询引擎,具体步骤如下。

  • 将索引表加载到 IndexDataFrame
  • 使用原始查询过滤器为 IndexDataFrame 构建数据过滤器
  • 查询 IndexDataFrame 选择候选文件
  • 使用这些候选文件来重建 HudiMemoryIndex

通过min/max值和null计数信息为 IndexDataFrame 构建数据过滤器,由于z排序后参与z值计算的各个字段在每个文件里面的min/max值很大概率不交叉,因此对Index表的过滤可以过滤掉大量的文件。

/** convert filter */
def createZindexFilter(condition: Expression): Expression = {
  val minValue = (colName: Seq[String]) =>
    col(UnresolvedAttribute(colName) + "_minValue").expr
  val maxValue = (colName: Seq[String]) =>
    col(UnresolvedAttribute(colName) + "_maxValue").expr
  val num_nulls = (colName: Seq[String]) =>
    col(UnresolvedAttribute(colName) + "_num_nulls").expr
  condition match {
    case EqualTo(attribute: AttributeReference, value: Literal) =>
      val colName = HudiMergeIntoUtils.getTargetColNameParts(attribute)
      And(LessThanOrEqual(minValue(colName), value), GreaterThanOrEqual(maxValue(colName), value))
    case EqualTo(value: Literal, attribute: AttributeReference) =>
      val colName = HudiMergeIntoUtils.getTargetColNameParts(attribute)
      And(LessThanOrEqual(minValue(colName), value), GreaterThanOrEqual(maxValue(colName), value))
    case equalNullSafe @ EqualNullSafe(_: AttributeReference, _ @ Literal(null, _)) =>
      val colName = HudiMergeIntoUtils.getTargetColNameParts(equalNullSafe.left)
      EqualTo(num_nulls(colName), equalNullSafe.right)
.......

4. 测试结果

我们采用databrick的测试样例https://help.aliyun.com/document_detail/168137.html?spm=a2c4g.11186623.6.563.53c258ccmqvYfy 进行了测试

测试数据量和资源使用大小和databrick保持一致。唯一区别是我们只生成了10000个文件,原文是100w个文件。 测试结果表明zorder加速比还说很可观的,另外Z-Order的效果随着文件数的增加会越来越好,我们后续也会在100w文件级别测试。

表名称 时间(s)
conn_random_parquet 89.3
conn_zorder 19.4
conn_zorder_only_ip 18.2

以上就是Z-Order加速Hudi大规模数据集方案分析的详细内容,更多关于Z-Order加速Hudi大规模数据集的资料请关注我们其它相关文章!

(0)

相关推荐

  • Vertica集成Apache Hudi重磅使用指南

    目录 1. 摘要 2. Apache Hudi介绍 3. 环境准备 4. Vertica和Apache Hudi集成 4.1 在 Apache Spark 上配置 Apache Hudi 和 AWS S3 4.2 配置 Vertica 和 Apache HUDI 集成 4.3 如何让 Vertica 查看更改的数据 4.3.1 写入数据 4.3.2 更新数据 4.3.3 创建和查看数据的历史快照 1. 摘要 本文演示了使用外部表集成 Vertica 和 Apache Hudi. 在演示中我们使用

  • Apache Hudi结合Flink的亿级数据入湖实践解析

    目录 1. 实时数据落地需求演进 2. 基于Spark+Hudi的实时数据落地应用实践 3. 基于Flink自定义实时数据落地实践 4. 基于Flink + Hudi的落地数据实践 5. 后续应用规划及展望 5.1 取代离线报表,提高报表实时性及稳定性 5.2 完善监控体系,提升落数据任务稳定性 5.3 落数据中间过程可视化探索 本次分享分为5个部分介绍Apache Hudi的应用与实践 1. 实时数据落地需求演进 实时平台上线后,主要需求是开发实时报表,即抽取各类数据源做实时etl后,吐出实时

  • Apache教程Hudi与Hive集成手册

    目录 1. Hudi表对应的Hive外部表介绍 2. Hive对Hudi的集成 3. 创建Hudi表对应的hive外部表 4. 查询Hudi表对应的Hive外部表 4.1 操作前提 4.2 COW类型Hudi表的查询 4.2.1 COW表实时视图查询 4.2.2 COW表增量查询 4.3 MOR类型Hudi表的查询 4.3.1 MOR表读优化视图 4.3.2 MOR表实时视图 4.3.3 MOR表增量查询 5. Hive侧源码修改 1. Hudi表对应的Hive外部表介绍 Hudi源表对应一份H

  • 深入解析Apache Hudi内核文件标记机制

    目录 1. 摘要 2. 为何引入Markers机制 3. 现有的直接标记机制及其局限性 4. 基于时间线服务器的标记机制提高写入性能 5. 标记相关的写入选项 6. 性能 7. 总结 1. 摘要 Hudi 支持在写入时自动清理未成功提交的数据.Apache Hudi 在写入时引入标记机制来有效跟踪写入存储的数据文件. 在本博客中,我们将深入探讨现有直接标记文件机制的设计,并解释了其在云存储(如 AWS S3.Aliyun OSS)上针对非常大批量写入的性能问题. 并且演示如何通过引入基于时间轴服

  • Apache Hudi性能提升三倍的查询优化

    目录 1. 背景 2. 设置 3. 测试 4. 结果 5. 总结 从 Hudi 0.10.0版本开始,我们很高兴推出在数据库领域中称为 Z-Order 和 Hilbert 空间填充曲线的高级数据布局优化技术的支持. 1. 背景 Amazon EMR 团队最近发表了一篇很不错的文章展示了对数据进行聚簇是如何提高查询性能的,为了更好地了解发生了什么以及它与空间填充曲线的关系,让我们仔细研究该文章的设置. 文章中比较了 2 个 Apache Hudi 表(均来自 Amazon Reviews 数据集)

  • Apache Hudi基于华米科技应用湖仓一体化改造

    目录 1. 应用背景及痛点介绍 2. 技术方案选型 3. 问题与解决方案 3.1.增量数据字段对齐问题 3.2 全球存储兼容性问题 3.3 云主机时区统一问题 3.4 升级新版本问题 3.5 多分区Upsert性能问题 3.6 数据特性适应问题 4. 上线收益 4.1 成本方面 4.2 效率方面 4.3 稳定性层面 4.4 查询性能层面 5. 总结与展望 1. 应用背景及痛点介绍 华米科技是一家基于云的健康服务提供商,拥有全球领先的智能可穿戴技术.在华米科技,数据建设主要围绕两类数据:设备数据和

  • Apache Hudi异步Clustering部署操作的掌握

    目录 1. 摘要 2. 介绍 3. Clustering策略 3.1 计划策略 3.2 执行策略 3.3 更新策略 4. 异步Clustering 4.1 HoodieClusteringJob 4.2 HoodieDeltaStreamer 4.3 Spark Structured Streaming 5. 总结和未来工作 1. 摘要 在之前的一篇博客中,我们介绍了Clustering(聚簇)的表服务来重新组织数据来提供更好的查询性能,而不用降低摄取速度,并且我们已经知道如何部署同步Clust

  • Apache Hudi灵活的Payload机制硬核解析

    1.摘要 Apache Hudi 的Payload是一种可扩展的数据处理机制,通过不同的Payload我们可以实现复杂场景的定制化数据写入方式,大大增加了数据处理的灵活性.Hudi Payload在写入和读取Hudi表时对数据进行去重.过滤.合并等操作的工具类,通过使用参数 "hoodie.datasource.write.payload.class"指定我们需要使用的Payload class.本文我们会深入探讨Hudi Payload的机制和不同Payload的区别及使用场景. 2

  • OnZoom基于Apache Hudi的一体架构实践解析

    1. 背景 OnZoom是Zoom新产品,是基于Zoom Meeting的一个独一无二的在线活动平台和市场.作为Zoom统一通信平台的延伸,OnZoom是一个综合性解决方案,为付费的Zoom用户提供创建.主持和盈利的活动,如健身课.音乐会.站立表演或即兴表演,以及Zoom会议平台上的音乐课程. 在OnZoom data platform中,source数据主要分为MySQL DB数据和Log数据. 其中Kafka数据通过Spark Streaming job实时消费,MySQL数据通过Spark

  • Apache Hudi数据布局黑科技降低一半查询时间

    目录 1. 背景 2. Clustering架构 2.1 调度Clustering 2.2 运行Clustering 2.3 Clustering配置 3. 表查询性能 3.1 进行Clustering之前 3.2 进行Clustering之后 4. 总结 1. 背景 Apache Hudi将流处理带到大数据,相比传统批处理效率高一个数量级,提供了更新鲜的数据.在数据湖/仓库中,需要在摄取速度和查询性能之间进行权衡,数据摄取通常更喜欢小文件以改善并行性并使数据尽快可用于查询,但很多小文件会导致查

随机推荐