何謂數(shù)據(jù)傾斜?數(shù)據(jù)傾斜指的是并行處理的數(shù)據(jù)集中,某一部分(如Spark或Kafka的一個Partition)的數(shù)據(jù)顯著多于其它部分,從而使得該部分的處理速度成為整個數(shù)據(jù)集處理的瓶頸。
一、數(shù)據(jù)傾斜概述
1.1 什么是數(shù)據(jù)傾斜
對Hadoop、Spark、Flink這樣的大數(shù)據(jù)系統(tǒng)來講,數(shù)據(jù)量大并不可怕,可怕的是數(shù)據(jù)傾斜。
何謂數(shù)據(jù)傾斜?數(shù)據(jù)傾斜指的是,并行處理的數(shù)據(jù)集中,某一部分(如Spark或Kafka的一個Partition)的數(shù)據(jù)顯著多于其它部分,從而使得該部分的處理速度成為整個數(shù)據(jù)集處理的瓶頸。
對于分布式系統(tǒng)而言,理想情況下,隨著系統(tǒng)規(guī)模(節(jié)點數(shù)量)的增加,應(yīng)用整體耗時線性下降。如果一臺機器處理一批大量數(shù)據(jù)需要120分鐘,當(dāng)機器數(shù)量增加到三時,理想的耗時為120 / 3 = 40分鐘,如下圖所示
但是,上述情況只是理想情況,實際上將單機任務(wù)轉(zhuǎn)換成分布式任務(wù)后,會有overhead,使得總的任務(wù)量較之單機時有所增加,所以每臺機器的執(zhí)行時間加起來比單臺機器時更大。這里暫不考慮這些overhead,假設(shè)單機任務(wù)轉(zhuǎn)換成分布式任務(wù)后,總?cè)蝿?wù)量不變。
但即使如此,想做到分布式情況下每臺機器執(zhí)行時間是單機時的1 / N,就必須保證每臺機器的任務(wù)量相等。不幸的是,很多時候,任務(wù)的分配是不均勻的,甚至不均勻到大部分任務(wù)被分配到個別機器上,其它大部分機器所分配的任務(wù)量只占總得的小部分。比如一臺機器負責(zé)處理80%的任務(wù),另外兩臺機器各處理10%的任務(wù),如下圖所示
1.2 數(shù)據(jù)傾斜發(fā)生時的現(xiàn)象
• 絕大多數(shù) task 執(zhí)行得都非???,但個別 task 執(zhí)行極慢。比如,總共有 1000 個 task,997 個 task 都在 1 分鐘之內(nèi)執(zhí)行完了,但是剩余兩三個 task 卻要一兩個小時。這種情況很常見。
• 原本能夠正常執(zhí)行的 Spark 作業(yè),某天突然報出 OOM(內(nèi)存溢出)異常,觀察異常棧,是我們寫 的業(yè)務(wù)代碼造成的。這種情況比較少見。
• Task 類似下圖所示
總結(jié):
1. 大部分任務(wù)都很快執(zhí)行完,用時也相差無幾,但個別Task執(zhí)行耗時很長,整個應(yīng)用程序一直處于99%左右 的狀態(tài)。
2. 一直運行正常的Spark Application昨晚突然OOM了。
1.3 數(shù)據(jù)傾斜發(fā)生的原理
數(shù)據(jù)傾斜的原理很簡單: 在進行 shuffle 的時候,必須將各個節(jié)點上相同的 key 的數(shù)據(jù)拉取到某個節(jié)點 上的一個 task 來進行處理,比如按照 key 進行聚合或 join 等操作。此時如果某個 key 對應(yīng)的數(shù)據(jù)量特 別大的話,就會發(fā)生數(shù)據(jù)傾斜。比如大部分 key 對應(yīng) 10 條數(shù)據(jù),但是個別 key 卻對應(yīng)了 100 萬條數(shù) 據(jù),那么大部分 task 可能就只會分配到 10 條數(shù)據(jù),然后 1 秒鐘就運行完了;但是個別 task 可能分配 到了 100 萬數(shù)據(jù),要運行一兩個小時。因此,整個 Spark 作業(yè)的運行進度是由運行時間最長的那個 task 決定的。
因此出現(xiàn)數(shù)據(jù)傾斜的時候,Spark 作業(yè)看起來會運行得非常緩慢,甚至可能因為某個 task 處理的數(shù)據(jù) 量過大導(dǎo)致內(nèi)存溢出。
下圖就是一個很清晰的例子:hello 這個 key,在三個節(jié)點上對應(yīng)了總共 7 條數(shù)據(jù),這些數(shù)據(jù)都會被拉 取到同一個task中進行處理;而world 和 you 這兩個 key 分別才對應(yīng) 1 條數(shù)據(jù),所以另外兩個 task 只 要分別處理 1 條數(shù)據(jù)即可。此時第一個 task 的運行時間可能是另外兩個 task 的 7 倍,而整個 stage 的 運行速度也由運行最慢的那個 task 所決定。
總結(jié):
* 數(shù)據(jù)傾斜發(fā)生的本質(zhì),就是在執(zhí)行多階段的計算的時候,中間的shuffle策略可能導(dǎo)致分發(fā)到下 游Task的數(shù)據(jù)量不均勻,進而導(dǎo)致下游Task執(zhí)行時長的不一致。不完全均勻是正常的,但是如果相差太大,那么就產(chǎn)生性能問題了。
1.4 數(shù)據(jù)傾斜的危害
從上圖可見,當(dāng)出現(xiàn)數(shù)據(jù)傾斜時,小量任務(wù)耗時遠高于其它任務(wù),從而使得整體耗時過大,未能充分發(fā) 揮分布式系統(tǒng)的并行計算優(yōu)勢。另外,當(dāng)發(fā)生數(shù)據(jù)傾斜時,
少量部分任務(wù)處理的數(shù)據(jù)量過大,可能造成 內(nèi)存不足使得任務(wù)失敗,并進而引進整個應(yīng)用失敗。如果應(yīng)用并沒有因此失敗,但是大量正常任務(wù)都早 早完成處于等待狀態(tài),資源得不到充分利用。
總結(jié):
1. 整體耗時過大(整個任務(wù)的完成由執(zhí)行時間最長的那個Task決定)
2. 應(yīng)用程序可能異常退出(某個Task執(zhí)行時處理的數(shù)據(jù)量遠遠大于正常節(jié)點,則需要的資源容易出現(xiàn)瓶頸, 當(dāng)資源不足,則應(yīng)用程序退出)
3. 資源閑置(處理等待狀態(tài)的Task資源得不到及時的釋放,處于閑置浪費狀態(tài))
1.5 數(shù)據(jù)傾斜是如何造成的
在 Spark 中,同一個 Stage 的不同 Partition 可以并行處理,而具有依賴關(guān)系的不同 Stage 之間是串行 處理的。假設(shè)某個 Spark Job 分為Stage0 和 Stage1 兩個 Stage,且 Stage1 依賴于 Stage0,那 Stage0 完全處理結(jié)束之前不會處理 Stage1。而 Stage0 可能包含 N 個Task,這 N 個 Task 可以并行進行。如 果其中 N-1 個 Task 都在 10 秒內(nèi)完成,而另外一個 Task 卻耗時 1 分鐘,那該 Stage 的總時間至少為 1 分鐘。換句話說,一個 Stage 所耗費的時間,主要由最慢的那個 Task 決定。由于同一個 Stage 內(nèi)的所有 Task 執(zhí)行相同的計算,在排除不同計算節(jié)點計算能力差異的前提下,不同 Task 之間耗時的差異主要由該 Task 所處理的數(shù)據(jù)量決定。Stage 的數(shù)據(jù)來源主要分為如下兩類:
1. 數(shù)據(jù)源本身分布有問題:從數(shù)據(jù)源直接讀取。如讀取HDFS,Kafka,有可能出現(xiàn),大概率不會
2. 自己指定的分區(qū)規(guī)則:讀取上一個 Stage 的 Shuffle 數(shù)據(jù)
樸素的分布式計算的核心思想:
1. 大問題拆分成小問題:分而治之
2. 既然要分開算,那最后就一定要把分開計算的那么多的小 Task 的結(jié)果執(zhí)行匯總
3. 所以必然分布式計算引擎的設(shè)計中,應(yīng)用程序的執(zhí)行一定是分階段
4. 分布計算引擎的而核心:一個復(fù)雜的分布式計算應(yīng)用程序的執(zhí)行肯定要分成多個階段,每個階段分布式并 行運行多個Task
5. DAG引擎:
Spark: stage1 ==> stage2 ===> stage3
mapreduce: 就只有兩個階段:mapper reducer
階段與階段之間需要進行 shuffle,只要進行了數(shù)據(jù)混洗,就存在著數(shù)據(jù)分發(fā)不均勻的情況。如果情況嚴 重,就是數(shù)據(jù)傾斜。
分布式計算引擎的設(shè)計,免不了有shuffle,既然有shuffle操作,就一定有產(chǎn)生數(shù)據(jù)傾斜的可能。如果 你是做大數(shù)據(jù)處理的,就一定會遇到 數(shù)據(jù)傾斜!