二、 如何避免數(shù)據(jù)傾斜
2.1 避免數(shù)據(jù)源傾斜-HDFS
Spark通過(guò) textFile(path, minPartitions) 方法讀取文件時(shí),使用 TextInputFormat。對(duì)于不可切分的文件,每個(gè)文件對(duì)應(yīng)一個(gè) Split 從而對(duì)應(yīng)一個(gè) Partition。此時(shí)各文件大小是否一致,很大程度上決定了是否存在數(shù)據(jù)源側(cè)的數(shù)據(jù)傾斜。另外,對(duì)于不可切分的壓縮文件,即使壓縮后的文件大 小一致,它所包含的實(shí)際數(shù)據(jù)量也可能差別很多,因?yàn)樵次募?shù)據(jù)重復(fù)度越高,壓縮比越高。反過(guò)來(lái), 即使壓縮文件大小接近,但由于壓縮比可能差距很大,所需處理的數(shù)據(jù)量差距也可能很大。此時(shí)可通過(guò)在數(shù)據(jù)生成端將不可切分文件存儲(chǔ)為可切分文件,或者保證各文件包含數(shù)據(jù)量相同的方式避免數(shù)據(jù)傾斜。
# 對(duì)于不可切分文件可能出現(xiàn)數(shù)據(jù)傾斜,對(duì)于可切分文件,一般來(lái)說(shuō),不存在數(shù)據(jù)傾斜問(wèn)題。
1. 可切分: 基本上不會(huì)! 默認(rèn)數(shù)據(jù)塊大小:128M
2. 不可切分: 源文件不均勻,最終導(dǎo)致 分布式引用程序計(jì)算產(chǎn)生數(shù)據(jù)傾斜 日志:每一個(gè)小時(shí)生成一個(gè)日志文件
2.2 避免數(shù)據(jù)源傾斜-Kaka
Topic 主題: 分布式的組織形式: 分區(qū), 既然要進(jìn)行數(shù)據(jù)分區(qū),那就有可能產(chǎn)生數(shù)據(jù)分布不均勻
以 Spark Stream 通過(guò) DirectStream 方式讀取 Kafka 數(shù)據(jù)為例。由于 Kafka 的每一個(gè) Partition 對(duì)應(yīng) Spark 的一個(gè) Task(Partition),所以 Kafka 內(nèi)相關(guān) Topic 的各 Partition 之間數(shù)據(jù)是否平衡,直接決 定 Spark 處理該數(shù)據(jù)時(shí)是否會(huì)產(chǎn)生數(shù)據(jù)傾斜。
Kafka 某一 Topic 內(nèi)消息在不同 Partition 之間的分布,主要由 Producer 端所使用的 Partitioner 實(shí)現(xiàn) 類決定。如果使用隨機(jī) Partitioner,則每條消息會(huì)隨機(jī)發(fā)送到一個(gè) Partition 中,從而從概率上來(lái)講, 各 Partition 間的數(shù)據(jù)會(huì)達(dá)到平衡。此時(shí)源 Stage(直接讀取 Kafka 數(shù)據(jù)的 Stage)不會(huì)產(chǎn)生數(shù)據(jù)傾斜。
但很多時(shí)候,業(yè)務(wù)場(chǎng)景可能會(huì)要求將具備同一特征的數(shù)據(jù)順序消費(fèi),此時(shí)就需要將具有相同特征的數(shù)據(jù) 放于同一個(gè) Partition 中。一個(gè)典型的場(chǎng)景是,需要將同一個(gè)用戶相關(guān)的PV信息置于同一個(gè) Partition 中。此時(shí),如果產(chǎn)生了數(shù)據(jù)傾斜,則需要通過(guò)其它方式處理。
* 以 Spark Stream 通過(guò) DirectStream 方式讀取 Kafka 數(shù)據(jù)為例。由于 Kafka 的每一個(gè) Partition 對(duì)應(yīng) Spark 的一個(gè) Task(Partition),所以 Kafka 內(nèi)相關(guān) Topic 的各 Partition 之間數(shù)據(jù)是否平衡,直接決 定 Spark 處理該數(shù)據(jù)時(shí)是否會(huì)產(chǎn)生數(shù)據(jù)傾斜。
* Kafka 某一 Topic 內(nèi)消息在不同 Partition 之間的分布,主要由 Producer 端所使用的 Partitioner 實(shí)現(xiàn) 類決定。如果使用隨機(jī) Partitioner,則每條消息會(huì)隨機(jī)發(fā)送到一個(gè) Partition 中,從而從概率上來(lái)講, 各 Partition 間的數(shù)據(jù)會(huì)達(dá)到平衡。此時(shí)源 Stage(直接讀取 Kafka 數(shù)據(jù)的 Stage)不會(huì)產(chǎn)生數(shù)據(jù)傾斜。
* 但很多時(shí)候,業(yè)務(wù)場(chǎng)景可能會(huì)要求將具備同一特征的數(shù)據(jù)順序消費(fèi),此時(shí)就需要將具有相同特征的數(shù)據(jù) 放于同一個(gè) Partition 中。一個(gè)典型的場(chǎng)景是,需要將同一個(gè)用戶相關(guān)的PV信息置于同一個(gè) Partition 中。此時(shí),如果產(chǎn)生了數(shù)據(jù)傾斜,則需要通過(guò)其它方式處理。
2.3 定位處理邏輯 - Stage 和 Task
歸根結(jié)底,數(shù)據(jù)傾斜產(chǎn)生的原因,就是兩個(gè) stage 中的 shuffle 過(guò)程導(dǎo)致的。所以我們只需要研究Shuffle 算子即可。我們知道了導(dǎo)致數(shù)據(jù)傾斜的問(wèn)題就是 shuffle 算子,所以我們先去找到代碼中的 shuffle 的算子,比如 distinct、groupByKey、reduceByKey、aggergateByKey、join、cogroup、repartition 等,那么問(wèn) 題一定就出現(xiàn)在這里。spark的執(zhí)行,按照hsuffle算子分成多個(gè)stage來(lái)執(zhí)行。
* 如果 Spark Application 運(yùn)行過(guò)程中,出現(xiàn)數(shù)據(jù)傾斜,可以通過(guò) web 管理監(jiān)控界面,查看 各stage 的運(yùn)行情況,如果某一個(gè) stage 的運(yùn)行很長(zhǎng),并且這個(gè) stage 的大部分Task都運(yùn)行很快,則
2.4 查看導(dǎo)致傾斜的key的數(shù)據(jù)分布情況
知道了數(shù)據(jù)傾斜發(fā)生在哪里之后,通常需要分析一下那個(gè)執(zhí)行了shuffle操作并且導(dǎo)致了數(shù)據(jù)傾斜的 RDD/Hive表,查看一下其中key的分布情況。這主要是為之后選擇哪一種技術(shù)方案提供依據(jù)。針對(duì)不同 的key分布與不同的shuffle算子組合起來(lái)的各種情況,可能需要選擇不同的技術(shù)方案來(lái)解決。此時(shí)根據(jù)你執(zhí)行操作的情況不同,可以有很多種查看key分布的方式:
1. 如果是Spark SQL中的group by、join語(yǔ)句導(dǎo)致的數(shù)據(jù)傾斜,那么就查詢一下 SQL 中使用的表的key 分布情況。
2. 如果是對(duì) Spark RDD執(zhí)行shuffle算子導(dǎo)致的數(shù)據(jù)傾斜,那么可以在Spark作業(yè)中加入查看 key 分布 的代碼,比如 RDD.countByKey()。然后對(duì)統(tǒng)計(jì)出來(lái)的各個(gè)key出現(xiàn)的次數(shù),collect/take到客戶端打印 一下,就可以看到key的分布情況。
舉例來(lái)說(shuō),對(duì)于上面所說(shuō)的單詞計(jì)數(shù)程序,如果確定了是 stage1 的 reduceByKey 算子導(dǎo)致了數(shù)據(jù)傾 斜,那么就應(yīng)該看看進(jìn)行 reduceByKey 操作的 RDD 中的 key 分布情況,在這個(gè)例子中指的就是 pairs RDD。如下示例,我們可以先對(duì) pairs 采樣 10% 的樣本數(shù)據(jù),然后使用 countByKey 算子統(tǒng)計(jì)出每個(gè) key 出現(xiàn)的次數(shù),最后在客戶端遍歷和打印樣本數(shù)據(jù)中各個(gè) key 的出現(xiàn)次數(shù)。
val sampledPairs = pairs.sample(false, 0.1)
val sampledWordCounts = sampledPairs.countByKey()
sampledWordCounts.foreach(println(_))
采樣!(離線處理:無(wú)放回采樣, 流式處理:魚塘采樣)