One option is:

# Create a RemoteRef on each worker to store the data
rr_files = map(x->RemoteRef(x), workers())

# Create a RemoteRef on each worker to store results
rr_results = map(x->RemoteRef(x), workers())

for (i, p) in enumerate(workers())
  # files_on_ith_worker is that part of listoffiles destined for the ith worker
  remotecall_wait(p, load, files_on_ith_worker, rr_files[i])
end
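Here `load` would read the files and `put!` the matrices into the worker-local ref, so the data never travels back to the master. A minimal sketch (hypothetical: `readdlm` is just a stand-in reader, and `rr` can be any ref that supports `put!`):

```julia
using DelimitedFiles  # stand-in reader; use whatever suits your file format

# Read each file and park the resulting matrices in the ref that lives on
# this worker, so they stay local for later stats passes.
function load(files, rr)
    data = [readdlm(f) for f in files]
    put!(rr, data)
end
```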

for (i, p) in enumerate(workers())
  # stats should process the data in the first RemoteRef and store the result in the second
  remotecall(p, stats, rr_files[i], rr_results[i])
end
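And `stats` would fetch from the data ref (a cheap local fetch on the worker, since the ref lives there) and put its result into the result ref. A sketch, assuming column sums as the statistic (written with current `sum(m, dims=1)` syntax; in 0.3-era Julia this was `sum(m, 1)`):

```julia
# Compute per-file column sums from the worker-local data ref and store
# them in the result ref; nothing large crosses the network here.
function stats(rr_in, rr_out)
    data = fetch(rr_in)                      # local fetch on this worker
    put!(rr_out, [sum(m, dims=1) for m in data])
end
```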

# fetch, wait and continue processing.
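The fetch-and-reduce step could then reuse the `reducethestats` idea from your example. A sketch, assuming each per-worker result is an array of column-sum rows that can simply be added up:

```julia
# Pull only the small per-worker results back to the main process and
# combine them; the raw matrices never leave the workers.  `refs` is
# anything fetch-able (RemoteRefs in the 0.3-era API).
function reducethestats(refs)
    partials = map(fetch, refs)        # one small array per worker
    reduce(+, map(sum, partials))      # e.g. overall column sums
end
```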



On Fri, Apr 11, 2014 at 8:12 PM, David van Leeuwen <
[email protected]> wrote:

> Hello,
>
> I need to compute aggregate statistics---say, the mean over columns, but
> later these will become more computationally intensive---for a large
> collection of matrices.   These matrices vary in the number of rows, but
> have the same number of columns.
>
> I am looking for a general scheme that can utilise cluster parallel
> execution and will deal with various granularities in the number and size
> of the matrices.  The combined size of the matrices may be larger than the
> combined memory of the computing cluster, and I have my matrices in
> separate files.
>
> In my first attempts I load the data in memory in the worker processes,
> compute the statistics, and reduce the results in the main process.  The
> coding is fairly straightforward with pmap().
>
> function stats(x)
> ## compute the statistics
> end
>
> function load(file::String)
> ## load the data
> end
>
> result = pmap(s->stats(load(s)), listoffiles)
> reducethestats(result)
>
> However, for low-complexity stats computations--say, the sum over
> columns---it seems that the system is thrashing terribly because all
> processes start to load the data from (network) disc more/less at the same
> time, and data loading takes longer than the computation.  The thrashing
> effect is quite large: I lose a factor of 100 or so over serial
> loading/execution time.  Of course one would not want to parallelize in
> this particular case, but as I said before, the statistics become more
> computationally intensive later in the algorithm, and then the parallel
> computing is very beneficial.
>
> The same data is used in multiple iterations of the algorithm, so it can
> be beneficial to map the same matrices to the same worker processes and
> have OS file caching reduce load times.  So pmap() is probably not a good
> choice, anyway.
>
> My question is: is there an efficient approach where the data is loaded
> synchronously in the worker processes---so that they don't swamp the
> network---and then later compute the stats asynchronously?
>
> One way could be (a simplified example that needs lots of memory in the
> main process)
>
>
> result = pmap(stats, map(load, listoffiles))
>
>
> but this is not efficient as it needs to serialize the loaded data in the
> main process and transfer it to the workers.  And for larger problems
> nothing is cached locally.   There is some better control with remotecall()
> and fetch(), but I don't see a way to leave the loaded data in a
> remotecall() process and use it in a next remotecall() without fetch()ing
> it to the main process.  Maybe I am looking for something like
>
> worker(i) = workers()[1 + (i % nworkers())]
>
>
> for i in 1:nworkers()
>   remotecall_wait(worker(i), load, listoffiles[i]) ## but keep the data in the worker
> end
> for i in 1:nworkers()
>   remotecall(worker(i), stats, ## the remote result)
> end
> ## fetch, and do the rest of the files in a task-based way like pmap()
>
>
> Any ideas how this can be accomplished?
>
> Thanks,
>
> ---david
>
