I have an embarrassingly parallel problem with the following features:

   1. I need to run a pure function on different inputs a lot of times
   2. I need to record the results of *every* call to this function
   3. I need to pass a lot of information as input to this function, and in 
   particular each result needs to be matched to its particular input. But 
   suppose for now that all the inputs are fixed except for, say, a number 
   indexing which computation I'm doing (see example below).

I have successfully run examples of solving these problems on a single 
machine and on a multi-machine cluster with passwordless SSH using three 
distinct approaches. (Apologies for lack of MWEs here; I will add if 
necessary, but should be simple enough and applicable to any example where 
pure_function below is doing a very small amount of work.)


   1. In the single machine case, use a SharedArray, and do 
   @sync @parallel for i in 1:N
       my_shared_array[i] = pure_function(i, [other fixed inputs])
   end
   
   2. In the multi machine case, I can't use a SharedArray. So I can do 
   essentially the identical thing with a DistributedArray, or
   3. Essentially the same thing where instead of using @parallel for, I 
   just use pmap.
   4. (Also another version where I essentially tried manually to recreate 
   @parallel for by sending remote calls, waiting for workers to finish, and 
   then feeding them the next task, but this didn't outperform the others and 
   anyway I'd prefer to stick to the realm of canonical / "safe" approaches.)
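
For concreteness, approaches 1 and 3 might look something like the sketch 
below. It assumes a current Julia (1.x), where SharedArray lives in the 
SharedArrays standard library and "@parallel for" is spelled 
"@distributed for". pure_function here is a hypothetical stand-in that takes 
an index and a single fixed input (scale), the names run_shared and run_pmap 
are made up for illustration, and the batch_size value is arbitrary.

```julia
using Distributed
addprocs(2)                      # two local worker processes
@everywhere using SharedArrays

# Hypothetical stand-in for the real pure function: very little work per call.
@everywhere pure_function(i, scale) = scale * i

# Approach 1: every local worker writes its results into shared memory.
function run_shared(N, scale)
    out = SharedArray{Int}(N)
    @sync @distributed for i in 1:N
        out[i] = pure_function(i, scale)
    end
    return out
end

# Approach 3: pmap, with many cheap calls batched into each message.
function run_pmap(N, scale; batch_size = 100)
    return pmap(i -> pure_function(i, scale), 1:N; batch_size = batch_size)
end
```

Calling run_shared(N, scale) or run_pmap(N, scale) returns the results in 
index order, so result i always corresponds to input i.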

When I time these (I'll provide specific benchmarks if necessary) on my 
local machine ONLY, #1 is way faster than either #2 or #3. The slowness of 
#3 in particular is consistent with the documentation, since pure_function 
isn't doing much work--I have a relatively high number of calls, and a 
relatively small amount of work being done in each call--so @parallel 
should be a better solution than pmap.

(Interlude--I've also tried another approach, say #1A, where I don't use a 
SharedArray at all, but just use (vcat) as the reduction for @parallel for, 
as suggested e.g. here 
<https://groups.google.com/forum/#!searchin/julia-users/parallel/julia-users/vUpyqIxz3Hw/DyjlegLTCAAJ>. 
But this is of course slower than all of these approaches when the array is 
long enough, IIUC b/c calling reduce on vcat is doing a lot of unnecessary 
traversals instead of just "smartly" aggregating within processes and then 
pasting together the completed results of the processes. BTW, I've also 
tried to write a modified @parallel that doesn't try to reduce but just 
collects the values, but I failed to get it to work.)

Benchmarking these results is slightly complicated by my uncertainty over 
how much of the message-passing overhead is due to passing the inputs to 
the processes, how much is due to the method of allocating the work to 
available workers, and how much is due to collecting all the results. 
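
The "aggregate within processes, then paste the completed results together" 
idea from the interlude could be sketched roughly as follows. This is not a 
modified @parallel, just explicit remote calls, written for Julia 1.x with 
the Distributed standard library. pure_function, map_chunk and 
collect_by_chunks are hypothetical names, and the sketch assumes N >= 1.

```julia
using Distributed
addprocs(2)                      # two local worker processes

# Hypothetical stand-in for the real pure function.
@everywhere pure_function(i, scale) = scale * i

# Runs on a worker: evaluate one contiguous chunk of indices.
@everywhere map_chunk(chunk, scale) = [pure_function(i, scale) for i in chunk]

function collect_by_chunks(N, scale)
    ws = workers()
    # One contiguous chunk per worker (the last chunk may be shorter).
    chunks = Iterators.partition(1:N, cld(N, length(ws)))
    # Launch every chunk before fetching, so the workers run concurrently.
    futures = [remotecall(map_chunk, w, chunk, scale)
               for (w, chunk) in zip(ws, chunks)]
    # Each worker sends back one finished vector; concatenate once, in order.
    return reduce(vcat, map(fetch, futures))
end
```

Each worker receives one message and returns one message, so the number of 
round trips scales with the number of workers rather than with N.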

Based on (a) comparing the speed on my local machine of approach #1 to 
approaches #2/#3, I believe SharedArrays are ideal for work on a single 
machine.

Based on (b) comparing the speed of #2 / #3 on my local machine only to the 
speed of #2/#3 when I have multiple machines, using both the all_to_all and 
master_slave topology, it seems that there is a lot of additional overhead 
in sending information across the network to each of the processes on the 
remote *machines*. Note I don't have independent evidence of this; perhaps 
Julia is already "cleverly" in the background deciding that it only needs 
to communicate across the network once per machine rather than once per 
process. But IIUC this is not so, and either way the overhead of 
communicating with additional processes on remote machines has so far been 
so high that I get basically no gain from adding them. 

Considering these observations, it seems that the ideal approach would look 
something like this:


   1. Set up a custom network topology where the master process has 
   connections *only* to single "lieutenant" processes on worker machines, and 
   2. On each machine, the lieutenant process (or, on the local machine, the 
   original master process) creates a SharedArray, adds *local* processes, 
   and distributes work to the *local-only* processes
   3. Therefore the initial inputs and the per-machine SharedArrays are the 
   only entities passed across the network (minimize network communication, 
   which IIUC is the slowest inter-process communication), and then on a given 
   machine as little as possible is passed between processes.
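
A rough approximation of this layout, without a custom topology and without 
per-machine SharedArrays, might look like the sketch below. It assumes Julia 
1.x and the default all-to-all connections, so that workers are allowed to 
make remote calls to each other. The master groups workers by hostname, 
sends one message per machine to a "lieutenant" (simply the first worker 
found on that machine), and the lieutenant fans the work out to the workers 
on its own machine. All function names are hypothetical, and I have only 
reasoned about this for N >= 1.

```julia
using Distributed
addprocs(2)                      # stand-in for workers spread over machines
@everywhere using Distributed    # lieutenants need remotecall in scope

# Hypothetical stand-in for the real pure function.
@everywhere pure_function(i, scale) = scale * i

# Runs on any worker: evaluate one contiguous chunk of indices.
@everywhere map_chunk(chunk, scale) = [pure_function(i, scale) for i in chunk]

# Runs on a lieutenant: split `idxs` among the workers on its own machine
# (`peers`, which includes the lieutenant itself) and collect their results.
@everywhere function run_on_host(idxs, peers, scale)
    parts = Iterators.partition(idxs, cld(length(idxs), length(peers)))
    futures = [remotecall(map_chunk, p, part, scale)
               for (p, part) in zip(peers, parts)]
    return reduce(vcat, map(fetch, futures))
end

# Runs on the master: group worker ids by the machine they live on.
function workers_by_host()
    groups = Dict{String,Vector{Int}}()
    for w in workers()
        host = remotecall_fetch(gethostname, w)
        push!(get!(groups, host, Int[]), w)
    end
    return groups
end

# Runs on the master: one request and one reply per machine.
function hierarchical_map(N, scale)
    groups = collect(values(workers_by_host()))
    host_parts = Iterators.partition(1:N, cld(N, length(groups)))
    futures = [remotecall(run_on_host, first(peers), part, peers, scale)
               for (peers, part) in zip(groups, host_parts)]
    return reduce(vcat, map(fetch, futures))
end
```

This does not stop the master from holding connections to every worker; it 
only reduces the master's traffic to one request and one reply per machine. 
Splitting the indices evenly across machines also ignores how many workers 
each machine has, which a real version would presumably want to weight by.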


My questions are:


   1. Is this really the ideal approach? Or is there a better way?
   2. If that really is the ideal approach, how can I do it? I've tried 
   tinkering with the "spawn lieutenant-worker processes on other machines and 
   then make them spawn their own worker processes" idea, but I get errors 
   because the remote lieutenants aren't the master process and don't seem to 
   be "allowed" to spawn more workers; this suggested to me that I must be 
   thinking about the problem the wrong way, since I probably shouldn't have 
   to modify functions in Base in order to get this to work. So (A) literally 
   how can I do it, and (B) is there a way to do it without digging too deep 
   into the implementation, but just solving the problem generically / at a 
   high level?
   3. Has no one else encountered this particular issue? I've seen so many 
   excellent examples of distributed computing in these forums and elsewhere, 
   yet they all stick either with distributed arrays for multi-machine work or 
   SharedArrays for local work, i.e. thinking either about parallelizing 
   across some local processes or setting up a network of machines and 
   parallelizing across a bunch of possibly remote processes, but nothing 
   directly tackling the issue of discriminating among worker processes 
   depending on whether they're local or on a remote machine. 


Addendum...here is a subset of things I've read on the topic, many of which 
have proved useful food for thought, despite question #3 above.


http://docs.julialang.org/en/latest/manual/parallel-computing/

https://github.com/JuliaLang/julia/pull/11665
https://github.com/JuliaLang/julia/issues/3655

https://github.com/proflage/2015-julia-hands-on

https://groups.google.com/d/topic/julia-users/1sNXYtIbS1Q/discussion
https://groups.google.com/d/topic/julia-users/E8fGIiDwckc/discussion
https://groups.google.com/d/topic/julia-users/W5QIfE7f0O4/discussion
https://groups.google.com/d/topic/julia-users/WJBIAYzrZgg/discussion
https://groups.google.com/d/topic/julia-users/b4tEzOOOnJI/discussion
https://groups.google.com/d/topic/julia-users/fMTwlMJKNVw/discussion
https://groups.google.com/d/topic/julia-users/fe1yZawvvi0/discussion
https://groups.google.com/d/topic/julia-users/u4tJSCX4jxw/discussion
https://groups.google.com/d/topic/julia-users/vUpyqIxz3Hw/discussion
https://groups.google.com/d/topic/julia-users/vt2hS9h36a0/discussion



