Apache Doris Colocate Join 原理實(shí)踐教程
目錄
- What Colocate Join
- Why Colocate Join
- How Colocate Join
- 核心思路
- 術(shù)語(yǔ)定義
- 1 數(shù)據(jù)導(dǎo)入時(shí)保證本地性
- 2 Colocate Join Query Plan
- 3 Colocate Join Query Schedule
- 4 Colocate Join At Bucket Seq Level
- 5 Colocate Join Metadata Maintenance
- 6 How to decide a query can colocate join
- 7 Colocate Join Support Balance
- Colocate Join Performance
- How To Use Colocate Join
- Colocate Join 目前限制
- Colocate Join 適用場(chǎng)景
- Colocate Join FAQ
- 總結(jié)
What Colocate Join
我們都知道 Join 的常見(jiàn)連接類(lèi)型分為以下幾種:
- INNER JOIN
- OUTER JOIN
- CROSS JOIN
- SEMI JOIN
- ANTI JOIN
Join 的常見(jiàn)算法實(shí)現(xiàn)包含以下幾種:
- Nested Loop Join
- Sort Merge Join
- Hash Join
分布式系統(tǒng)實(shí)現(xiàn) Join 數(shù)據(jù)分布的常見(jiàn)策略有:
- Shuffle Join
- Broadcast Join
- Colocate/Local Join
Colocate/Local Join 就是指多個(gè)節(jié)點(diǎn) Join 時(shí)沒(méi)有數(shù)據(jù)移動(dòng)和網(wǎng)絡(luò)傳輸,每個(gè)節(jié)點(diǎn)只在本地進(jìn)行 Join,能夠本地進(jìn)行 Join 的前提是相同 Join Key 的數(shù)據(jù)分布在相同的節(jié)點(diǎn)。
Why Colocate Join
相比 Shuffle Join 和 Broadcast Join,Colocate Join 在查詢(xún)時(shí)沒(méi)有數(shù)據(jù)的網(wǎng)絡(luò)傳輸,性能會(huì)更高。 在 Doris 的具體實(shí)現(xiàn)中,Colocate Join 相比 Shuffle Join 可以擁有更高的并發(fā)粒度,也可以顯著提升 Join 的性能,這一點(diǎn)在后面會(huì)解釋。
How Colocate Join
核心思路
對(duì)于 colocate tables,在任何情況下都要保證數(shù)據(jù)的本地性。 具體包括:
- 數(shù)據(jù)導(dǎo)入時(shí)保證數(shù)據(jù)本地性
- 查詢(xún)調(diào)度時(shí)保證數(shù)據(jù)本地性
- 數(shù)據(jù) balance 后保證數(shù)據(jù)本地性
實(shí)現(xiàn)中最復(fù)雜是第 3 點(diǎn): 處理 colocate tables 的 balance。
術(shù)語(yǔ)定義
Colocate Group
我們將一組具體相同 Colocate 屬性的 Table 稱(chēng)為 Group,下圖中 t1 和 t2 擁有相同的 Colocate Group。
Colocate Parent Table
我們將決定一個(gè) Group 數(shù)據(jù)分布的 Table 稱(chēng)為 Parent Table,下圖中 t1 是 Colocate Parent Table.
Colocate Child Table
我們將一個(gè) Group 中除 Parent Table 之外的 Table 稱(chēng)為 Child Table,下圖中 t2 是 Colocate Child Table.
Bucket Seq
如下圖,如果一個(gè)表有 N 個(gè) Partition, 則每個(gè) Partition 的第 M 個(gè) bucket 的 Bucket Seq 是 M。
1 數(shù)據(jù)導(dǎo)入時(shí)保證本地性
Doris 的分區(qū)方式如下所示,先根據(jù)分區(qū)字段 Range 分區(qū),再根據(jù)指定的 Distributed Key Hash 分桶:
所以我們?cè)跀?shù)據(jù)導(dǎo)入時(shí)保證本地性的核心思想就是兩次映射,對(duì)于 colocate tables,我們保證相同 Distributed Key 的數(shù)據(jù)映射到相同的 Bucket Seq,再保證相同 Bucket Seq 的 buckets 映射到相同的 BE。
具體來(lái)說(shuō),第一步:我們計(jì)算 Distributed Key 的 hash 值,并對(duì) bucket num 取模,保證相同 Distributed Key 的數(shù)據(jù)映射到相同的 Bucket Seq。
第二步:將同一個(gè) Colocate Group 下所有相同 Bucket Seq 的 Bucket 映射到相同的 BE,方法如下:
- Group 中所有 Table 的 Bucket Seq 和 BE 節(jié)點(diǎn)的映射關(guān)系和 Parent Table 一致
- Parent Table 中所有 Partition 的 Bucket Seq 和 BE 節(jié)點(diǎn)的映射關(guān)系和第一個(gè) Partition 一致
- Parent Table 第一個(gè) Partition 的 Bucket Seq 和 BE 節(jié)點(diǎn)的映射關(guān)系利用原生的 Round Robin 算法決定
2 Colocate Join Query Plan
對(duì) HashJoinFragment,由于 Join 的多張表有了數(shù)據(jù)本地性保證,所以可以去掉 Exchange Node,避免網(wǎng)絡(luò)傳輸,將 ScanNode 直接設(shè)置為 Hash Join Node 的 Child。
3 Colocate Join Query Schedule
查詢(xún)調(diào)度的目標(biāo): 一個(gè) Colocate join 中所有 ScanNode 中所有 Bucket Seq 相同的 Buckets 被調(diào)度到同一個(gè) BE。
查詢(xún)調(diào)度的策略:第一個(gè) ScanNode 的 Buckets 隨機(jī)選擇 BE,其余的 ScanNode 和第一個(gè) ScanNode 保持一致。
4 Colocate Join At Bucket Seq Level
目前,Doris 的 Hash Join 是 Server 粒度的:
對(duì)于 colocate join,由于同一個(gè) Colocate Group 下相同 Bucket Seq 的 Bucket 分布在相同的 BE,所以我們將 Join 的粒度從 Server 粒度降至 Bucket Seq 粒度:
5 Colocate Join Metadata Maintenance
對(duì)于 colocate join,我們需要維護(hù)以下幾個(gè)核心元數(shù)據(jù):
- 代碼中,colocate group id 就是 colocate parent table id
- group2BackendsPerBucketSeq 代表每個(gè) colocate group 中每個(gè) bucket seq 映射到哪些 BE
- 為了支持 balance,以及保證元數(shù)據(jù)的一致性,這些元數(shù)據(jù)都需要持久化
6 How to decide a query can colocate join
- Join 的 tables 是 colocate able
- The colocate group 是 stable 狀態(tài),沒(méi)有 balancing
- Join 的 Key 包含分桶的 Distributed Key
7 Colocate Join Support Balance
核心思路:
新增一個(gè) daemon 線程專(zhuān)門(mén)處理 colocate table 的 balance,并讓正常的 balance 線程不處理 colocate table 的 balance。
何時(shí) balance:
有 BE 節(jié)點(diǎn)新增,刪除,down 掉時(shí)。
balance 的粒度:
正常 balance 的粒度是 bucket,但是對(duì)于 colocate table,我們必須保證同一個(gè) colocate group 下所有 bucket 的數(shù)據(jù)本地性,所以我們 balance 的單位是 colocate group。
balance 對(duì)查詢(xún)的影響:
當(dāng)一個(gè) colocate group 正在 balance 時(shí),colocate join 會(huì)退化為原始的 shuffle join 或 broadcast join。
balance 流程:
- 為需要復(fù)制或遷移的 Bucket 選擇目標(biāo) BE
- 標(biāo)記 colocate group 的轉(zhuǎn)態(tài)為 balancing
- 對(duì)于需要復(fù)制或遷移的 Bucket,發(fā)起 Clone Job,Clone Job 會(huì)從 Bucket 的現(xiàn)有副本復(fù)制一個(gè)新副本目標(biāo) BE
- 更新 backendsPerBucketSeq(維護(hù) Bucket Seq 到 BE 映射關(guān)系的元數(shù)據(jù))
- 當(dāng)一個(gè) colocate group 下的所有 Clone Job 都完成時(shí),標(biāo)記 colocate group 的轉(zhuǎn)態(tài)為 stable
- 刪除冗余的副本
當(dāng)有 BE 節(jié)點(diǎn)刪除或長(zhǎng)時(shí)間掛掉時(shí),選擇目標(biāo) BE 的策略:
和正常 balance 時(shí)的選擇策略相同,考慮集群的整體負(fù)載,盡量選擇負(fù)載較低的 BE。
當(dāng)有 BE 節(jié)點(diǎn)新增時(shí),選擇目標(biāo) BE 的策略:
- 對(duì)于當(dāng)前 colocate group,計(jì)算每個(gè)新增 BE 需要增加的 bucket seqs 個(gè)數(shù):假如我們有 3 個(gè) BE,8 個(gè) bucket,每個(gè) bucket 是 3 副本,則每個(gè) BE 負(fù)責(zé) 8 個(gè) bucket 副本,我們新增 1 個(gè) BE 后,可以計(jì)算出每個(gè) BE 負(fù)責(zé)的平均 bucket 副本數(shù)應(yīng)該是 3 * 8 / 4 = 6,每個(gè)新增 BE 需要增加的 bucket seqs 個(gè)數(shù)為 6 / 1 = 6.
- 對(duì)于每個(gè) bucket seqs, 隨機(jī)選擇從哪個(gè)舊的 BE 遷移副本到新增的 BE。
Colocate Join Performance
測(cè)試數(shù)據(jù):
Table A,B,C 都有 10 天數(shù)據(jù),1 天一個(gè) partitions,每個(gè) partition 有 570 萬(wàn)數(shù)據(jù)。
測(cè)試集群:
4 臺(tái)低配物理機(jī),每個(gè) BE 24CPU,96MEM
測(cè)試 SQL:
SQL1:
select count(*) FROM A t1INNER JOIN [shuffle] B t5 ON ((t1.dt = t5.dt) AND (t1.id = t5.id))INNER JOIN [shuffle] C t6 ON ((t1.dt = t6.dt) AND (t1.id = t6.id))where t1.dt in (xxx days);
SQL2:
select t1.dt, t1.id, t1.name, t1.second_id,t1.second_name,t5.id, t5.weight_time,t5.list,t6.ord_id, t6._idFROM A t1INNER JOIN B t5 ON ((t1.dt = t5.dt) AND (t1.id = t5.id))INNER JOIN C t6 ON ((t1.dt = t6.dt) AND (t1.id = t6.id))where t1.dt in (xxx days)limit 10000;
Test Result for SQL1:
Test Result for SQL2:
可以看到,Colocate Join 相比 Shuffle Join 有明顯的性能提升,而且隨著集群規(guī)模越大,Join 的數(shù)據(jù)量越多,Colocate Join 的優(yōu)勢(shì)會(huì)更明顯。
How To Use Colocate Join
社區(qū)最新代碼已經(jīng)支持 Colocate Join,只不過(guò)默認(rèn)是關(guān)閉的,只需要在 FE 配置中設(shè)置 disable_colocate_join 為 false,即可開(kāi)啟 Colocate Join 功能。
具體使用時(shí)只需要在建表時(shí)增加 colocate_with 這個(gè)屬性即可,colocate_with 的值可以設(shè)置成同一組 colocate 表中的任意一個(gè),不過(guò)需要保證 colocate_with 屬性中的表要先建立。
假如需要對(duì) table t1 和 t2 進(jìn)行 Colocate Join,可以按以下語(yǔ)句建表:
CREATE TABLE `t1` ( `id` int(11) COMMENT "", `value` varchar(8) COMMENT "") ENGINE=OLAPDUPLICATE KEY(`id`)DISTRIBUTED BY HASH(`id`) BUCKETS 10PROPERTIES ("colocate_with" = "t1");CREATE TABLE `t2` ( `id` int(11) COMMENT "", `value` varchar(8) COMMENT "") ENGINE=OLAPDUPLICATE KEY(`id`)DISTRIBUTED BY HASH(`id`) BUCKETS 10PROPERTIES ("colocate_with" = "t1");
Colocate Join 目前限制
- Colocate Table 必須是 OLAP 類(lèi)型的表
- colocate_with 屬性相同表的 BUCKET 數(shù)必須一樣
- colocate_with 屬性相同表的 副本數(shù)必須一樣 (這個(gè)限制之后可能會(huì)去掉,但對(duì)用戶(hù)應(yīng)該沒(méi)有實(shí)際影響)
- colocate_with 屬性相同表的 DISTRIBUTED Columns 的數(shù)據(jù)類(lèi)型必須一樣
Colocate Join 適用場(chǎng)景
Colocate Join 十分適合幾張表按照相同字段分桶,并高頻根據(jù)相同字段 Join 的場(chǎng)景,比如電商的不少應(yīng)用都按照商家 Id 分桶,并高頻按照商家 Id 進(jìn)行 Join。
Colocate Join FAQ
一句話總結(jié),凡是不能進(jìn)行 Colocate Join 的場(chǎng)景都會(huì)自動(dòng)退化為原始的 Shuffle Join 或者 Broadcast Join。
Q1: 支持多張表進(jìn)行 Colocate Join 嗎?
A: 支持
Q2: 支持 Colocate 表和正常表 Join 嗎?
A: 支持
Q3: Colocate 表支持用非分桶的 Key 進(jìn)行 Join 嗎?
A: 支持:不符合 Colocate Join 條件的 Join 會(huì)使用 Shuffle Join 或 Broadcast Join
Q4: 如何確定 Join 是按照 Colocate Join 執(zhí)行的?
A: explain 的結(jié)果中 Hash Join 的孩子節(jié)點(diǎn)如果直接是 OlapScanNode, 沒(méi)有 Exchange Node,就說(shuō)明是 Colocate Join
Q5: 如何修改 colocate_with 屬性?
A: ALTER TABLE example_db.my_table set ("colocate_with"="target_table");
Q6: 如何禁用 colocate join?
A: set disable_colocate_join = true; 就可以禁用 Colocate Join,查詢(xún)時(shí)就會(huì)使用 Shuffle Join 或 Broadcast Join
總結(jié)
大多數(shù)支持 Join 的 OLAP 系統(tǒng)都會(huì)考慮支持 Colocate Join,比如 MemSQL, SnappyData, 阿里 AnalyticDB 等,阿里 AnalyticDB 更是在數(shù)據(jù)模型中就引入了 Table Group 的概念。總的來(lái)講,Colocate Join 通過(guò)在數(shù)據(jù)導(dǎo)入,查詢(xún) Plan,查詢(xún)調(diào)度,數(shù)據(jù) balance 時(shí)對(duì)數(shù)據(jù)本地性的保證和考慮,可以顯著加速特定場(chǎng)景的下 Join 查詢(xún),是一個(gè)十分有用的 Feature。
以上就是Apache Doris Colocate Join 原理實(shí)踐教程的詳細(xì)內(nèi)容,更多關(guān)于Apache Doris Colocate Join 原理的資料請(qǐng)關(guān)注其它相關(guān)文章!
相關(guān)文章:
