Minimizing the overhead of Python multiprocessing.Pool with numpy/scipy
I have spent hours trying to parallelize my number-crunching code, but it only gets slower when I do so. Unfortunately, the problem disappears when I try to reduce it to the example below, and I don't really want to post the whole program here. So the question is: what pitfalls should I avoid in this kind of program?

(Note: a follow-up after Unutbu's answer is at the bottom.)

Here are the circumstances: it is about a module that defines a class BigData with a lot of internal data. In this example there is one list ff of interpolation functions; in the actual program there are more, e.g. ffA[k], ffB[k], ffC[k].

#!/usr/bin/python2.7
import numpy as np, time, sys
from multiprocessing import Pool
from scipy.interpolate import RectBivariateSpline

_tm = 0

def stopwatch(msg=''):
    tm = time.time()
    global _tm
    if _tm == 0: _tm = tm; return
    print("%s: %.2f seconds" % (msg, tm - _tm))
    _tm = tm

class BigData:
    def __init__(self, n):
        z = np.random.uniform(size=n*n*n).reshape((n, n, n))
        self.ff = []
        for i in range(n):
            f = RectBivariateSpline(np.arange(n), np.arange(n), z[i], kx=1, ky=1)
            self.ff.append(f)
        self.n = n

    def do_chunk(self, k, xi, yi):
        s = np.sum(np.exp(self.ff[k].ev(xi, yi)))
        sys.stderr.write(".")
        return s

    def do_multi(self, numproc, xi, yi):
        procs = []
        pool = Pool(numproc)
        stopwatch('Pool setup')
        for k in range(self.n):
            p = pool.apply_async(_do_chunk_wrapper, (self, k, xi, yi))
            procs.append(p)
        stopwatch('Jobs queued (%d processes)' % numproc)
        sum = 0.0
        for k in range(self.n):
            # Edit/bugfix: replaced p.get by procs[k].get
            sum += np.sum(procs[k].get(timeout=30))  # timeout allows ctrl-C interrupt
            if k == 0: stopwatch("\nFirst get() done")
        stopwatch('Jobs done')
        pool.close()
        pool.join()
        return sum

    def do_single(self, xi, yi):
        sum = 0.0
        for k in range(self.n):
            sum += self.do_chunk(k, xi, yi)
        stopwatch('\nAll in single process')
        return sum

def _do_chunk_wrapper(bd, k, xi, yi):  # must be outside the class for apply_async to work
    return bd.do_chunk(k, xi, yi)

if __name__ == "__main__":
    stopwatch()
    n = 50
    bd = BigData(n)
    m = 1000*1000
    xi, yi = np.random.uniform(0, n, size=m*2).reshape((2, m))
    stopwatch('Initialized')
    bd.do_multi(2, xi, yi)
    bd.do_multi(3, xi, yi)
    bd.do_single(xi, yi)
Output:

Initialized: 0.06 seconds
Pool setup: 0.01 seconds
Jobs queued (2 processes): 0.03 seconds
..
First get() done: 0.34 seconds
................................................Jobs done: 7.89 seconds
Pool setup: 0.05 seconds
Jobs queued (3 processes): 0.03 seconds
..
First get() done: 0.50 seconds
................................................Jobs done: 6.19 seconds
..................................................
All in single process: 11.41 seconds

The timings are on an Intel Core i3-3227 CPU with 2 cores and 4 threads, running 64-bit Linux. For the actual program, the multiprocessing version (the pool mechanism, even when it uses only one core) was a factor of 10 slower than the single-process version.

Follow-up

Unutbu's answer got me on the right track. In the actual program, self was pickled into a 37 to 140 MB object that had to be passed to the worker processes. Worse, Python pickling is very slow; the pickling alone took several seconds, and it happened for every chunk of work passed to the workers. Apart from pickling and passing large data objects, the overhead of apply_async on Linux is very small; for a small function (adding a few integer arguments), each apply_async/get pair takes only about 0.2 ms (a micro-benchmark sketch of this per-call overhead follows the second listing below). So dividing the work into very small chunks is not a problem by itself. Therefore, I now pass all big array arguments as indices into global variables, and I keep the chunk size small for the sake of CPU cache optimization.

The global variables are stored in a global dict; the entries are deleted in the parent process immediately after the worker pool is set up. Only the keys into the dict are transmitted to the workers. The only big data that is pickled and sent over IPC is the new data created by the workers.

#!/usr/bin/python2.7
import numpy as np, sys
from multiprocessing import Pool

_mproc_data = {}  # global storage for objects during multiprocessing

class BigData:
    def __init__(self, size):
        self.blah = np.random.uniform(0, 1, size=size)

    def do_chunk(self, k, xi, yi):
        # do the work and return an array of the same shape as xi, yi
        zi = k * np.ones_like(xi)
        return zi

    def do_all_work(self, xi, yi, num_proc):
        global _mproc_data
        mp_key = str(id(self))
        _mproc_data['bd' + mp_key] = self  # BigData
        _mproc_data['xi' + mp_key] = xi
        _mproc_data['yi' + mp_key] = yi
        pool = Pool(processes=num_proc)
        # processes have now inherited the global variables; clean up in the parent process
        for v in ['bd', 'xi', 'yi']:
            del _mproc_data[v + mp_key]
        # set up indices for the worker processes (placeholder)
        n_chunks = 45
        n = len(xi)
        chunk_len = n // n_chunks
        i1list = np.arange(n_chunks) * chunk_len   # chunk start indices
        i2list = i1list + chunk_len                # chunk end indices
        i2list[-1] = n                             # last chunk absorbs the remainder
        klist = range(n_chunks)  # placeholder
        procs = []
        for i in range(n_chunks):
            p = pool.apply_async(_do_chunk_wrapper, (mp_key, i1list[i], i2list[i], klist[i]))
            sys.stderr.write(".")
            procs.append(p)
        sys.stderr.write("\n")
        # allocate space for combined results
        zi = np.zeros_like(xi)
        # get data from workers and finish
        for i, p in enumerate(procs):
            zi[i1list[i]:i2list[i]] = p.get(timeout=30)  # timeout allows ctrl-C handling
        pool.close()
        pool.join()
        return zi

def _do_chunk_wrapper(key, i1, i2, k):
    """All arguments are small objects."""
    global _mproc_data
    bd = _mproc_data['bd' + key]
    xi = _mproc_data['xi' + key][i1:i2]
    yi = _mproc_data['yi' + key][i1:i2]
    return bd.do_chunk(k, xi, yi)

if __name__ == "__main__":
    xi, yi = np.linspace(1, 100, 100001), np.linspace(1, 100, 100001)
    bd = BigData(int(1e7))
    bd.do_all_work(xi, yi, 4)
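The figure of about 0.2 ms per apply_async/get pair for a small function, mentioned in the follow-up above, can be checked with a micro-benchmark along the lines of this sketch. It is not part of the original program; the function name add_three and the call count are made up for illustration, and the measured value will of course depend on the machine and OS.

#!/usr/bin/python2.7
import time
from multiprocessing import Pool

def add_three(a, b, c):
    # trivial worker: only a few small integers are pickled per call
    return a + b + c

if __name__ == "__main__":
    pool = Pool(processes=2)
    pool.apply_async(add_three, (0, 0, 0)).get()   # warm-up: let the workers start
    n_calls = 1000
    t0 = time.time()
    for i in range(n_calls):
        # each iteration is one apply_async/get round trip through the Queues
        pool.apply_async(add_three, (i, i + 1, i + 2)).get()
    dt = time.time() - t0
    print("%.3f ms per apply_async/get pair" % (1000.0 * dt / n_calls))
    pool.close()
    pool.join()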
Here are the results of a speed test (again, 2 cores / 4 threads), varying the number of worker processes and the amount of memory per chunk (total bytes of the xi, yi, zi array slices). The numbers are in "million result values per second", but that does not matter much for the comparison. The row for "1 process" is a direct call of do_chunk with the full input data, without any subprocesses.

#Proc   125K    250K    500K    1000K   unlimited
1                                       0.82
2       4.28    1.96    1.3     1.31
3       2.69    1.06    1.06    1.07
4       2.17    1.27    1.23    1.28

The impact of the data size in memory is quite large. The CPU has 3 MB of shared L3 cache plus 256 KB of L2 cache per core, and note that the calculation also needs access to several MB of internal data of the BigData object. Hence, what we learn from this is that it is useful to do this kind of speed test. For this program, 2 processes is fastest, followed by 4, with 3 being the slowest.

Solution

Try to reduce interprocess communication. In the multiprocessing module, all (single-machine) interprocess communication is done through Queues. Objects passed through a Queue are pickled. So try to send fewer and/or smaller objects through the Queue.

Do not send self, the instance of BigData, through the Queue. It is rather big, and gets bigger as the amount of data in self grows:

In [6]: import pickle
In [14]: len(pickle.dumps(BigData(50)))
Out[14]: 1052187
So you could, for example, avoid passing the BigData instance through the Queue:

p = pool.apply_async(_do_chunk_wrapper, (k_start, k_end, xi, yi))
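Following the same advice, here is a minimal sketch of an alternative to the global-dict trick in the follow-up: hand the large data to the workers once via Pool's initializer, so that each task only ships a pair of integers. This is not the original poster's code; the names _init_worker and _work_on_chunk are made up, and the assumption that the big arrays are not re-pickled per task relies on the fork start method used by default on Linux (on platforms that spawn workers, initargs are pickled once per worker instead).

#!/usr/bin/python2.7
import numpy as np
from multiprocessing import Pool

_worker_data = None  # set once in each worker by the initializer

def _init_worker(big, xi, yi):
    # With fork, these objects are inherited when the worker starts;
    # they are not pickled again for every task.
    global _worker_data
    _worker_data = (big, xi, yi)

def _work_on_chunk(i1, i2):
    # Only the two chunk boundaries travel through the Queue per task.
    big, xi, yi = _worker_data
    return np.sum(big[i1:i2] * xi[i1:i2] * yi[i1:i2])

if __name__ == "__main__":
    big = np.random.uniform(size=int(1e7))       # stand-in for the big object
    m = 1000 * 1000
    xi, yi = np.random.uniform(size=2 * m).reshape((2, m))
    pool = Pool(processes=2, initializer=_init_worker, initargs=(big, xi, yi))
    edges = np.linspace(0, m, 46).astype(int)    # 45 chunks, as in the listing above
    procs = [pool.apply_async(_work_on_chunk, (edges[i], edges[i + 1]))
             for i in range(len(edges) - 1)]
    total = sum(p.get(timeout=30) for p in procs)
    pool.close()
    pool.join()
    print("total: %g" % total)

The effect is the same as with the global dict: the big data crosses the process boundary at most once, and each apply_async call only pickles a couple of integers plus the returned chunk result.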
