一、概述
CPU從三十多年前的8086,到十年前的奔騰,再到當(dāng)下的多核i7。一開始,以單核cpu的主頻為目標(biāo),架構(gòu)的改良和集成電路工藝的進(jìn)步使得cpu的性能高速上升,單核cpu的主頻從老爺車的MHz階段一度接近4GHz高地。然而,也因?yàn)楣に嚭凸牡鹊南拗?,單核cpu遇到了人生的天花板,急需轉(zhuǎn)換思維,以滿足無(wú)止境的性能需求。多核cpu在此登上歷史舞臺(tái)。給你的老爺車多加兩個(gè)引擎,讓你有法拉利的感覺(jué)?,F(xiàn)時(shí)代,連手機(jī)都到處叫囂自己有4核8核處理器的時(shí)代,PC就更不用說(shuō)了。
扯遠(yuǎn)了,anyway,對(duì)于俺們程序員來(lái)說(shuō),如何利用如此強(qiáng)大的引擎完成我們的任務(wù)才是我們要考慮的。隨著大規(guī)模數(shù)據(jù)處理、大規(guī)模問(wèn)題和復(fù)雜系統(tǒng)求解需求的增加,以前的單核編程已經(jīng)有心無(wú)力了。如果程序一跑就得幾個(gè)小時(shí),甚至一天,想想都無(wú)法原諒自己。那如何讓自己更快的過(guò)度到高大上的多核并行編程中去呢?哈哈,廣大人民的力量!
目前工作中我所接觸到的并行處理框架主要有MPI、OpenMP和MapReduce(Hadoop)三個(gè)(CUDA屬于GPU并行編程,這里不提及)。MPI和Hadoop都可以在集群中運(yùn)行,而OpenMP因?yàn)楣蚕泶鎯?chǔ)結(jié)構(gòu)的關(guān)系,不能在集群上運(yùn)行,只能單機(jī)。另外,MPI可以讓數(shù)據(jù)保留在內(nèi)存中,可以為節(jié)點(diǎn)間的通信和數(shù)據(jù)交互保存上下文,所以能執(zhí)行迭代算法,而Hadoop卻不具有這個(gè)特性。因此,需要迭代的機(jī)器學(xué)習(xí)算法大多使用MPI來(lái)實(shí)現(xiàn)。當(dāng)然了,部分機(jī)器學(xué)習(xí)算法也是可以通過(guò)設(shè)計(jì)使用Hadoop來(lái)完成的。(淺見,如果錯(cuò)誤,希望各位不吝指出,謝謝)。
本文主要介紹Python環(huán)境下MPI編程的實(shí)踐基礎(chǔ)。
二、MPI與mpi4py
MPI是MessagePassingInterface的簡(jiǎn)稱,也就是消息傳遞。消息傳遞指的是并行執(zhí)行的各個(gè)進(jìn)程具有自己獨(dú)立的堆棧和代碼段,作為互不相關(guān)的多個(gè)程序獨(dú)立執(zhí)行,進(jìn)程之間的信息交互完全通過(guò)顯示地調(diào)用通信函數(shù)來(lái)完成。
Mpi4py是構(gòu)建在mpi之上的python庫(kù),使得python的數(shù)據(jù)結(jié)構(gòu)可以在進(jìn)程(或者多個(gè)cpu)之間進(jìn)行傳遞。
2.1、MPI的工作方式
很簡(jiǎn)單,就是你啟動(dòng)了一組MPI進(jìn)程,每個(gè)進(jìn)程都是執(zhí)行同樣的代碼!然后每個(gè)進(jìn)程都有一個(gè)ID,也就是rank來(lái)標(biāo)記我是誰(shuí)。什么意思呢?假設(shè)一個(gè)CPU是你請(qǐng)的一個(gè)工人,共有10個(gè)工人。你有100塊磚頭要搬,然后很公平,讓每個(gè)工人搬10塊。這時(shí)候,你把任務(wù)寫到一個(gè)任務(wù)卡里面,讓10個(gè)工人都執(zhí)行這個(gè)任務(wù)卡中的任務(wù),也就是搬磚!這個(gè)任務(wù)卡中的“搬磚”就是你寫的代碼。然后10個(gè)CPU執(zhí)行同一段代碼。需要注意的是,代碼里面的所有變量都是每個(gè)進(jìn)程獨(dú)有的,雖然名字相同。
例如,一個(gè)腳本test.py,里面包含以下代碼:
frommpi4pyimportMPI
print("helloworld'')
print("myrankis:%d"%MPI.rank)
然后我們?cè)诿钚型ㄟ^(guò)以下方式運(yùn)行:
#mpirun–np5pythontest.py
-np5指定啟動(dòng)5個(gè)mpi進(jìn)程來(lái)執(zhí)行后面的程序。相當(dāng)于對(duì)腳本拷貝了5份,每個(gè)進(jìn)程運(yùn)行一份,互不干擾。在運(yùn)行的時(shí)候代碼里面唯一的不同,就是各自的rank也就是ID不一樣。所以這個(gè)代碼就會(huì)打印5個(gè)helloworld和5個(gè)不同的rank值,從0到4.
2.2、點(diǎn)對(duì)點(diǎn)通信
點(diǎn)對(duì)點(diǎn)通信(Point-to-PointCommunication)的能力是信息傳遞系統(tǒng)最基本的要求。意思就是讓兩個(gè)進(jìn)程直接可以傳輸數(shù)據(jù),也就是一個(gè)發(fā)送數(shù)據(jù),另一個(gè)接收數(shù)據(jù)。接口就兩個(gè),send和recv,來(lái)個(gè)例子:
importmpi4py.MPIasMPI
comm=MPI.COMM_WORLD
comm_rank=comm.Get_rank()
comm_size=comm.Get_size()
#pointtopointcommunication
data_send=[comm_rank]*5
comm.send(data_send,dest=(comm_rank+1)%comm_size)
data_recv=comm.recv(source=(comm_rank-1)%comm_size)
print("myrankis%d,andIreceived:"%comm_rank)
printdata_recv
啟動(dòng)5個(gè)進(jìn)程運(yùn)行以上代碼,結(jié)果如下:
myrankis0,andIreceived:
[4,4,4,4,4]
myrankis1,andIreceived:
[0,0,0,0,0]
myrankis2,andIreceived:
[1,1,1,1,1]
myrankis3,andIreceived:
[2,2,2,2,2]
myrankis4,andIreceived:
[3,3,3,3,3]
可以看到,每個(gè)進(jìn)程都創(chuàng)建了一個(gè)數(shù)組,然后把它傳遞給下一個(gè)進(jìn)程,最后的那個(gè)進(jìn)程傳遞給第一個(gè)進(jìn)程。comm_size就是mpi的進(jìn)程個(gè)數(shù),也就是-np指定的那個(gè)數(shù)。MPI.COMM_WORLD表示進(jìn)程所在的通信組。
但這里面有個(gè)需要注意的問(wèn)題,如果我們要發(fā)送的數(shù)據(jù)比較小的話,mpi會(huì)緩存我們的數(shù)據(jù),也就是說(shuō)執(zhí)行到send這個(gè)代碼的時(shí)候,會(huì)緩存被send的數(shù)據(jù),然后繼續(xù)執(zhí)行后面的指令,而不會(huì)等待對(duì)方進(jìn)程執(zhí)行recv指令接收完這個(gè)數(shù)據(jù)。但是,如果要發(fā)送的數(shù)據(jù)很大,那么進(jìn)程就是掛起等待,直到接收進(jìn)程執(zhí)行了recv指令接收了這個(gè)數(shù)據(jù),進(jìn)程才繼續(xù)往下執(zhí)行。所以上述的代碼發(fā)送[rank]*5沒(méi)啥問(wèn)題,如果發(fā)送[rank]*500程序就會(huì)半死不活的樣子了。因?yàn)樗械倪M(jìn)程都會(huì)卡在發(fā)送這條指令,等待下一個(gè)進(jìn)程發(fā)起接收的這個(gè)指令,但是進(jìn)程是執(zhí)行完發(fā)送的指令才能執(zhí)行接收的指令,這就和死鎖差不多了。所以一般,我們將其修改成以下的方式:
importmpi4py.MPIasMPI
comm=MPI.COMM_WORLD
comm_rank=comm.Get_rank()
comm_size=comm.Get_size()
data_send=[comm_rank]*5
ifcomm_rank==0:
comm.send(data_send,dest=(comm_rank+1)%comm_size)
ifcomm_rank>0:
data_recv=comm.recv(source=(comm_rank-1)%comm_size)
comm.send(data_send,dest=(comm_rank+1)%comm_size)
ifcomm_rank==0:
data_recv=comm.recv(source=(comm_rank-1)%comm_size)
print("myrankis%d,andIreceived:"%comm_rank)
printdata_recv
第一個(gè)進(jìn)程一開始就發(fā)送數(shù)據(jù),其他進(jìn)程一開始都是在等待接收數(shù)據(jù),這時(shí)候進(jìn)程1接收了進(jìn)程0的數(shù)據(jù),然后發(fā)送進(jìn)程1的數(shù)據(jù),進(jìn)程2接收了,再發(fā)送進(jìn)程2的數(shù)據(jù)……知道最后進(jìn)程0接收最后一個(gè)進(jìn)程的數(shù)據(jù),從而避免了上述問(wèn)題。
一個(gè)比較常用的方法是封一個(gè)組長(zhǎng),也就是一個(gè)主進(jìn)程,一般是進(jìn)程0作為主進(jìn)程leader。主進(jìn)程將數(shù)據(jù)發(fā)送給其他的進(jìn)程,其他的進(jìn)程處理數(shù)據(jù),然后返回結(jié)果給進(jìn)程0。換句話說(shuō),就是進(jìn)程0來(lái)控制整個(gè)數(shù)據(jù)處理流程。
2.3、群體通信
點(diǎn)對(duì)點(diǎn)通信是A發(fā)送給B,一個(gè)人將自己的秘密告訴另一個(gè)人,群體通信(CollectiveCommunications)像是拿個(gè)大喇叭,一次性告訴所有的人。前者是一對(duì)一,后者是一對(duì)多。但是,群體通信是以更有效的方式工作的。它的原則就一個(gè):盡量把所有的進(jìn)程在所有的時(shí)刻都使用上!我們?cè)谙旅娴腷cast小節(jié)講述。
群體通信還是發(fā)送和接收兩類,一個(gè)是一次性把數(shù)據(jù)發(fā)給所有人,另一個(gè)是一次性從所有人那里回收結(jié)果。
1)廣播bcast
將一份數(shù)據(jù)發(fā)送給所有的進(jìn)程。例如我有200份數(shù)據(jù),有10個(gè)進(jìn)程,那么每個(gè)進(jìn)程都會(huì)得到這200份數(shù)據(jù)。
importmpi4py.MPIasMPI
comm=MPI.COMM_WORLD
comm_rank=comm.Get_rank()
comm_size=comm.Get_size()
ifcomm_rank==0:
data=range(comm_size)
data=comm.bcast(dataifcomm_rank==0elseNone,root=0)
print'rank%d,got:'%(comm_rank)
printdata
結(jié)果如下:
rank0,got:
[0,1,2,3,4]
rank1,got:
[0,1,2,3,4]
rank2,got:
[0,1,2,3,4]
rank3,got:
[0,1,2,3,4]
rank4,got:
[0,1,2,3,4]
Root進(jìn)程自己建了一個(gè)列表,然后廣播給所有的進(jìn)程。這樣所有的進(jìn)程都擁有了這個(gè)列表。然后愛(ài)干嘛就干嘛了。
對(duì)廣播最直觀的觀點(diǎn)是某個(gè)特定進(jìn)程將數(shù)據(jù)一一發(fā)送給每個(gè)進(jìn)程。假設(shè)有n個(gè)進(jìn)程,那么假設(shè)我們的數(shù)據(jù)在0進(jìn)程,那么0進(jìn)程就需要將數(shù)據(jù)發(fā)送給剩下的n-1個(gè)進(jìn)程,這是非常低效的,復(fù)雜度是O(n)。那有沒(méi)有高效的方式?一個(gè)最常用也是非常高效的手段是規(guī)約樹廣播:收到廣播數(shù)據(jù)的所有進(jìn)程都參與到數(shù)據(jù)廣播的過(guò)程中。首先只有一個(gè)進(jìn)程有數(shù)據(jù),然后它把它發(fā)送給第一個(gè)進(jìn)程,此時(shí)有兩個(gè)進(jìn)程有數(shù)據(jù);然后這兩個(gè)進(jìn)程都參與到下一次的廣播中,這時(shí)就會(huì)有4個(gè)進(jìn)程有數(shù)據(jù),……,以此類推,每次都會(huì)有2的次方個(gè)進(jìn)程有數(shù)據(jù)。通過(guò)這種規(guī)約樹的廣播方法,廣播的復(fù)雜度降為O(logn)。這就是上面說(shuō)的群體通信的高效原則:充分利用所有的進(jìn)程來(lái)實(shí)現(xiàn)數(shù)據(jù)的發(fā)送和接收。
2)散播scatter
將一份數(shù)據(jù)平分給所有的進(jìn)程。例如我有200份數(shù)據(jù),有10個(gè)進(jìn)程,那么每個(gè)進(jìn)程會(huì)分別得到20份數(shù)據(jù)。
importmpi4py.MPIasMPI
comm=MPI.COMM_WORLD
comm_rank=comm.Get_rank()
comm_size=comm.Get_size()
ifcomm_rank==0:
data=range(comm_size)
printdata
else:
data=None
local_data=comm.scatter(data,root=0)
print'rank%d,got:'%comm_rank
printlocal_data
結(jié)果如下:
[0,1,2,3,4]
rank0,got:
0
rank1,got:
1
rank2,got:
2
rank3,got:
3
rank4,got:
4
這里root進(jìn)程創(chuàng)建了一個(gè)list,然后將它散播給所有的進(jìn)程,相當(dāng)于對(duì)這個(gè)list做了劃分,每個(gè)進(jìn)程獲得等分的數(shù)據(jù),這里就是list的每一個(gè)數(shù)。(主要根據(jù)list的索引來(lái)劃分,list索引為第i份的數(shù)據(jù)就發(fā)送給第i個(gè)進(jìn)程)。如果是矩陣,那么就等分的劃分行,每個(gè)進(jìn)程獲得相同的行數(shù)進(jìn)行處理。
需要注意的是,MPI的工作方式是每個(gè)進(jìn)程都會(huì)執(zhí)行所有的代碼,所以每個(gè)進(jìn)程都會(huì)執(zhí)行scatter這個(gè)指令,但只有root執(zhí)行它的時(shí)候,它才兼?zhèn)浒l(fā)送者和接收者的身份(root也會(huì)得到屬于自己的數(shù)據(jù)),對(duì)于其他進(jìn)程來(lái)說(shuō),他們都只是接收者而已。
3)收集gather
那有發(fā)送,就有一起回收的函數(shù)。Gather是將所有進(jìn)程的數(shù)據(jù)收集回來(lái),合并成一個(gè)列表。下面聯(lián)合scatter和gather組成一個(gè)完成的分發(fā)和收回過(guò)程:
importmpi4py.MPIasMPI
comm=MPI.COMM_WORLD
comm_rank=comm.Get_rank()
comm_size=comm.Get_size()
ifcomm_rank==0:
data=range(comm_size)
printdata
else:
data=None
local_data=comm.scatter(data,root=0)
local_data=local_data*2
print'rank%d,gotanddo:'%comm_rank
printlocal_data
combine_data=comm.gather(local_data,root=0)
ifcomm_rank==0:
printcombine_data
結(jié)果如下:
[0,1,2,3,4]
rank0,gotanddo:
0
rank1,gotanddo:
2
rank2,gotanddo:
4
rank4,gotanddo:
8
rank3,gotanddo:
6
[0,2,4,6,8]
Root進(jìn)程將數(shù)據(jù)通過(guò)scatter等分發(fā)給所有的進(jìn)程,等待所有的進(jìn)程都處理完后(這里只是簡(jiǎn)單的乘以2),root進(jìn)程再通過(guò)gather回收他們的結(jié)果,和分發(fā)的原則一樣,組成一個(gè)list。Gather還有一個(gè)變體就是allgather,可以理解為它在gather的基礎(chǔ)上將gather的結(jié)果再bcast了一次。啥意思?意思是root進(jìn)程將所有進(jìn)程的結(jié)果都回收統(tǒng)計(jì)完后,再把整個(gè)統(tǒng)計(jì)結(jié)果告訴大家。這樣,不僅root可以訪問(wèn)combine_data,所有的進(jìn)程都可以訪問(wèn)combine_data了。
4)規(guī)約reduce
規(guī)約是指不但將所有的數(shù)據(jù)收集回來(lái),收集回來(lái)的過(guò)程中還進(jìn)行了簡(jiǎn)單的計(jì)算,例如求和,求最大值等等。為什么要有這個(gè)呢?我們不是可以直接用gather全部收集回來(lái)了,再對(duì)列表求個(gè)sum或者max就可以了嗎?這樣不是累死組長(zhǎng)嗎?為什么不充分使用每個(gè)工人呢?規(guī)約實(shí)際上是使用規(guī)約樹來(lái)實(shí)現(xiàn)的。例如求max,完成可以讓工人兩兩pk后,再返回兩兩pk的最大值,然后再對(duì)第二層的最大值兩兩pk,直到返回一個(gè)最終的max給組長(zhǎng)。組長(zhǎng)就非常聰明的將工作分配下工人高效的完成了。這是O(n)的復(fù)雜度,下降到O(logn)(底數(shù)為2)的復(fù)雜度。
importmpi4py.MPIasMPI
comm=MPI.COMM_WORLD
comm_rank=comm.Get_rank()
comm_size=comm.Get_size()
ifcomm_rank==0:
data=range(comm_size)
printdata
else:
data=None
local_data=comm.scatter(data,root=0)
local_data=local_data*2
print'rank%d,gotanddo:'%comm_rank
printlocal_data
all_sum=comm.reduce(local_data,root=0,op=MPI.SUM)
ifcomm_rank==0:
print'sumis:%d'%all_sum
結(jié)果如下:
[0,1,2,3,4]
rank0,gotanddo:
0
rank1,gotanddo:
2
rank2,gotanddo:
4
rank3,gotanddo:
6
rank4,gotanddo:
8
sumis:20
可以看到,最后可以得到一個(gè)sum值。
以上內(nèi)容為大家介紹了Python多核編程mpi4py實(shí)踐,希望對(duì)大家有所幫助,如果想要了解更多Python相關(guān)知識(shí),請(qǐng)關(guān)注IT培訓(xùn)機(jī)構(gòu):千鋒教育。http://m.2667701.com/