Apache Doris Colocate Join 原理实践教程

目录
  • What Colocate Join
  • Why Colocate Join
  • How Colocate Join
    • 核心思路
    • 术语定义
    • 1 数据导入时保证本地性
    • 2 Colocate Join Query Plan
    • 3 Colocate Join Query Schedule
    • 4 Colocate Join At Bucket Seq Level
    • 5 Colocate Join Metadata Maintenance
    • 6 How to decide a query can colocate join
    • 7 Colocate Join Support Balance
  • Colocate Join Performance
  • How To Use Colocate Join
    • Colocate Join 目前限制
    • Colocate Join 适用场景
    • Colocate Join FAQ
  • 总结

What Colocate Join

我们都知道 Join 的常见连接类型分为以下几种:

  • INNER JOIN
  • OUTER JOIN
  • CROSS JOIN
  • SEMI JOIN
  • ANTI JOIN

Join 的常见算法实现包含以下几种:

  • Nested Loop Join
  • Sort Merge Join
  • Hash Join

分布式系统实现 Join 数据分布的常见策略有:

  • Shuffle Join
  • Broadcast Join
  • Colocate/Local Join

Colocate/Local Join 就是指多个节点 Join 时没有数据移动和网络传输,每个节点只在本地进行 Join,能够本地进行 Join 的前提是相同 Join Key 的数据分布在相同的节点。

Why Colocate Join

相比 Shuffle Join 和 Broadcast Join,Colocate Join 在查询时没有数据的网络传输,性能会更高。 在 Doris 的具体实现中,Colocate Join 相比 Shuffle Join 可以拥有更高的并发粒度,也可以显著提升 Join 的性能,这一点在后面会解释。

How Colocate Join

核心思路

对于 colocate tables,在任何情况下都要保证数据的本地性。 具体包括:

  • 数据导入时保证数据本地性
  • 查询调度时保证数据本地性
  • 数据 balance 后保证数据本地性

实现中最复杂是第 3 点: 处理 colocate tables 的 balance。

术语定义

Colocate Group

我们将一组具体相同 Colocate 属性的 Table 称为 Group,下图中 t1 和 t2 拥有相同的 Colocate Group。

Colocate Parent Table

我们将决定一个 Group 数据分布的 Table 称为 Parent Table,下图中 t1 是 Colocate Parent Table.

Colocate Child Table

我们将一个 Group 中除 Parent Table 之外的 Table 称为 Child Table,下图中 t2 是 Colocate Child Table.

Bucket Seq

如下图,如果一个表有 N 个 Partition, 则每个 Partition 的第 M 个 bucket 的 Bucket Seq 是 M。

1 数据导入时保证本地性

Doris 的分区方式如下所示,先根据分区字段 Range 分区,再根据指定的 Distributed Key Hash 分桶:

所以我们在数据导入时保证本地性的核心思想就是两次映射,对于 colocate tables,我们保证相同 Distributed Key 的数据映射到相同的 Bucket Seq,再保证相同 Bucket Seq 的 buckets 映射到相同的 BE。

具体来说,第一步:我们计算 Distributed Key 的 hash 值,并对 bucket num 取模,保证相同 Distributed Key 的数据映射到相同的 Bucket Seq。

第二步:将同一个 Colocate Group 下所有相同 Bucket Seq 的 Bucket 映射到相同的 BE,方法如下:

  • Group 中所有 Table 的 Bucket Seq 和 BE 节点的映射关系和 Parent Table 一致
  • Parent Table 中所有 Partition 的 Bucket Seq 和 BE 节点的映射关系和第一个 Partition 一致
  • Parent Table 第一个 Partition 的 Bucket Seq 和 BE 节点的映射关系利用原生的 Round Robin 算法决定

2 Colocate Join Query Plan

对 HashJoinFragment,由于 Join 的多张表有了数据本地性保证,所以可以去掉 Exchange Node,避免网络传输,将 ScanNode 直接设置为 Hash Join Node 的 Child。

3 Colocate Join Query Schedule

查询调度的目标: 一个 Colocate join 中所有 ScanNode 中所有 Bucket Seq 相同的 Buckets 被调度到同一个 BE。

查询调度的策略:第一个 ScanNode 的 Buckets 随机选择 BE,其余的 ScanNode 和第一个 ScanNode 保持一致。

4 Colocate Join At Bucket Seq Level

目前,Doris 的 Hash Join 是 Server 粒度的:

对于 colocate join,由于同一个 Colocate Group 下相同 Bucket Seq 的 Bucket 分布在相同的 BE,所以我们将 Join 的粒度从 Server 粒度降至 Bucket Seq 粒度:

5 Colocate Join Metadata Maintenance

对于 colocate join,我们需要维护以下几个核心元数据:

  • 代码中,colocate group id 就是 colocate parent table id
  • group2BackendsPerBucketSeq 代表每个 colocate group 中每个 bucket seq 映射到哪些 BE
  • 为了支持 balance,以及保证元数据的一致性,这些元数据都需要持久化

6 How to decide a query can colocate join

  • Join 的 tables 是 colocate able
  • The colocate group 是 stable 状态,没有 balancing
  • Join 的 Key 包含分桶的 Distributed Key

7 Colocate Join Support Balance

核心思路

新增一个 daemon 线程专门处理 colocate table 的 balance,并让正常的 balance 线程不处理 colocate table 的 balance。

何时 balance

有 BE 节点新增,删除,down 掉时。

balance 的粒度

正常 balance 的粒度是 bucket,但是对于 colocate table,我们必须保证同一个 colocate group 下所有 bucket 的数据本地性,所以我们 balance 的单位是 colocate group。

balance 对查询的影响

当一个 colocate group 正在 balance 时,colocate join 会退化为原始的 shuffle join 或 broadcast join。

balance 流程:

  • 为需要复制或迁移的 Bucket 选择目标 BE
  • 标记 colocate group 的转态为 balancing
  • 对于需要复制或迁移的 Bucket,发起 Clone Job,Clone Job 会从 Bucket 的现有副本复制一个新副本目标 BE
  • 更新 backendsPerBucketSeq(维护 Bucket Seq 到 BE 映射关系的元数据)
  • 当一个 colocate group 下的所有 Clone Job 都完成时,标记 colocate group 的转态为 stable
  • 删除冗余的副本

当有 BE 节点删除或长时间挂掉时,选择目标 BE 的策略:

和正常 balance 时的选择策略相同,考虑集群的整体负载,尽量选择负载较低的 BE。

当有 BE 节点新增时,选择目标 BE 的策略:

  • 对于当前 colocate group,计算每个新增 BE 需要增加的 bucket seqs 个数:假如我们有 3 个 BE,8 个 bucket,每个 bucket 是 3 副本,则每个 BE 负责 8 个 bucket 副本,我们新增 1 个 BE 后,可以计算出每个 BE 负责的平均 bucket 副本数应该是 3 * 8 / 4 = 6,每个新增 BE 需要增加的 bucket seqs 个数为 6 / 1 = 6.
  • 对于每个 bucket seqs, 随机选择从哪个旧的 BE 迁移副本到新增的 BE。

Colocate Join Performance

测试数据:

Table A,B,C 都有 10 天数据,1 天一个 partitions,每个 partition 有 570 万数据。

测试集群:

4 台低配物理机,每个 BE 24CPU,96MEM

测试 SQL:

SQL1:

select count(*)
FROM A t1
INNER JOIN [shuffle] B t5
   ON ((t1.dt = t5.dt) AND (t1.id = t5.id))
INNER JOIN [shuffle] C t6
   ON ((t1.dt = t6.dt) AND (t1.id = t6.id))
where t1.dt in (xxx days);

SQL2:

select t1.dt, t1.id, t1.name, t1.second_id,t1.second_name,
t5.id, t5.weight_time,t5.list,
t6.ord_id, t6._id
FROM A t1
INNER JOIN B t5
   ON ((t1.dt = t5.dt) AND (t1.id = t5.id))
INNER JOIN C t6
   ON ((t1.dt = t6.dt) AND (t1.id = t6.id))
where t1.dt in (xxx days)
limit 10000;

Test Result for SQL1:

Test Result for SQL2:

可以看到,Colocate Join 相比 Shuffle Join 有明显的性能提升,而且随着集群规模越大,Join 的数据量越多,Colocate Join 的优势会更明显。

How To Use Colocate Join

社区最新代码已经支持 Colocate Join,只不过默认是关闭的,只需要在 FE 配置中设置 disable_colocate_join 为 false,即可开启 Colocate Join 功能。

具体使用时只需要在建表时增加 colocate_with 这个属性即可,colocate_with 的值可以设置成同一组 colocate 表中的任意一个,不过需要保证 colocate_with 属性中的表要先建立。

假如需要对 table t1 和 t2 进行 Colocate Join,可以按以下语句建表:

CREATE TABLE `t1` (
  `id` int(11) COMMENT "",
  `value` varchar(8) COMMENT ""
) ENGINE=OLAP
DUPLICATE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 10
PROPERTIES (
"colocate_with" = "t1"
);

CREATE TABLE `t2` (
  `id` int(11) COMMENT "",
  `value` varchar(8) COMMENT ""
) ENGINE=OLAP
DUPLICATE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 10
PROPERTIES (
"colocate_with" = "t1"
);

Colocate Join 目前限制

  • Colocate Table 必须是 OLAP 类型的表
  • colocate_with 属性相同表的 BUCKET 数必须一样
  • colocate_with 属性相同表的 副本数必须一样 (这个限制之后可能会去掉,但对用户应该没有实际影响)
  • colocate_with 属性相同表的 DISTRIBUTED Columns 的数据类型必须一样

Colocate Join 适用场景

Colocate Join 十分适合几张表按照相同字段分桶,并高频根据相同字段 Join 的场景,比如电商的不少应用都按照商家 Id 分桶,并高频按照商家 Id 进行 Join。

Colocate Join FAQ

一句话总结,凡是不能进行 Colocate Join 的场景都会自动退化为原始的 Shuffle Join 或者 Broadcast Join

Q1: 支持多张表进行 Colocate Join 吗?

A: 支持

Q2: 支持 Colocate 表和正常表 Join 吗?

A: 支持

Q3: Colocate 表支持用非分桶的 Key 进行 Join 吗?

A: 支持:不符合 Colocate Join 条件的 Join 会使用 Shuffle Join 或 Broadcast Join

Q4: 如何确定 Join 是按照 Colocate Join 执行的?

A: explain 的结果中 Hash Join 的孩子节点如果直接是 OlapScanNode, 没有 Exchange Node,就说明是 Colocate Join

Q5: 如何修改 colocate_with 属性?

A: ALTER TABLE example_db.my_table set ("colocate_with"="target_table");

Q6: 如何禁用 colocate join?

A: set disable_colocate_join = true; 就可以禁用 Colocate Join,查询时就会使用 Shuffle Join 或 Broadcast Join

总结

大多数支持 Join 的 OLAP 系统都会考虑支持 Colocate Join,比如 MemSQL, SnappyData, 阿里 AnalyticDB 等,阿里 AnalyticDB 更是在数据模型中就引入了 Table Group 的概念。总的来讲,Colocate Join 通过在数据导入,查询 Plan,查询调度,数据 balance 时对数据本地性的保证和考虑,可以显著加速特定场景的下 Join 查询,是一个十分有用的 Feature。

以上就是Apache Doris Colocate Join 原理实践教程的详细内容,更多关于Apache Doris Colocate Join 原理的资料请关注我们其它相关文章!

(0)

相关推荐

  • Apache Doris的Bitmap索引和BloomFilter索引使用及注意事项

    目录 1. Bitmap索引的使用 1.1 Bitmap索引介绍 1.2 Bitmap索引使用的注意事项 1.3 Bitmap索引的使用 2. BloomFilter索引 2.1 BloomFilter索引介绍 2.2 BloomFilter原理 2.3 BloomFilter索引的使用 2.4 Doris BloomFilter使用场景 2.5 Doris BloomFilter使用注意事项 1. Bitmap索引的使用 1.1 Bitmap索引介绍 bitmap index是一种位图索引,是

  • Apache Doris Join 优化原理详解

    目录 背景 & 目标 Doris 数据划分 Partition Bucket Join 方式 总览 Broadcast / Shuffle Join Bucket Shuffle Join Plan Rule Colocate Join Runtime Filter 优化 Join Reorder 优化 Join 调优建议 背景 & 目标 掌握 Apache Doris Join 优化手段及其实现原理 为代码阅读提供理论基础 Doris 数据划分 不同的 Join 方式非常依赖于对 Dor

  • Doris Join 优化原理文档详解

    目录 Doris Join 优化原理 Doris Shuffle 方式 四种 Shuffle 方式对比 Runtime Filter Join 优化 Runtime Filter 类型 Join Reorder Doris Join 调优方法 调优案例实战 案例一 案例二 案例三 Doris Join 调优建议 Doris Join 优化原理 Doris 支持两种物理算子,一类是 Hash Join,另一类是 Nest Loop Join. Hash Join:在右表上根据等值 Join 列建立

  • OnZoom基于Apache Hudi的一体架构实践解析

    1. 背景 OnZoom是Zoom新产品,是基于Zoom Meeting的一个独一无二的在线活动平台和市场.作为Zoom统一通信平台的延伸,OnZoom是一个综合性解决方案,为付费的Zoom用户提供创建.主持和盈利的活动,如健身课.音乐会.站立表演或即兴表演,以及Zoom会议平台上的音乐课程. 在OnZoom data platform中,source数据主要分为MySQL DB数据和Log数据. 其中Kafka数据通过Spark Streaming job实时消费,MySQL数据通过Spark

  • CentOS 7 x64下Apache+MySQL(Mariadb)+PHP56的安装教程详解

    每次搭建新服务器,都要来来回回把这些包再装一下,来来回回搞了不下20遍了吧,原来都是凭经验,配置过程中重复入坑是难免的,故写此文做个备忘.虽然有像xampp这样的集成包,但是在生产环境的Linux发行版上,还是通过包管理工具安装会放心.这次新买的服务器是CentOS 7(7.2)系统,相关配置也都以此版本为主,为方便操作,直接使用root用户配置. CentOS 7的源比较旧,自带的PHP是PHP 5.4,我们想要的是PHP 5.6,这就需要执行以下命令添加额外的remi源. rpm -ivh

  • apache php mysql开发环境安装教程

    apache php mysql开发环境搭建教程,分享给大家,供大家参考 0.资源链接: 360云盘:https://yunpan.cn/c6wpzYwbfmLJY (提取码:0190) 1.首先安装编译器环境 即如下文件: 2.安装apache+php+mysql 即如下文件: 2.1.安装apache 2.1.1安装 只说关键几点 2.1.1.1 2.1.1.2 2.1.1.3 2.1.1.4.在浏览器地址栏输入:localhost ,若出现以下页面,说明您已成功安装apache 2.1.2

  • Apache服务器的安装步骤(图文教程)

    我这次环境配置安装的是Apache-2.4.23版本! 1.首先将下载的压缩包解压到你的专门的WAMP环境文件夹,这样以后查找起来比较方便: 2.启动cmd: 如果你和我一样用的Win10,需要右键以管理员身份运行,如图,切换到Apache所在目录. httpd.exe -h 可以查看可以使用的命令. 3.使用httpd.exe -k install安装Apache.这时候会报服务器路径错误: 4.在httpd.conf文件中修改服务器路径. 5.启动Apache:httpd.exe -k st

  • dubbo新手学习之事件通知实践教程

    前言 今天主要给大家分享一下dubbo的事件通知机制. 先看下dubbo中文官网的示例解释:事件通知. 在调用之前.调用之后.出现异常时,会触发 oninvoke.onreturn.onthrow 三个事件,可以配置当事件发生时,通知哪个类的哪个方法. 实践 溪源目的是快速学习dubbo的相关机制,故定义的相同的接口和方法,采用分包的方式解耦合,便于后期维护. 先看服务接口 dubbo-demo-interface 目录如图 ** UserNotifyService ** /** * @auth

  • JSON Web Token(JWT)原理入门教程详解

    目录 一.跨域认证的问题 二.JWT 的原理 三.JWT 的数据结构 3.1 Header 3.2 Payload 3.3 Signature 3.4 Base64URL 四.JWT 的使用方式 五.JWT 的几个特点 六.参考链接 一.跨域认证的问题 互联网服务离不开用户认证.一般流程是下面这样. 1.用户向服务器发送用户名和密码. 2.服务器验证通过后,在当前对话(session)里面保存相关数据,比如用户角色.登录时间等等. 3.服务器向用户返回一个 session_id,写入用户的 Co

  • MySQL数据库连接查询 join原理

    目录 1.连接查询的分类 2.交叉连接 2.1.原理 2.2.基本语法 2.3.应用 3.内连接 3.1.原理 3.2.基本语法 3.3.应用 4.外连接 4.1.原理 4.2.基本语法 4.3.特点 4.4.应用 5.using关键字 5.1.原理 5.2.基本语法 1.连接查询的分类 交叉连接 内连接 外连接 左外链接(左连接) 右外连接(右连接) 自然连接 2.交叉连接 将两张表的数据与另外一张表彼此交叉 2.1.原理 笛卡尔积: 从第一张表一次取出每一条数据 取出每一条记录之后,与另外一

  • Apache PHP MySql安装配置图文教程

    每一项技术用的人多了,就会有人将其进行优化,做成一个简单.实用.大众化的工具,这对于初识者来说是非常方便的,但是对于长久学习或工作这方面的人技术人员来说是不可取的,所以还是要学习基础的实用方法.因此,我就在ubuntu下配置了Apache服务器来更深入的学习. 这是一个默认安装的方法,如果要指定 步骤一:安装apache2 1.sudo apt-get install apache2,然后输入管理员用户的密码 输入"y",然后回车,完成安装 2.默认的网站根目录的路径 Apache 安

随机推荐