1.HBase得二級(jí)索引的設(shè)計(jì)(或者Phoenix 二級(jí)索引-說說原理)
2.怎么提高Flink的執(zhí)行性能(代碼方面)?
3.Spark 數(shù)據(jù)傾斜調(diào)優(yōu)10策
答案區(qū)
1. HBase二級(jí)索引的設(shè)計(jì)方案一般有如下幾種:
1)協(xié)處理器coprocessor方案。 原理就是自定義協(xié)處理器,實(shí)現(xiàn)`雙寫`,就是寫主表的時(shí)候,同時(shí)寫索引表[這里這個(gè)索引表是根據(jù)業(yè)務(wù)對(duì)查詢的需求建立的]。 比如我們要查詢的主表是A, 里面有RowKey,還有一列ColumnA. 如果想對(duì)ColumnA這一列建立索引,就自定義一個(gè)協(xié)處理器(觀察者模式),當(dāng)我們寫入A表中一條數(shù)據(jù),比如 行鍵rowkey(123),cloumnA列值:abc,這時(shí)協(xié)處理在索引表(自己建立,比如A_INDEX)中插入一條記錄 行鍵為剛才列A的值abc,列值為主表的rowkey(123). 查詢的時(shí)候,先查索引表得到rowkey,然后根據(jù)rowkey在主表中查。
2)ES 方案,將想要構(gòu)建的二級(jí)索引的字段值存儲(chǔ)到ES中,查詢時(shí)先去ES根據(jù)條件查到rowkey,然后根據(jù)rowkey再去hbase查數(shù)據(jù)。
3)Phoenix 方案。 Phoenix構(gòu)建構(gòu)建索引的方式,本質(zhì)也在HBase中建立索引表。只不建表的過程,索引維護(hù)的過程,Phoenix自己內(nèi)部實(shí)現(xiàn),暴露給用戶的只是SQL接口。
其實(shí)在HBase構(gòu)建二級(jí)索引,萬變不離其宗,最終的方向都是構(gòu)建索引字段與行鍵的映射關(guān)系,先更加索引表查行鍵,在根據(jù)行鍵,查最終數(shù)據(jù)。
2.- 通用的優(yōu)化方式
1)盡早fliter掉一些不需要的數(shù)據(jù)以及避免一些不必要的序列化。
2)避免使用深層嵌套數(shù)據(jù)類型。
3) 對(duì)于數(shù)據(jù)傾斜使用調(diào)整并行度或者雙層聚合的方式。
4)一些基數(shù)較少的并且本身較長(zhǎng)維度可以采用數(shù)據(jù)字典的方式減少網(wǎng)絡(luò)傳輸及內(nèi)存占用、gc開銷。
- 數(shù)據(jù)類型和序列化
Flink支持java、scala基本數(shù)據(jù)類型,以及java Tuples、scala Case Class、Flink Value,對(duì)于這些數(shù)據(jù)類型,flink會(huì)采用自身的序列化反序列化器去做序列化操作,對(duì)于其他數(shù)據(jù)類型,flink會(huì)采用kyro方式序列化,kyro序列化方式效率會(huì)比flink自帶的方式低很多。因此在數(shù)據(jù)序列化方面我們可以做如下工作
1) 嘗試使用transient修飾不需要序列化的變量,或者修飾你可以在下游通過其他方式獲取到變量,這個(gè)可以減少序列化流程和網(wǎng)絡(luò)傳輸(但可能帶來更多的內(nèi)存占用用和gc消耗)
2) 對(duì)于一些特殊的數(shù)據(jù)你可以嘗試重寫writeObject() 和 readObject() 來自己控制一些序列化方式,如果更高效的話
3)如果使用了lambda或者泛型的話,顯式的指定類型信息讓flink類型提取系統(tǒng)識(shí)別到以提升性能。
- 多組相同keyby可使用DataStreamUtils
在多組keyby的場(chǎng)景可以采用DataStreamUtils.reinterpretAsKeyedStream的方式避免多次shuffle操作
- 盡量減少狀態(tài)的大小
1)設(shè)置合適的state TTL, 清洗過期狀態(tài),避免狀態(tài)無限增大。
2)減少狀態(tài)字段數(shù), 比如使用aggreteFunction 做窗口聚合時(shí),可以只將要聚合的信息放入狀態(tài),其他keyBy字段以及窗口信息,可以通過processWindowFunction的方式獲取,這樣就是 aggregateFunction + ProcessWindowFunction,agg函數(shù)獲取聚合信息,輸出的結(jié)果到processwindowFunction中取獲取窗口信息。
3)checkpoint頻率不宜過高,超時(shí)時(shí)間不要太長(zhǎng),可以異步化的地方盡量異步化
3.Spark 數(shù)據(jù)傾斜調(diào)優(yōu)10策
何謂數(shù)據(jù)傾斜?數(shù)據(jù)傾斜指的是并行處理的數(shù)據(jù)集中,某一部分(如Spark或Kafka的一個(gè)Partition)的數(shù)據(jù)顯著多于其它部分,從而使得該部分的處理速度成為整個(gè)數(shù)據(jù)集處理的瓶頸。
- 數(shù)據(jù)傾斜概述
1.1 什么是數(shù)據(jù)傾斜
對(duì)Hadoop、Spark、Flink這樣的大數(shù)據(jù)系統(tǒng)來講,數(shù)據(jù)量大并不可怕,可怕的是數(shù)據(jù)傾斜。
何謂數(shù)據(jù)傾斜?數(shù)據(jù)傾斜指的是,并行處理的數(shù)據(jù)集中,某一部分(如Spark或Kafka的一個(gè)Partition)的數(shù)據(jù)顯著多于其它部分,從而使得該部分的處理速度成為整個(gè)數(shù)據(jù)集處理的瓶頸。
對(duì)于分布式系統(tǒng)而言,理想情況下,隨著系統(tǒng)規(guī)模(節(jié)點(diǎn)數(shù)量)的增加,應(yīng)用整體耗時(shí)線性下降。如果一臺(tái)機(jī)器處理一批大量數(shù)據(jù)需要120分鐘,當(dāng)機(jī)器數(shù)量增加到三時(shí),理想的耗時(shí)為120 / 3 = 40分鐘,如下圖所示:
但是,上述情況只是理想情況,實(shí)際上將單機(jī)任務(wù)轉(zhuǎn)換成分布式任務(wù)后,會(huì)有overhead,使得總的任務(wù)量較之單機(jī)時(shí)有所增加,所以每臺(tái)機(jī)器的執(zhí)行時(shí)間加起來比單臺(tái)機(jī)器時(shí)更大。這里暫不考慮這些overhead,假設(shè)單機(jī)任務(wù)轉(zhuǎn)換成分布式任務(wù)后,總?cè)蝿?wù)量不變。
但即使如此,想做到分布式情況下每臺(tái)機(jī)器執(zhí)行時(shí)間是單機(jī)時(shí)的`1 / N`,就必須保證每臺(tái)機(jī)器的任務(wù)量相等。不幸的是,很多時(shí)候,任務(wù)的分配是不均勻的,甚至不均勻到大部分任務(wù)被分配到個(gè)別機(jī)器上,其它大部分機(jī)器所分配的任務(wù)量只占總得的小部分。比如一臺(tái)機(jī)器負(fù)責(zé)處理80%的任務(wù),另外兩臺(tái)機(jī)器各處理10%的任務(wù),如下圖所示:
1.2 數(shù)據(jù)傾斜發(fā)生時(shí)的現(xiàn)象
- 絕大多數(shù) task 執(zhí)行得都非常快,但個(gè)別 task 執(zhí)行極慢。比如,總共有 1000 個(gè) task,997 個(gè) task 都在 1 分鐘之內(nèi)執(zhí)行完了,但是剩余兩三個(gè) task 卻要一兩個(gè)小時(shí)。這種情況很常見。
- 原本能夠正常執(zhí)行的 Spark 作業(yè),某天突然報(bào)出 OOM(內(nèi)存溢出)異常,觀察異常棧,是我們寫 的業(yè)務(wù)代碼造成的。這種情況比較少見。
markdown
總結(jié):
1. 大部分任務(wù)都很快執(zhí)行完,用時(shí)也相差無幾,但個(gè)別Task執(zhí)行耗時(shí)很長(zhǎng),整個(gè)應(yīng)用程序一直處于99%左右 的狀態(tài)。
2. 一直運(yùn)行正常的Spark Application昨晚突然OOM了。
1.3 數(shù)據(jù)傾斜發(fā)生的原理
數(shù)據(jù)傾斜的原理很簡(jiǎn)單: 在進(jìn)行 shuffle 的時(shí)候,必須將各個(gè)節(jié)點(diǎn)上相同的 key 的數(shù)據(jù)拉取到某個(gè)節(jié)點(diǎn) 上的一個(gè) task 來進(jìn)行處理,比如按照 key 進(jìn)行聚合或 join 等操作。此時(shí)如果某個(gè) key 對(duì)應(yīng)的數(shù)據(jù)量特 別大的話,就會(huì)發(fā)生數(shù)據(jù)傾斜。比如大部分 key 對(duì)應(yīng) 10 條數(shù)據(jù),但是個(gè)別 key 卻對(duì)應(yīng)了 100 萬條數(shù) 據(jù),那么大部分 task 可能就只會(huì)分配到 10 條數(shù)據(jù),然后 1 秒鐘就運(yùn)行完了;但是個(gè)別 task 可能分配 到了 100 萬數(shù)據(jù),要運(yùn)行一兩個(gè)小時(shí)。因此,整個(gè) Spark 作業(yè)的運(yùn)行進(jìn)度是由運(yùn)行時(shí)間最長(zhǎng)的那個(gè) task 決定的。
因此出現(xiàn)數(shù)據(jù)傾斜的時(shí)候,Spark 作業(yè)看起來會(huì)運(yùn)行得非常緩慢,甚至可能因?yàn)槟硞€(gè) task 處理的數(shù)據(jù) 量過大導(dǎo)致內(nèi)存溢出。
下圖就是一個(gè)很清晰的例子:hello 這個(gè) key,在三個(gè)節(jié)點(diǎn)上對(duì)應(yīng)了總共 7 條數(shù)據(jù),這些數(shù)據(jù)都會(huì)被拉 取到同一個(gè)task中進(jìn)行處理;而world 和 you 這兩個(gè) key 分別才對(duì)應(yīng) 1 條數(shù)據(jù),所以另外兩個(gè) task 只 要分別處理 1 條數(shù)據(jù)即可。此時(shí)第一個(gè) task 的運(yùn)行時(shí)間可能是另外兩個(gè) task 的 7 倍,而整個(gè) stage 的 運(yùn)行速度也由運(yùn)行最慢的那個(gè) task 所決定。
markdown
總結(jié):
數(shù)據(jù)傾斜發(fā)生的本質(zhì),就是在執(zhí)行多階段的計(jì)算的時(shí)候,中間的shuffle策略可能導(dǎo)致分發(fā)到下 游Task的數(shù)據(jù)量不均勻,進(jìn)而導(dǎo)致下游Task執(zhí)行時(shí)長(zhǎng)的不一致。不完全均勻是正常的,但是如果相差太大,那么就產(chǎn)生性能問題了。
1.4 數(shù)據(jù)傾斜的危害
從上圖可見,當(dāng)出現(xiàn)數(shù)據(jù)傾斜時(shí),小量任務(wù)耗時(shí)遠(yuǎn)高于其它任務(wù),從而使得整體耗時(shí)過大,未能充分發(fā) 揮分布式系統(tǒng)的并行計(jì)算優(yōu)勢(shì)。另外,當(dāng)發(fā)生數(shù)據(jù)傾斜時(shí),少量部分任務(wù)處理的數(shù)據(jù)量過大,可能造成 內(nèi)存不足使得任務(wù)失敗,并進(jìn)而引進(jìn)整個(gè)應(yīng)用失敗。如果應(yīng)用并沒有因此失敗,但是大量正常任務(wù)都早 早完成處于等待狀態(tài),資源得不到充分利用。
markdown
總結(jié):
1. 整體耗時(shí)過大(整個(gè)任務(wù)的完成由執(zhí)行時(shí)間最長(zhǎng)的那個(gè)Task決定)
2. 應(yīng)用程序可能異常退出(某個(gè)Task執(zhí)行時(shí)處理的數(shù)據(jù)量遠(yuǎn)遠(yuǎn)大于正常節(jié)點(diǎn),則需要的資源容易出現(xiàn)瓶頸, 當(dāng)資源不足,則應(yīng)用程序退出)
3. 資源閑置(處理等待狀態(tài)的Task資源得不到及時(shí)的釋放,處于閑置浪費(fèi)狀態(tài))
1.5 數(shù)據(jù)傾斜是如何造成的
在 Spark 中,同一個(gè) Stage 的不同 Partition 可以并行處理,而具有依賴關(guān)系的不同 Stage 之間是串行 處理的。假設(shè)某個(gè) Spark Job 分為Stage0 和 Stage1 兩個(gè) Stage,且 Stage1 依賴于 Stage0,那 Stage0 完全處理結(jié)束之前不會(huì)處理 Stage1。而 Stage0 可能包含 N 個(gè)Task,這 N 個(gè) Task 可以并行進(jìn)行。如 果其中 N-1 個(gè) Task 都在 10 秒內(nèi)完成,而另外一個(gè) Task 卻耗時(shí) 1 分鐘,那該 Stage 的總時(shí)間至少為 1 分鐘。換句話說,一個(gè) Stage 所耗費(fèi)的時(shí)間,主要由最慢的那個(gè) Task 決定。由于同一個(gè) Stage 內(nèi)的所有 Task 執(zhí)行相同的計(jì)算,在排除不同計(jì)算節(jié)點(diǎn)計(jì)算能力差異的前提下,不同 Task 之間耗時(shí)的差異主要由該 Task 所處理的數(shù)據(jù)量決定。Stage 的數(shù)據(jù)來源主要分為如下兩類:
markdown
1)數(shù)據(jù)源本身分布有問題:從數(shù)據(jù)源直接讀取。如讀取HDFS,Kafka,有可能出現(xiàn),大概率不會(huì)
2)自己指定的分區(qū)規(guī)則:讀取上一個(gè) Stage 的 Shuffle 數(shù)據(jù)
樸素的分布式計(jì)算的核心思想:
1) 大問題拆分成小問題:分而治之
2)既然要分開算,那最后就一定要把分開計(jì)算的那么多的小 Task 的結(jié)果執(zhí)行匯總
3)所以必然分布式計(jì)算引擎的設(shè)計(jì)中,應(yīng)用程序的執(zhí)行一定是分階段
4)分布計(jì)算引擎的而核心:一個(gè)復(fù)雜的分布式計(jì)算應(yīng)用程序的執(zhí)行肯定要分成多個(gè)階段,每個(gè)階段分布式并 行運(yùn)行多個(gè)Task
5)DAG引擎:
Spark: stage1 ==> stage2 ===> stage3
mapreduce: 就只有兩個(gè)階段:mapper reducer
階段與階段之間需要進(jìn)行 shuffle,只要進(jìn)行了數(shù)據(jù)混洗,就存在著數(shù)據(jù)分發(fā)不均勻的情況。如果情況嚴(yán) 重,就是數(shù)據(jù)傾斜。
分布式計(jì)算引擎的設(shè)計(jì),免不了有shuffle,既然有shuffle操作,就一定有產(chǎn)生數(shù)據(jù)傾斜的可能。如果 你是做大數(shù)據(jù)處理的,就一定會(huì)遇到 數(shù)據(jù)傾斜!
- 如何避免數(shù)據(jù)傾斜
2.1 避免數(shù)據(jù)源傾斜-HDFS
Spark通過 textFile(path, minPartitions) 方法讀取文件時(shí),使用 TextInputFormat。對(duì)于不可切分的文件,每個(gè)文件對(duì)應(yīng)一個(gè) Split 從而對(duì)應(yīng)一個(gè) Partition。此時(shí)各文件大小是否一致,很大程度上決定了是否存在數(shù)據(jù)源側(cè)的數(shù)據(jù)傾斜。另外,對(duì)于不可切分的壓縮文件,即使壓縮后的文件大 小一致,它所包含的實(shí)際數(shù)據(jù)量也可能差別很多,因?yàn)樵次募?shù)據(jù)重復(fù)度越高,壓縮比越高。反過來, 即使壓縮文件大小接近,但由于壓縮比可能差距很大,所需處理的數(shù)據(jù)量差距也可能很大。此時(shí)可通過在數(shù)據(jù)生成端將不可切分文件存儲(chǔ)為可切分文件,或者保證各文件包含數(shù)據(jù)量相同的方式避免數(shù)據(jù)傾斜。
markdown
對(duì)于不可切分文件可能出現(xiàn)數(shù)據(jù)傾斜,對(duì)于可切分文件,一般來說,不存在數(shù)據(jù)傾斜問題。
1)可切分: 基本上不會(huì)! 默認(rèn)數(shù)據(jù)塊大小:128M
2)不可切分: 源文件不均勻,最終導(dǎo)致 分布式引用程序計(jì)算產(chǎn)生數(shù)據(jù)傾斜 日志:每一個(gè)小時(shí)生成一個(gè)日志文件
2.2 避免數(shù)據(jù)源傾斜-Kaka
Topic 主題: 分布式的組織形式:分區(qū), 既然要進(jìn)行數(shù)據(jù)分區(qū),那就有可能產(chǎn)生數(shù)據(jù)分布不均勻
以 Spark Stream 通過 DirectStream 方式讀取 Kafka 數(shù)據(jù)為例。由于 Kafka 的每一個(gè) Partition 對(duì)應(yīng) Spark 的一個(gè) Task(Partition),所以 Kafka 內(nèi)相關(guān) Topic 的各 Partition 之間數(shù)據(jù)是否平衡,直接決 定 Spark 處理該數(shù)據(jù)時(shí)是否會(huì)產(chǎn)生數(shù)據(jù)傾斜。
Kafka 某一 Topic 內(nèi)消息在不同 Partition 之間的分布,主要由 Producer 端所使用的 Partitioner 實(shí)現(xiàn) 類決定。如果使用隨機(jī) Partitioner,則每條消息會(huì)隨機(jī)發(fā)送到一個(gè) Partition 中,從而從概率上來講, 各 Partition 間的數(shù)據(jù)會(huì)達(dá)到平衡。此時(shí)源 Stage(直接讀取 Kafka 數(shù)據(jù)的 Stage)不會(huì)產(chǎn)生數(shù)據(jù)傾斜。
但很多時(shí)候,業(yè)務(wù)場(chǎng)景可能會(huì)要求將具備同一特征的數(shù)據(jù)順序消費(fèi),此時(shí)就需要將具有相同特征的數(shù)據(jù) 放于同一個(gè) Partition 中。一個(gè)典型的場(chǎng)景是,需要將同一個(gè)用戶相關(guān)的PV信息置于同一個(gè) Partition 中。此時(shí),如果產(chǎn)生了數(shù)據(jù)傾斜,則需要通過其它方式處理。
markdown
* 以 Spark Stream 通過 DirectStream 方式讀取 Kafka 數(shù)據(jù)為例。由于 Kafka 的每一個(gè) Partition 對(duì)應(yīng) Spark 的一個(gè) Task(Partition),所以 Kafka 內(nèi)相關(guān) Topic 的各 Partition 之間數(shù)據(jù)是否平衡,直接決 定 Spark 處理該數(shù)據(jù)時(shí)是否會(huì)產(chǎn)生數(shù)據(jù)傾斜。
* Kafka 某一 Topic 內(nèi)消息在不同 Partition 之間的分布,主要由 Producer 端所使用的 Partitioner 實(shí)現(xiàn) 類決定。如果使用隨機(jī) Partitioner,則每條消息會(huì)隨機(jī)發(fā)送到一個(gè) Partition 中,從而從概率上來講, 各 Partition 間的數(shù)據(jù)會(huì)達(dá)到平衡。此時(shí)源 Stage(直接讀取 Kafka 數(shù)據(jù)的 Stage)不會(huì)產(chǎn)生數(shù)據(jù)傾斜。
* 但很多時(shí)候,業(yè)務(wù)場(chǎng)景可能會(huì)要求將具備同一特征的數(shù)據(jù)順序消費(fèi),此時(shí)就需要將具有相同特征的數(shù)據(jù) 放于同一個(gè) Partition 中。一個(gè)典型的場(chǎng)景是,需要將同一個(gè)用戶相關(guān)的PV信息置于同一個(gè) Partition 中。此時(shí),如果產(chǎn)生了數(shù)據(jù)傾斜,則需要通過其它方式處理。
2.3 定位處理邏輯 **- Stage** **和** Task
歸根結(jié)底,數(shù)據(jù)傾斜產(chǎn)生的原因,就是兩個(gè) stage 中的 shuffle 過程導(dǎo)致的。所以我們只需要研究Shuffle 算子即可。我們知道了導(dǎo)致數(shù)據(jù)傾斜的問題就是 shuffle 算子,所以我們先去找到代碼中的 shuffle 的算子,比如 distinct、groupByKey、reduceByKey、aggergateByKey、join、cogroup、repartition 等,那么問 題一定就出現(xiàn)在這里。spark的執(zhí)行,按照hsuffle算子分成多個(gè)stage來執(zhí)行。
markdown
* 如果 Spark Application 運(yùn)行過程中,出現(xiàn)數(shù)據(jù)傾斜,可以通過 web 管理監(jiān)控界面,查看 各stage 的運(yùn)行情況,如果某一個(gè) stage 的運(yùn)行很長(zhǎng),并且這個(gè) stage 的大部分Task都運(yùn)行很快,則
2.4 查看導(dǎo)致傾斜的key的數(shù)據(jù)分布情況
知道了數(shù)據(jù)傾斜發(fā)生在哪里之后,通常需要分析一下那個(gè)執(zhí)行了shuffle操作并且導(dǎo)致了數(shù)據(jù)傾斜的 RDD/Hive表,查看一下其中key的分布情況。這主要是為之后選擇哪一種技術(shù)方案提供依據(jù)。針對(duì)不同 的key分布與不同的shuffle算子組合起來的各種情況,可能需要選擇不同的技術(shù)方案來解決。此時(shí)根據(jù)你執(zhí)行操作的情況不同,可以有很多種查看key分布的方式:
markdown
1)如果是Spark SQL中的group by、join語句導(dǎo)致的數(shù)據(jù)傾斜,那么就查詢一下 SQL 中使用的表的key 分布情況。
2)如果是對(duì) Spark RDD執(zhí)行shuffle算子導(dǎo)致的數(shù)據(jù)傾斜,那么可以在Spark作業(yè)中加入查看 key 分布 的代碼,比如 RDD.countByKey()。然后對(duì)統(tǒng)計(jì)出來的各個(gè)key出現(xiàn)的次數(shù),collect/take到客戶端打印 一下,就可以看到key的分布情況。
舉例來說,對(duì)于上面所說的單詞計(jì)數(shù)程序,如果確定了是 stage1 的 reduceByKey 算子導(dǎo)致了數(shù)據(jù)傾 斜,那么就應(yīng)該看看進(jìn)行 reduceByKey 操作的 RDD 中的 key 分布情況,在這個(gè)例子中指的就是 pairs RDD。如下示例,我們可以先對(duì) pairs 采樣 10% 的樣本數(shù)據(jù),然后使用 countByKey 算子統(tǒng)計(jì)出每個(gè) key 出現(xiàn)的次數(shù),最后在客戶端遍歷和打印樣本數(shù)據(jù)中各個(gè) key 的出現(xiàn)次數(shù)。
scala
val sampledPairs = pairs.sample(false, 0.1)
val sampledWordCounts = sampledPairs.countByKey()
sampledWordCounts.foreach(println(_))
采樣!(離線處理:無放回采樣, 流式處理:魚塘采樣)
- 數(shù)據(jù)傾斜解決方案
3.1 Hive ETL處理
3.1.1 適用場(chǎng)景
導(dǎo)致數(shù)據(jù)傾斜的是 Hive 表。如果該 Hive 表中的數(shù)據(jù)本身很不均勻(比如某個(gè) key 對(duì)應(yīng)了 100 萬數(shù) 據(jù),其他 key 才對(duì)應(yīng)了 10 條數(shù)據(jù)),而且業(yè)務(wù)場(chǎng)景需要頻繁使用 Spark 對(duì) Hive 表執(zhí)行某個(gè)分析操作,那么比較適合使用這種技術(shù)方案
3.1.2 實(shí)現(xiàn)思路
此時(shí)可以評(píng)估一下,是否可以通過Hive來進(jìn)行數(shù)據(jù)預(yù)處理(即通過 Hive ETL 預(yù)先對(duì)數(shù)據(jù)按照 key 進(jìn)行 聚合,或者是預(yù)先和其他表進(jìn)行join),然后在 Spark 作業(yè)中針對(duì)的數(shù)據(jù)源就不是原來的 Hive 表了, 而是預(yù)處理后的Hive表。此時(shí)由于數(shù)據(jù)已經(jīng)預(yù)先進(jìn)行過聚合或join操作了,那么在 Spark 作業(yè)中也就不 需要使用原先的 shuffle 類算子執(zhí)行這類操作了。
3.1.3 實(shí)現(xiàn)原理
這種方案從根源上解決了數(shù)據(jù)傾斜,因?yàn)閺氐妆苊饬嗽赟park中執(zhí)行shuffle類算子,那么肯定就不會(huì)有 數(shù)據(jù)傾斜的問題了。但是這里也要提醒一下大家,這種方式屬于治標(biāo)不治本。因?yàn)楫吘箶?shù)據(jù)本身就存在 分布不均勻的問題,所以Hive ETL中進(jìn)行g(shù)roup by或者join等shuffle操作時(shí),還是會(huì)出現(xiàn)數(shù)據(jù)傾斜,導(dǎo) 致Hive ETL的速度很慢。我們只是把數(shù)據(jù)傾斜的發(fā)生提前到了Hive ETL中,避免Spark程序發(fā)生數(shù)據(jù)傾斜而已。
3.1.4 方案優(yōu)缺點(diǎn)
markdown
* 優(yōu)點(diǎn): 實(shí)現(xiàn)起來簡(jiǎn)單便捷,效果還非常好,完全規(guī)避掉了數(shù)據(jù)傾斜,Spark作業(yè)的性能會(huì)大幅度提升。
* 缺點(diǎn):治標(biāo)不治本,Hive ETL中還是會(huì)發(fā)生數(shù)據(jù)傾斜。
3.1.5 企業(yè)最佳實(shí)踐
markdown
* 在一些 Java 系統(tǒng)與 Spark 結(jié)合使用的項(xiàng)目中,會(huì)出現(xiàn) Java 代碼頻繁調(diào)用 Spark 作業(yè)的場(chǎng)景,而且對(duì) Spark 作業(yè)的執(zhí)行性能要求很高,就比較適合使用這種方案。將數(shù)據(jù)傾斜提前到上游的 Hive ETL,每天 僅執(zhí)行一次,只有那一次是比較慢的,而之后每次 Java 調(diào)用 Spark作業(yè)時(shí),執(zhí)行速度都會(huì)很快,能夠 提供更好的用戶體驗(yàn)。
* 在美團(tuán)·點(diǎn)評(píng)的交互式用戶行為分析系統(tǒng)中使用了這種方案,該系統(tǒng)主要是允許用戶通過 Java Web 系統(tǒng) 提交數(shù)據(jù)分析統(tǒng)計(jì)任務(wù),后端通過Java 提交 Spark作業(yè)進(jìn)行數(shù)據(jù)分析統(tǒng)計(jì)。要求 Spark 作業(yè)速度必須要 快,盡量在10 分鐘以內(nèi),否則速度太慢,用戶體驗(yàn)會(huì)很差。所以我們將有些 Spark 作業(yè)的shuffle操作 提前到了Hive ETL中,從而讓 Spark 直接使用預(yù)處理的 Hive 中間表,盡可能地減少 Spark 的 shuffle操 作,大幅度提升了性能,將部分作業(yè)的性能提升了6倍以上。
3.2 調(diào)整shuffle操作的并行度
3.2.1 適用場(chǎng)景
大量不同的Key被分配到了相同的Task造成該Task數(shù)據(jù)量過大。
如果我們必須要對(duì)數(shù)據(jù)傾斜迎難而上,那么建議優(yōu)先使用這種方案,因?yàn)檫@是處理數(shù)據(jù)傾斜最簡(jiǎn)單的一 種方案。但是也是一種屬于碰運(yùn)氣的方案。因?yàn)檫@種方案,并不能讓你一定解決數(shù)據(jù)傾斜,甚至有可能 加重。那當(dāng)然,總歸,你會(huì)調(diào)整到一個(gè)合適的并行度是能解決的。前提是這種方案適用于 Hash散列的 分區(qū)方式。湊巧的是,各種分布式計(jì)算引擎,比如MapReduce,Spark 等默認(rèn)都是使用 Hash散列的方 式來進(jìn)行數(shù)據(jù)分區(qū)。
Spark 在做 Shuffle 時(shí),默認(rèn)使用 HashPartitioner(非Hash Shuffle)對(duì)數(shù)據(jù)進(jìn)行分區(qū)。如果并行度設(shè) 置的不合適,可能造成大量不相同的 Key 對(duì)應(yīng)的數(shù)據(jù)被分配到了同一個(gè) Task 上,造成該 Task 所處理 的數(shù)據(jù)遠(yuǎn)大于其它 Task,從而造成數(shù)據(jù)傾斜。
如果調(diào)整 Shuffle 時(shí)的并行度,使得原本被分配到同一 Task 的不同 Key 發(fā)配到不同 Task 上處理,則可 降低原 Task 所需處理的數(shù)據(jù)量,從而緩解數(shù)據(jù)傾斜問題造成的短板效應(yīng)。
3.2.2 實(shí)現(xiàn)思路
在對(duì) RDD 執(zhí)行 Shuffle 算子時(shí),給 Shuffle 算子傳入一個(gè)參數(shù),比如 reduceByKey(1000),該參數(shù)就 設(shè)置了這個(gè) shuffle 算子執(zhí)行時(shí)shuffle read task 的數(shù)量。對(duì)于 Spark SQL 中的 Shuffle 類語句,比如 group by、join 等,需要設(shè)置一個(gè)參數(shù),即 spark.sql.shuffle.partitions,該參數(shù)代表了 shuffle readTask 的并行度,該值默認(rèn)是 200,對(duì)于很多場(chǎng)景來說都有點(diǎn)過小。
3.2.3 實(shí)現(xiàn)原理
增加 shuffle read task 的數(shù)量,可以讓原本分配給一個(gè) task 的多個(gè) key 分配給多個(gè) task,從而讓每個(gè) task 處理比原來更少的數(shù)據(jù)。舉例來說,如果原本有 5 個(gè)key,每個(gè) key 對(duì)應(yīng) 10 條數(shù)據(jù),這 5 個(gè) key 都是分配給一個(gè) task 的,那么這個(gè) task 就要處理 50 條數(shù)據(jù)。而增加了 shuffle read task 以后,每個(gè) task 就分配到一個(gè) key,即每個(gè) task 就處理 10 條數(shù)據(jù),那么自然每個(gè) task 的執(zhí)行時(shí)間都會(huì)變短了。 具體原理如下圖所示。
一句話總結(jié):調(diào)整并行度分散同一個(gè) Task的不同 Key,之前由于運(yùn)氣比較差,多個(gè)數(shù)據(jù)比較多的 key 都分布式在同一個(gè) Task 上,如果調(diào)整了并行度,極大可能會(huì)讓這些 key 分布式到不同的 Task,有效緩 解數(shù)據(jù)傾斜。
3.2.4 方案優(yōu)缺點(diǎn)
markdown
* 優(yōu)點(diǎn): 實(shí)現(xiàn)起來比較簡(jiǎn)單,可以有效緩解和減輕數(shù)據(jù)傾斜的影響。實(shí)現(xiàn)簡(jiǎn)單,可在需要Shuffle的操作算子上直接設(shè) 置并行度或者使用spark.default.parallelism設(shè)置。如果是Spark SQL,還可通過SET spark.sql.shuffle.partitions=[num_tasks]設(shè)置并行度??捎米钚〉拇鷥r(jià)解決問題。一般如果出現(xiàn) 數(shù)據(jù)傾斜,都可以通過這種方法先試驗(yàn)幾次,如果問題未解決,再嘗試其它方法。
* 缺點(diǎn):只是緩解了數(shù)據(jù)傾斜而已,沒有徹底根除問題,根據(jù)實(shí)踐經(jīng)驗(yàn)來看,其效果有限。適用場(chǎng)景少,只能將分配到 同一Task的不同Key分散開,但對(duì)于同一Key傾斜嚴(yán)重的情況該方法并不適用。并且該方法一般只能緩解數(shù)據(jù) 傾斜,沒有徹底消除問題。從實(shí)踐經(jīng)驗(yàn)來看,其效果一般。
3.2.5 企業(yè)最佳實(shí)踐
markdown
* 該方案通常無法徹底解決數(shù)據(jù)傾斜,因?yàn)槿绻霈F(xiàn)一些極端情況,比如某個(gè)key對(duì)應(yīng)的數(shù)據(jù)量有100萬, 那么無論你的task數(shù)量增加到多少,這個(gè)對(duì)應(yīng)著100萬數(shù)據(jù)的key肯定還是會(huì)分配到一個(gè)task中去處理, 因此注定還是會(huì)發(fā)生數(shù)據(jù)傾斜的。所以這種方案只能說是在發(fā)現(xiàn)數(shù)據(jù)傾斜時(shí)嘗試使用的第一種手段,嘗 試去用嘴簡(jiǎn)單的方法緩解數(shù)據(jù)傾斜而已,或者是和其他方案結(jié)合起來使用。
3.3 過濾少數(shù)導(dǎo)致傾斜的key
3.3.1 適用場(chǎng)景
如果發(fā)現(xiàn)導(dǎo)致傾斜的 key 就少數(shù)幾個(gè),而且對(duì)計(jì)算本身的影響并不大的話,那么很適合使用這種方案。 比如 99% 的 key 就對(duì)應(yīng) 10 條數(shù)據(jù),但是只有一個(gè) key 對(duì)應(yīng)了 100 萬數(shù)據(jù),從而導(dǎo)致了數(shù)據(jù)傾斜。
3.3.2 實(shí)現(xiàn)思路
如果我們判斷那少數(shù)幾個(gè)數(shù)據(jù)量特別多的 key,對(duì)作業(yè)的執(zhí)行和計(jì)算結(jié)果不是特別重要的話,那么干脆 就直接過濾掉那少數(shù)幾個(gè) key。比如,在 Spark SQL 中可以使用 where 子句過濾掉這些 key 或者在 SparkCore 中對(duì) RDD 執(zhí)行 filter 算子過濾掉這些 key。如果需要每次作業(yè)執(zhí)行時(shí),動(dòng)態(tài)判定哪些 key 的數(shù)據(jù)量最多然后再進(jìn)行過濾,那么可以使用 sample 算子對(duì) RDD 進(jìn)行采樣,然后計(jì)算出每個(gè) key 的 數(shù)量,取數(shù)據(jù)量最多的 key 過濾掉即可。
3.2.3 實(shí)現(xiàn)原理
將導(dǎo)致數(shù)據(jù)傾斜的 key 給過濾掉之后,這些 key 就不會(huì)參與計(jì)算了,自然不可能產(chǎn)生數(shù)據(jù)傾斜。
3.3.4 方案優(yōu)缺點(diǎn)
markdown
* 優(yōu)點(diǎn):實(shí)現(xiàn)簡(jiǎn)單,而且效果也很好,可以完全規(guī)避掉數(shù)據(jù)傾斜。
* 缺點(diǎn):適用場(chǎng)景不多,大多數(shù)情況下,導(dǎo)致傾斜的key還是很多的,并不是只有少數(shù)幾個(gè)
3.3.5 企業(yè)最佳實(shí)踐
markdown
* 在項(xiàng)目中我們也采用過這種方案解決數(shù)據(jù)傾斜。有一次發(fā)現(xiàn)某一天 Spark 作業(yè)在運(yùn)行的時(shí)候突然 OOM 了,追查之后發(fā)現(xiàn),是 Hive 表中的某一個(gè) key 在那天數(shù)據(jù)異常,導(dǎo)致數(shù)據(jù)量暴增。因此就采取每次執(zhí) 行前先進(jìn)行采樣,計(jì)算出樣本中數(shù)據(jù)量最大的幾個(gè) key 之后,直接在程序中將那些key給過濾掉。
3.4 將reduce join轉(zhuǎn)為map join
3.4.1 適用場(chǎng)景
在對(duì) RDD 使用 join 類操作,或者是在 Spark SQL 中使用 join 語句時(shí),而且 join 操作中的一個(gè) RDD 或 表的數(shù)據(jù)量比較小(比如幾百M(fèi)或者一兩G),比較適用此方案。
在分布式計(jì)算引擎中,實(shí)現(xiàn)Join的思路有兩種:
1)MapJoin,顧名思義,Join邏輯的完成是在 Mapper 階段就完成了,這是假定執(zhí)行的是 MapReduce任務(wù),如果是 Spark任務(wù),表示只用一個(gè) Stage 就執(zhí)行完了 Join 操作。
markdown
* 優(yōu)點(diǎn):避免了兩階段之間的shuffle,效率高,沒有shuffle也就沒有了傾斜。
* 缺點(diǎn):多使用內(nèi)存資源,只適合大小表做join的場(chǎng)景
2)ReduceJoin,顧名思義,Join邏輯的完成是在 Reducer 階段完成的。那么如果是MapReduce任 務(wù),則表示 Maper階段執(zhí)行完之后把數(shù)據(jù) Shuffle到 Reducer階段來執(zhí)行 Join 邏輯,那么就可能導(dǎo)致數(shù) 據(jù)傾斜。如果是 Spark任務(wù),意味著,上一個(gè)stage的執(zhí)行結(jié)果數(shù)據(jù)shuffle到 下一個(gè)stage中來完成 Join 操作,同樣也可能產(chǎn)生數(shù)據(jù)傾斜。
markdown
* 優(yōu)點(diǎn):這是一種通用的join,在不產(chǎn)生數(shù)據(jù)傾斜的情況下,能完成各種類型的join
* 缺點(diǎn):會(huì)發(fā)生數(shù)據(jù)傾斜的情況
3.4.2 實(shí)現(xiàn)思路
不使用join算子進(jìn)行連接操作,而使用Broadcast變量與map類算子實(shí)現(xiàn)join操作,進(jìn)而完全規(guī)避掉 shuffle類的操作,徹底避免數(shù)據(jù)傾斜的發(fā)生和出現(xiàn)。將較小RDD中的數(shù)據(jù)直接通過collect算子拉取到 Driver端的內(nèi)存中來,然后對(duì)其創(chuàng)建一個(gè)Broadcast變量;接著對(duì)另外一個(gè)RDD執(zhí)行map類算子,在算 子函數(shù)內(nèi),從Broadcast變量中獲取較小RDD的全量數(shù)據(jù),與當(dāng)前RDD的每一條數(shù)據(jù)按照連接key進(jìn)行 比對(duì),如果連接key相同的話,那么就將兩個(gè)RDD的數(shù)據(jù)用你需要的方式連接起來。
3.4.3 實(shí)現(xiàn)原理
普通的 join 是會(huì)走 shuffle 過程的,而一旦 shuffle,就相當(dāng)于會(huì)將相同 key 的數(shù)據(jù)拉取到一個(gè) shuffle read task 中再進(jìn)行 join,此時(shí)就是 reduce join。但是如果一個(gè) RDD 是比較小的,則可以采用廣播小 RDD 全量數(shù)據(jù) +map 算子來實(shí)現(xiàn)與 join 同樣的效果,也就是 map join,此時(shí)就不會(huì)發(fā)生 shuffle 操 作,也就不會(huì)發(fā)生數(shù)據(jù)傾斜。具體原理如下圖所示。
3.4.4 方案優(yōu)缺點(diǎn)
markdown
* 優(yōu)點(diǎn): 對(duì)join操作導(dǎo)致的數(shù)據(jù)傾斜,效果非常好,因?yàn)楦揪筒粫?huì)發(fā)生shuffle,也就根本不會(huì)發(fā)生數(shù)據(jù)傾斜。
* 缺點(diǎn): 適用場(chǎng)景較少,因?yàn)檫@個(gè)方案只適用于一個(gè)大表和一個(gè)小表的情況。畢竟我們需要將小表進(jìn)行廣播,此時(shí)會(huì)比 較消耗內(nèi)存資源,driver 和每個(gè)Executor 內(nèi)存中都會(huì)駐留一份小 RDD 的全量數(shù)據(jù)。如果我們廣播出去 的 RDD 數(shù)據(jù)比較大,比如 10G 以上,那么就可能發(fā)生內(nèi)存溢出了。因此并不適合兩個(gè)都是大表的情況。
3.5 采樣傾斜 key并分拆 join操作
3.5.1 適用場(chǎng)景
兩個(gè) RDD/Hive 表進(jìn)行 join 的時(shí)候,如果數(shù)據(jù)量都比較大,無法采用3.5方案,那么此時(shí)可以看一 下兩個(gè) RDD/Hive 表中的 key 分布情況。如果出現(xiàn)數(shù)據(jù)傾斜,是因?yàn)槠渲心骋粋€(gè) RDD/Hive 表中的少數(shù) 幾個(gè) key 的數(shù)據(jù)量過大,而另一個(gè) RDD/Hive 表中的所有 key 都分布比較均勻,那么采用這個(gè)解決方案 是比較合適的。
3.5.2 實(shí)現(xiàn)思路
markdown
1)對(duì)包含少數(shù)幾個(gè)數(shù)據(jù)量過大的key的那個(gè)RDD,通過sample算子采樣出一份樣本來,然后統(tǒng)計(jì)一下每個(gè) key的數(shù)量,計(jì)算出來數(shù)據(jù)量最大的是哪幾個(gè)key。
2)然后將這幾個(gè)key對(duì)應(yīng)的數(shù)據(jù)從原來的RDD中拆分出來,形成一個(gè)單獨(dú)的RDD,并給每個(gè)key都打上n以內(nèi)的 隨機(jī)數(shù)作為前綴,而不會(huì)導(dǎo)致傾斜的大部分key形成另外一個(gè)RDD。
3)接著將需要join的另一個(gè)RDD,也過濾出來那幾個(gè)傾斜key對(duì)應(yīng)的數(shù)據(jù)并形成一個(gè)單獨(dú)的RDD,將每條數(shù)據(jù) 膨脹成n條數(shù)據(jù),這n條數(shù)據(jù)都按順序附加一個(gè)0~n的前綴,不會(huì)導(dǎo)致傾斜的大部分key也形成另外一個(gè)RDD。
4)再將附加了隨機(jī)前綴的獨(dú)立RDD與另一個(gè)膨脹n倍的獨(dú)立RDD進(jìn)行join,此時(shí)就可以將原先相同的key打散 成n份,分散到多個(gè)task中去進(jìn)行join了。
5)而另外兩個(gè)普通的RDD就照常join即可。
6)最后將兩次join的結(jié)果使用union算子合并起來即可,就是最終的join結(jié)果。
3.5.3 實(shí)現(xiàn)原理
對(duì)于 join 導(dǎo)致的數(shù)據(jù)傾斜,如果只是某幾個(gè) key 導(dǎo)致了傾斜,可以將少數(shù)幾個(gè) key 分拆成獨(dú)立 RDD, 并附加隨機(jī)前綴打散成 n 份去進(jìn)行join,此時(shí)這幾個(gè) key 對(duì)應(yīng)的數(shù)據(jù)就不會(huì)集中在少數(shù)幾個(gè) task 上, 而是分散到多個(gè) task 進(jìn)行 join 了。
3.5.4 方案優(yōu)缺點(diǎn)
markdown
* 優(yōu)點(diǎn): 對(duì)于join導(dǎo)致的數(shù)據(jù)傾斜,如果只是某幾個(gè)key導(dǎo)致了傾斜,采用該方式可以用最有效的方式打散key進(jìn)行 join。而且只需要針對(duì)少數(shù)傾斜key對(duì)應(yīng)的數(shù)據(jù)進(jìn)行擴(kuò)容n倍,不需要對(duì)全量數(shù)據(jù)進(jìn)行擴(kuò)容。避免了占用過多 內(nèi)存。
* 缺點(diǎn): 如果導(dǎo)致傾斜的key特別多的話,比如成千上萬個(gè)key都導(dǎo)致數(shù)據(jù)傾斜,那么這種方式也不適合。
3.6 兩階段聚合(局部聚合+全局聚合)
3.6.1 適用場(chǎng)景
對(duì)RDD執(zhí)行reduceByKey等聚合類shuffle算子或者在Spark SQL中使用group by語句進(jìn)行分組聚合時(shí), 比較適用這種方案。
3.6.2 實(shí)現(xiàn)思路
這個(gè)方案的核心實(shí)現(xiàn)思路就是進(jìn)行兩階段聚合。第一次是局部聚合,先給每個(gè)key都打上一個(gè)隨機(jī)數(shù), 比如10以內(nèi)的隨機(jī)數(shù),此時(shí)原先一樣的key就變成不一樣的了,比如(hello, 1) (hello, 1) (hello, 1) (hello, 1),就會(huì)變成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接著對(duì)打上隨機(jī)數(shù)后的數(shù)據(jù),執(zhí)行 reduceByKey等聚合操作,進(jìn)行局部聚合,那么局部聚合結(jié)果,就會(huì)變成了(1_hello, 2) (2_hello, 2)。然 后將各個(gè)key的前綴給去掉,就會(huì)變成(hello,2)(hello,2),再次進(jìn)行全局聚合操作,就可以得到最終結(jié)果 了,比如(hello, 4)。
3.6.3 實(shí)現(xiàn)原理
將原本相同的key通過附加隨機(jī)前綴的方式,變成多個(gè)不同的key,就可以讓原本被一個(gè)task處理的數(shù)據(jù) 分散到多個(gè)task上去做局部聚合,進(jìn)而解決單個(gè)task處理數(shù)據(jù)量過多的問題。接著去除掉隨機(jī)前綴,再 次進(jìn)行全局聚合,就可以得到最終的結(jié)果。具體原理見下圖:
3.6.4 方案優(yōu)缺點(diǎn)
markdown
* 優(yōu)點(diǎn): 對(duì)于聚合類的shuffle操作導(dǎo)致的數(shù)據(jù)傾斜,效果是非常不錯(cuò)的。通常都可以解決掉數(shù)據(jù)傾斜,或者至少是大 幅度緩解數(shù)據(jù)傾斜,將Spark作業(yè)的性能提升數(shù)倍以上。
* 缺點(diǎn): 僅僅適用于聚合類的shuffle操作,適用范圍相對(duì)較窄。如果是join類的shuffle操作,還得用其他的解決 方案。
3.7 使用隨機(jī)前綴和擴(kuò)容 RDD 進(jìn)行 join
3.7.1 適用場(chǎng)景
如果在進(jìn)行 join 操作時(shí),RDD 中有大量的 key 導(dǎo)致數(shù)據(jù)傾斜,那么進(jìn)行分拆 key 也沒什么意義。
3.7.2 實(shí)現(xiàn)思路
markdown
1)該方案的實(shí)現(xiàn)思路基本和3.5方案類似,首先查看 RDD/Hive 表中的數(shù)據(jù)分布情況,找到那個(gè)造成 數(shù)據(jù)傾斜的 RDD/Hive 表,比如有多個(gè)key 都對(duì)應(yīng)了超過1萬條數(shù)據(jù)。
2)然后將該RDD的每條數(shù)據(jù)都打上一個(gè)n以內(nèi)的隨機(jī)前綴。
3)同時(shí)對(duì)另外一個(gè)正常的RDD進(jìn)行擴(kuò)容,將每條數(shù)據(jù)都擴(kuò)容成n條數(shù)據(jù),擴(kuò)容出來的每條數(shù)據(jù)都依次打上一個(gè) 0~n的前綴。
4)最后將兩個(gè)處理后的RDD進(jìn)行join即可。
3.7.3 實(shí)現(xiàn)原理
將原先一樣的 key 通過附加隨機(jī)前綴變成不一樣的key,然后就可以將這些處理后的“不同key”分散到多 個(gè)task中去處理,而不是讓一個(gè)task處理大量的相同key。該方案與3.6方案的不同之處就在于,上 一種方案是盡量只對(duì)少數(shù)傾斜key對(duì)應(yīng)的數(shù)據(jù)進(jìn)行特殊處理,由于處理過程需要擴(kuò)容RDD,因此上一種 方案擴(kuò)容RDD后對(duì)內(nèi)存的占用并不大;而這一種方案是針對(duì)有大量?jī)A斜key的情況,沒法將部分key拆分 出來進(jìn)行單獨(dú)處理,因此只能對(duì)整個(gè)RDD進(jìn)行數(shù)據(jù)擴(kuò)容,對(duì)內(nèi)存資源要求很高。
3.7.4 方案優(yōu)缺點(diǎn)
markdown
* 優(yōu)點(diǎn): 對(duì)join類型的數(shù)據(jù)傾斜基本都可以處理,而且效果也相對(duì)比較顯著,性能提升效果非常不錯(cuò)。
* 缺點(diǎn): 該方案更多的是緩解數(shù)據(jù)傾斜,而不是徹底避免數(shù)據(jù)傾斜。而且需要對(duì)整個(gè)RDD進(jìn)行擴(kuò)容,對(duì)內(nèi)存資源要求很高。
3.7.5 企業(yè)最佳實(shí)踐
markdown
* 開發(fā)一個(gè)數(shù)據(jù)需求的時(shí)候,發(fā)現(xiàn)一個(gè)join導(dǎo)致了數(shù)據(jù)傾斜。優(yōu)化之前,作業(yè)的執(zhí)行時(shí)間大約是60分鐘左右;使用該方案優(yōu)化之后,執(zhí)行時(shí)間縮短到10分鐘左右,性能提升了6倍。
3.8 任務(wù)橫切,一分為二,單獨(dú)處理
3.8.1 適用場(chǎng)景
有時(shí)候,一個(gè)Spark應(yīng)用程序中,導(dǎo)致傾斜的因素不是一個(gè)單一的,比如有一部分傾斜的因素是null, 有一部分傾斜的因素是某些個(gè)key分布特別多。那么拆分出來也得使用不同的手段來處理
3.8.2 實(shí)現(xiàn)思路
在了解清楚數(shù)據(jù)的分布規(guī)律,以及確定了數(shù)據(jù)傾斜是由何種原因?qū)е碌?,那么按照這些原因,進(jìn)行數(shù)據(jù)的拆分,然后單獨(dú)處理每個(gè)部分的數(shù)據(jù),最后把結(jié)果合起來。
3.8.3 實(shí)現(xiàn)原理
3.6方案其實(shí)是一種縱切,3.8方案就是一種橫切。原理同思路。
3.8.4 方案優(yōu)缺點(diǎn)
markdown
* 優(yōu)點(diǎn): 將多種簡(jiǎn)單的方案綜合起來,解決一個(gè)復(fù)雜的問題??梢运闵弦环N萬能的方案。
* 缺點(diǎn): 確定數(shù)據(jù)傾斜的因素比較復(fù)雜,導(dǎo)致解決該數(shù)據(jù)傾斜的方案比較難實(shí)現(xiàn)落地。代碼復(fù)雜度也較高。
3.9 多方案組合使用
markdown
* 在實(shí)踐中發(fā)現(xiàn),很多情況下,如果只是處理較為簡(jiǎn)單的數(shù)據(jù)傾斜場(chǎng)景,那么使用上述方案中的某一種基 本就可以解決。但是如果要處理一個(gè)較為復(fù)雜的數(shù)據(jù)傾斜場(chǎng)景,那么可能需要將多種方案組合起來使 用。比如說,我們針對(duì)出現(xiàn)了多個(gè)數(shù)據(jù)傾斜環(huán)節(jié)的Spark作業(yè),可以先運(yùn)用3.1和3.2方案,預(yù)處理一 部分?jǐn)?shù)據(jù),并過濾一部分?jǐn)?shù)據(jù)來緩解;其次可以對(duì)某些shuffle操作提升并行度,優(yōu)化其性能;最后還可 以針對(duì)不同的聚合或join操作,選擇一種方案來優(yōu)化其性能。大家需要對(duì)這些方案的思路和原理都透徹 理解之后,在實(shí)踐中根據(jù)各種不同的情況,靈活運(yùn)用多種方案,來解決自己的數(shù)據(jù)傾斜問題。
* 如果這多種方案,組合使用也不行,最后一招:自定義分區(qū)規(guī)則
3.10 自定義Partitioner
3.10.1 適用場(chǎng)景
大量不同的Key被分配到了相同的Task造成該Task數(shù)據(jù)量過大。
3.10.2 實(shí)現(xiàn)思路
先通過抽樣,了解數(shù)據(jù)的 key 的分布規(guī)律,然后根據(jù)規(guī)律,去定制自己的數(shù)據(jù)分區(qū)規(guī)則,盡量保證所有 的 Task 的數(shù)據(jù)量相差無幾。
3.10.3 實(shí)現(xiàn)原理
使用自定義的 Partitioner(默認(rèn)為HashPartitioner),將原本被分配到同一個(gè) Task 的不同 Key 分配 到不同 Task。
3.10.4 分區(qū)方案
- 隨機(jī)分區(qū)
markdown
* 優(yōu)點(diǎn): 數(shù)據(jù)分布均勻
* 缺點(diǎn): 具有相同特點(diǎn)的數(shù)據(jù)不會(huì)保證被分配到相同的分區(qū)
- 輪詢分區(qū)
markdown
* 優(yōu)點(diǎn): 確保一定不會(huì)出現(xiàn)數(shù)據(jù)傾斜
* 缺點(diǎn): 無法根據(jù)存儲(chǔ)/計(jì)算能力分配存儲(chǔ)/計(jì)算壓力
- Hash散列
markdown
* 優(yōu)點(diǎn): 具有相同特點(diǎn)的數(shù)據(jù)保證被分配到相同的分區(qū)
* 缺點(diǎn): 極容易產(chǎn)生數(shù)據(jù)傾斜
- 范圍分區(qū)
markdown
* 優(yōu)點(diǎn): 相鄰的數(shù)據(jù)都在相同的分區(qū)
* 缺點(diǎn): 部分分區(qū)的數(shù)據(jù)量會(huì)超出其他的分區(qū),需要進(jìn)行裂變以保持所有分區(qū)的數(shù)據(jù)量是均勻的,如果每個(gè)分區(qū)不排序,那么裂變就會(huì)非常困難。
3.10.5 方案優(yōu)缺點(diǎn)
markdown
* 優(yōu)點(diǎn): 靈活,通用。
* 缺點(diǎn): 必須根據(jù)對(duì)應(yīng)的場(chǎng)景設(shè)計(jì)合理的分區(qū)方案。沒有現(xiàn)成的方案可用,需臨時(shí)實(shí)現(xiàn)。
四、案例
4.1 問題
如果是兩張?zhí)卮髮挶碜?Join 怎么辦?
markdown
# 解決方案: 位圖法
4.2 例子
最近7天`連續(xù)登錄`的用戶有哪些?假如每天登陸的用戶存在多張表或者一張表的多個(gè)分區(qū)中。如果用戶基數(shù)很高比如`10億`。 那Join的方案將會(huì)比較低效。位圖解決是一個(gè)不錯(cuò)的方案
4.3 實(shí)現(xiàn)思路
對(duì)每一天的用戶登陸數(shù)據(jù)維護(hù)一個(gè)Bitmap, 如果用戶登錄對(duì)應(yīng)的Bitmap位就置為1。將7個(gè)Bitmap按位求與,就可以得到7天連續(xù)登錄的用戶了。
更多關(guān)于大數(shù)據(jù)培訓(xùn)的問題,歡迎咨詢千鋒教育在線名師。千鋒教育多年辦學(xué),課程大綱緊跟企業(yè)需求,更科學(xué)更嚴(yán)謹(jǐn),每年培養(yǎng)泛IT人才近2萬人。不論你是零基礎(chǔ)還是想提升,都可以找到適合的班型,千鋒教育隨時(shí)歡迎你來試聽。