Hello, 

I need to compute aggregate statistics---say, the mean over columns, but 
later these will become more computationally intensive---for a large 
collection of matrices.   These matrices vary in the number of rows, but 
have the same number of columns.  

I am looking for a general scheme that can utilise parallel execution 
on a cluster and that copes with various granularities in the number 
and size of the matrices.  The combined size of the matrices may be 
larger than the combined memory of the computing cluster, and I have 
my matrices in separate files. 

In my first attempts I load the data into memory in the worker 
processes, compute the statistics, and reduce the results in the main 
process.  The coding is fairly straightforward with pmap(): 

@everywhere function stats(x)
    ## compute the statistics, e.g. the mean over the columns
end

@everywhere function load(file::String)
    ## load the matrix stored in `file`
end

result = pmap(s -> stats(load(s)), listoffiles)
reducethestats(result)
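
For the simple column-mean case, a per-file statistic that reduces 
exactly is the pair (column sums, row count), since the matrices have 
different numbers of rows.  Concretely, I mean something like this 
(just a sketch with the names from above):

@everywhere stats(x) = (vec(sum(x, 1)), size(x, 1))  ## column sums, row count

function reducethestats(results)
    sums  = reduce(+, [r[1] for r in results])  ## total column sums
    nrows = sum([r[2] for r in results])        ## total number of rows
    sums / nrows                                ## global column means
end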

However, for low-complexity stats computations---say, the sum over the 
columns---the system seems to be thrashing terribly, because all 
worker processes start to load their data from the (network) disc at 
more or less the same time, and the data loading takes longer than the 
computation.  The thrashing effect is quite large: I lose a factor of 
100 or so over the serial loading/execution time.  Of course one would 
not want to parallelise this particular case at all, but as I said 
before, the statistics become more computationally intensive later in 
the algorithm, and then the parallel computing is very beneficial. 
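
(For reference, the serial baseline is essentially the same pipeline 
with map() instead of pmap():)

result = map(s -> stats(load(s)), listoffiles)
reducethestats(result)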

The same data is used in multiple iterations of the algorithm, so it 
could be beneficial to map the same matrices to the same worker 
processes and let OS file caching reduce the load times.  So pmap() is 
probably not a good choice anyway.  
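
Going a step further than the OS cache, each worker could keep the 
matrices it has already loaded in a process-local Dict keyed by file 
name, so a file is read from disc at most once per worker across the 
iterations.  A hypothetical sketch (cachedload is my own name, not an 
existing function):

## hypothetical per-worker cache: repeated iterations on the same
## worker never touch the (network) disc again for the same file
@everywhere const cache = Dict{String,Any}()
@everywhere cachedload(file::String) = get!(() -> load(file), cache, file)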

My question is: is there an efficient approach where the data is 
loaded sequentially in the worker processes---so that they don't swamp 
the network---and the stats are then computed asynchronously?

One way could be the following (a simplified example that needs lots 
of memory in the main process):


result = pmap(stats, map(load, listoffiles))


but this is not efficient, as it needs to serialise the loaded data in 
the main process and transfer it to the workers.  And for larger 
problems nothing is cached locally.  There is somewhat better control 
with remotecall() and fetch(), but I don't see a way to leave the 
loaded data in a remotecall()'ed process and use it in a subsequent 
remotecall() without fetch()ing it to the main process first.  Maybe I 
am looking for something like

worker(i) = workers()[1 + (i - 1) % nworkers()]

## load the first file on each worker, one at a time
for i in 1:nworkers()
    remotecall_wait(worker(i), load, listoffiles[i]) ## but keep the data in the worker
end
## then compute the stats asynchronously, without the data ever
## travelling through the main process
for i in 1:nworkers()
    remotecall(worker(i), stats, ???) ## ??? = the result of the load above
end
## fetch, and do the rest of the files in a task-based way like pmap()
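
Perhaps the missing piece is that the RemoteRef returned by 
remotecall_wait() can itself be passed to a later remotecall() on the 
same worker, where fetch() would then, if I understand the docs 
correctly, be a local lookup rather than a network transfer.  A rough, 
untested sketch along those lines, with the placeholders from above:

## serial loading: remotecall_wait() blocks until worker(i) has loaded
## the file, but the matrix itself stays on that worker
refs = [remotecall_wait(worker(i), load, listoffiles[i])
        for i in 1:length(listoffiles)]

## asynchronous stats: fetch(r) runs on the worker that owns r's value,
## so the matrix never travels through the main process
futs = [remotecall(worker(i), r -> stats(fetch(r)), refs[i])
        for i in 1:length(listoffiles)]

reducethestats([fetch(f) for f in futs])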


Any ideas how this can be accomplished?

Thanks, 

---david
