One option is:

# Create a RemoteRef on each worker to store the data
rr_files = map(x -> RemoteRef(x), workers())
# Create a RemoteRef on each worker to store the results
rr_results = map(x -> RemoteRef(x), workers())

# Load sequentially; files_on_ith_worker is that part of listoffiles
# assigned to the ith worker
for (i, p) in enumerate(workers())
    remotecall_wait(p, load, files_on_ith_worker, rr_files[i])
end

# stats should process the data in the first RemoteRef and store the
# result in the second one
for (i, p) in enumerate(workers())
    remotecall(p, stats, rr_files[i], rr_results[i])
end

# fetch, wait and continue processing.

On Fri, Apr 11, 2014 at 8:12 PM, David van Leeuwen <[email protected]> wrote:

> Hello,
>
> I need to compute aggregate statistics (say, the mean over columns,
> though these will later become more computationally intensive) for a
> large collection of matrices. These matrices vary in the number of rows,
> but have the same number of columns.
>
> I am looking for a general scheme that can use parallel execution on a
> cluster and can deal with various granularities in the number and size
> of the matrices. The combined size of the matrices may be larger than
> the combined memory of the computing cluster, and I have my matrices in
> separate files.
>
> In my first attempts I load the data into memory in the worker
> processes, compute the statistics, and reduce the results in the main
> process. The coding is fairly straightforward with pmap():
>
> function stats(x)
>     ## compute the statistics
> end
>
> function load(file::String)
>     ## load the data
> end
>
> result = pmap(s -> stats(load(s)), listoffiles)
> reducethestats(result)
>
> However, for low-complexity stats computations (say, the sum over
> columns) the system seems to be thrashing terribly, because all
> processes start loading the data from (network) disk more or less at the
> same time, and loading takes longer than the computation. The thrashing
> effect is quite large: I lose a factor of 100 or so over serial
> loading/execution time.
> Of course one would not want to parallelize in this particular case,
> but as I said before, the statistics become more computationally
> intensive later in the algorithm, and then the parallel computing is
> very beneficial.
>
> The same data is used in multiple iterations of the algorithm, so it can
> be beneficial to map the same matrices to the same worker processes and
> let OS file caching reduce load times. So pmap() is probably not a good
> choice anyway.
>
> My question is: is there an efficient approach where the data is loaded
> synchronously in the worker processes (so that they don't swamp the
> network) and the stats are then computed asynchronously later?
>
> One way could be (a simplified example that needs lots of memory in the
> main process)
>
> result = pmap(stats, map(load, listoffiles))
>
> but this is not efficient, as it needs to serialize the loaded data in
> the main process and transfer it to the workers. And for larger problems
> nothing is cached locally. There is some better control with
> remotecall() and fetch(), but I don't see a way to leave the loaded data
> in a remotecall() process and use it in a next remotecall() without
> fetch()ing it to the main process. Maybe I am looking for something like
>
> worker(i) = workers()[1 + (i % nworkers())]
>
> for i in 1:length(listoffiles)
>     remotecall_wait(worker(i), load, listoffiles[i]) ## but keep the data in the worker
> end
> for i in 1:length(listoffiles)
>     remotecall(worker(i), stats, ...) ## ... is the remote result of load
> end
> ## fetch, and do the rest of the files in a task-based way like pmap()
>
> Any ideas how this can be accomplished?
>
> Thanks,
>
> ---david
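For concreteness, `load` and `stats` in the sketch above would then take the RemoteRefs as arguments rather than returning data. A rough sketch only, using the `put`/`fetch` RemoteRef API of that era; `readdlm` and the column-mean computation are placeholders for the real file format and statistics:

```julia
# Runs on a worker: read this worker's share of the files and park the
# matrices in the worker-local RemoteRef instead of returning them.
function load(files::Vector, rr::RemoteRef)
    put(rr, [readdlm(f) for f in files])   # placeholder reader
end

# Runs on the same worker: fetch() is cheap here because rr_in lives
# locally, so no data crosses the network.
function stats(rr_in::RemoteRef, rr_out::RemoteRef)
    data = fetch(rr_in)
    put(rr_out, [mean(m, 1) for m in data])  # per-file column means
end

# Back on the master, collect the (small) per-worker results and reduce:
# results = map(fetch, rr_results)
```

Since the data stays parked in `rr_files`, later iterations of the algorithm can call `stats` again on the same refs without reloading from disk.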
