Hi,
I was trying to parallelize some algorithms and needed a writable
array shared between processes. It turned out to be quite simple and
gave a nice speedup, almost linear in the number of cores. Of course, you
need to know what you are doing to avoid segfaults and such, but I
still think something like this should be included with NumPy for
power users.
This works by inheriting anonymous mmapped memory. I am not sure whether this
works on Windows.
import numpy as np
import multiprocessing as mp
class shared(np.ndarray):
    """Shared writable array of doubles backed by ``multiprocessing.RawArray``.

    ``shared(shape)`` allocates a new ``RawArray('d', ...)`` large enough for
    ``shape`` and wraps it as a float ndarray subclass.

    ``shared(shape, interface)`` does not allocate; it wraps the memory
    described by ``interface`` (an ``__array_interface__`` dict). This form is
    what pickling produces: only the shape and the array interface (which
    carries a raw data address) are sent, never the data itself.

    NOTE(review): the address in a pickled interface is presumably only valid
    in a process that inherited the original mapping (e.g. a forked worker
    created after the array was allocated) -- confirm before using it with
    other start methods.
    """

    def __new__(subtype, shape, interface=None):
        """Create a fresh shared array, or re-wrap memory given by ``interface``.

        Parameters
        ----------
        shape : int or tuple of ints
            Shape of the new array. When ``interface`` is given, the shape
            recorded in ``interface`` is the one actually used.
        interface : dict, optional
            An ``__array_interface__`` dict describing existing memory.
        """
        if interface is None:
            # np.prod returns a NumPy scalar; RawArray only treats a plain
            # Python int as a size, so convert explicitly.
            size = int(np.prod(shape))
            buffer = mp.RawArray('d', size)
            self = np.ndarray.__new__(subtype, shape, float, buffer)
        else:
            # A throwaway object exposing __array_interface__ lets
            # np.asarray build an array over the described memory.
            class Dummy(object):
                pass

            buffer = Dummy()
            buffer.__array_interface__ = interface
            a = np.asarray(buffer)
            self = np.ndarray.__new__(subtype, shape=a.shape, buffer=a)
        return self

    def __reduce_ex__(self, protocol):
        """Pickle as ``shared(shape, __array_interface__)`` for any protocol."""
        return shared, (self.shape, self.__array_interface__)

    def __reduce__(self):
        """Pickle the same way as ``__reduce_ex__``."""
        # Must be looked up on self; a bare __reduce_ex__ is not a global.
        return self.__reduce_ex__(0)
Also see attached file for example usage.
Erik
#!/usr/bin/env python
import numpy as np
import multiprocessing as mp
class shared(np.ndarray):
    """Shared writable array of doubles backed by ``multiprocessing.RawArray``.

    ``shared(shape)`` allocates a new ``RawArray('d', ...)`` large enough for
    ``shape`` and wraps it as a float ndarray subclass.

    ``shared(shape, interface)`` does not allocate; it wraps the memory
    described by ``interface`` (an ``__array_interface__`` dict). This form is
    what pickling produces: only the shape and the array interface (which
    carries a raw data address) are sent, never the data itself.

    NOTE(review): the address in a pickled interface is presumably only valid
    in a process that inherited the original mapping (e.g. a forked worker
    created after the array was allocated) -- confirm before using it with
    other start methods.
    """

    def __new__(subtype, shape, interface=None):
        """Create a fresh shared array, or re-wrap memory given by ``interface``.

        Parameters
        ----------
        shape : int or tuple of ints
            Shape of the new array. When ``interface`` is given, the shape
            recorded in ``interface`` is the one actually used.
        interface : dict, optional
            An ``__array_interface__`` dict describing existing memory.
        """
        if interface is None:
            # np.prod returns a NumPy scalar; RawArray only treats a plain
            # Python int as a size, so convert explicitly.
            size = int(np.prod(shape))
            buffer = mp.RawArray('d', size)
            self = np.ndarray.__new__(subtype, shape, float, buffer)
        else:
            # A throwaway object exposing __array_interface__ lets
            # np.asarray build an array over the described memory.
            class Dummy(object):
                pass

            buffer = Dummy()
            buffer.__array_interface__ = interface
            a = np.asarray(buffer)
            self = np.ndarray.__new__(subtype, shape=a.shape, buffer=a)
        return self

    def __reduce_ex__(self, protocol):
        """Pickle as ``shared(shape, __array_interface__)`` for any protocol."""
        return shared, (self.shape, self.__array_interface__)

    def __reduce__(self):
        """Pickle the same way as ``__reduce_ex__``."""
        # Must be looked up on self; a bare __reduce_ex__ is not a global.
        return self.__reduce_ex__(0)
def poolmap(func, iterable, chunksize=None):
    """Apply ``func`` to every item of ``iterable`` in a fresh process pool.

    A new ``mp.Pool()`` is created on each call and shut down before
    returning, so repeated calls do not accumulate worker processes.

    Parameters
    ----------
    func : callable
        Picklable function taking a single argument.
    iterable : iterable
        Items to process.
    chunksize : int, optional
        Passed straight through to ``Pool.map``.

    Returns
    -------
    list
        Results of ``func`` in input order, as returned by ``Pool.map``.
    """
    p = mp.Pool()
    try:
        return p.map(func, iterable, chunksize)
    finally:
        # Release the worker processes whether or not the map succeeded.
        p.close()
        p.join()
def std(args):
    """Store the standard deviation of ``a[i]`` in ``o[i]``.

    Parameters
    ----------
    args : tuple
        A single ``(a, o, i)`` tuple, so the function can be used directly
        with ``Pool.map``: ``a`` is the indexable input, ``o`` the indexable
        output that is written in place, and ``i`` the index to process.
    """
    # Unpack in the body: tuple parameters in the signature are
    # Python 2-only syntax.
    a, o, i = args
    o[i] = np.std(a[i])
def parallel_std(a):
    """Compute ``np.std`` of each ``a[i]`` using worker processes.

    Allocates a ``shared`` output with one slot per entry along the first
    axis of ``a``, hands one ``(a, output, index)`` task per slot to
    ``poolmap(std, ...)``, and returns the output array.
    """
    n_rows = a.shape[0]
    o = shared(n_rows)
    tasks = [(a, o, row) for row in range(n_rows)]
    poolmap(std, tasks)
    return o
def main():
    """Time ``np.std(a, 1)`` against ``parallel_std(a)`` on a 1000x10000 array.

    Fills a ``shared`` array with random values, runs each variant 100 times
    with ``timeit`` and prints the mean time per call for each.
    """
    import timeit

    a = shared((1000, 10000))
    a[:, :] = np.random.rand(1000, 10000)
    # Single-argument print(...) behaves the same on Python 2 and Python 3.
    print(timeit.timeit(lambda: np.std(a, 1), number=100) / 100)
    print(timeit.timeit(lambda: parallel_std(a), number=100) / 100)
    # Single un-timed runs, left disabled by the author:
    # parallel_std(a)
    # np.std(a, 1)
# Run the benchmark only when this file is executed as a script.
if __name__ == "__main__":
    main()
_______________________________________________
NumPy-Discussion mailing list
[email protected]
http://mail.scipy.org/mailman/listinfo/numpy-discussion