Hi Greg,

On Friday, August 28, 2015 at 6:40:33 AM UTC+2, Greg Plowman wrote:
>
> Firstly, I hope it's OK to revive this thread.
>
> @parallel doesn't do dynamic load balancing.
> So I'm still interested in a pmap_reduce() or equivalent which does 
> dynamic allocation to workers AND reduces on the fly.
>
> Just to be clear:
> The "chunks" of work given to each worker might vary in execution time.
> The workers might vary in their execution speed.
>
> It's difficult to optimally split job into chunks to assign to workers 
> upfront.
>
I had the same demands for my implementation in GaussianMixtures: training 
in parallel on large amounts of data, in chunks of some natural, heterogeneous 
size, on a cluster with heterogeneous CPU power per node. 

If you have a look at 
https://github.com/davidavdav/GaussianMixtures.jl/blob/master/src/data.jl#L101-136
 
there is a function `dmapreduce` that works on chunks of data of type Data. 
Data is basically a Vector{Matrix}, but in my case each Matrix can be loaded 
from disk.  The length of the vector, i.e. the number of chunks, can be very 
large, but during the reduction there are never more results stored than 
there are workers.  
 

> Although pmap does assign chunks of work dynamically, it's still not 
> optimal if the number of chunks is of the same order of the number of 
> workers.
> It's very possible a slow worker will be left running a long "chunk". Or 
> simply that only some workers will be required to finish any remaining 
> chunks, leaving other workers idle.
>
> It seems more granularity may help. i.e. Splitting job into many chunks so 
> that #chunks >> #workers.
> But then pmap becomes unviable because it returns an array of results for 
> each chunk, which requires too much memory.
>

This is exactly the reason why I reduce in two places: 
 - in the @async loop there is a reducer for each worker
 - there is a reducer at the end. 

Before, I was running out of memory on a large machine (128G) while 
reducing a huge list of chunks, for which the results themselves (a tuple 
of matrices) were quite sizeable.  Now, with `dmapreduce()`, the number of 
results held during the reduction is limited by the number of workers, and 
not by the number of chunks (the data). 
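
For what it's worth, here is a minimal sketch of that two-stage pattern. It 
is not the code from data.jl: the name `pmap_reduce` and its argument order 
are made up for illustration, and it assumes a current Julia with the 
Distributed standard library rather than the version we are running today. 
Each worker gets its own @async feeder task that pulls the next chunk as 
soon as the worker is free (so the load balancing is dynamic) and folds the 
result into a single running value for that worker. 

```julia
using Distributed

# Hypothetical helper, for illustration only.
# Applies f to each item on whichever worker is free next and folds the
# results with op, keeping at most one partial result per worker.
# Assumes items is non-empty and op is associative and commutative,
# because the order in which results arrive is not deterministic.
function pmap_reduce(f, op, items)
    n = length(items)
    next = 1                               # index of the next unassigned item
    nextidx() = (i = next; next += 1; i)   # hand out work items one at a time
    partials = Any[]                       # per-worker partial reductions
    @sync for p in workers()
        @async begin
            acc = nothing                  # running reduction for worker p
            while true
                idx = nextidx()
                idx > n && break
                r = remotecall_fetch(f, p, items[idx])   # blocks this task only
                acc = acc === nothing ? r : op(acc, r)   # first stage: reduce on the fly
            end
            acc === nothing || push!(partials, acc)
        end
    end
    return reduce(op, partials)            # second stage: combine the partials
end
```

With this, something like `pmap_reduce(x -> x^2, +, 1:10)` should give the 
same answer as `sum(x -> x^2, 1:10)`. On the question of whether the shared 
updates happen in an orderly way: the @async tasks all run cooperatively on 
the calling process and only switch at blocking calls such as 
`remotecall_fetch`, so the updates to `next` and `partials` in this sketch 
do not interleave. Note that here `op` runs on the calling process, not on 
the workers. 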

Perhaps you can adapt the code a bit for your own purpose, or, if you have 
your data chunks on disk in a matrix-like format, you can even use the Data 
type.  

Cheers, 

---david
 

> I don't need the intermediate results, so I could reduce on the fly.
>
> Hence my interest in a version of pmap which returns the final reduced 
> result only.
> Is it possible to comment on my original question about a modified pmap?
>
> Or have I misunderstood pmap &/or @parallel? Or are there other options?
>
> Thanks, Greg
>
>
>
> On Friday, July 17, 2015 at 3:09:03 PM UTC+10, Greg Plowman wrote:
>
> Thanks Jameson.
>
> My error was from the line: trialCounts = MySimulation(trial, numIter)
> Error message: trialCounts not defined
>
> This had something to do with the variable name (used later, perhaps if 
> statements guarding scope and now local)
> In any case, if I change variable name, it works: counts = MySimulation(
> trial, numIter)
>
>
> Thanks again for your help.
>
> Greg
>
>
>
> On Friday, July 17, 2015 at 2:48:44 PM UTC+10, Jameson wrote:
>
> i believe that length(chunks) will be <= nworkers()
>
> the last statement of the for loop should be the "return" value from that 
> iteration. (for example: the variable name `trialCount`).
>
> On Fri, Jul 17, 2015 at 12:12 AM Greg Plowman <[email protected]> wrote:
>
> OK thanks.
> I didn't consider @parallel (probably because I considered it for only 
> large trials of small work units, whereas I considered pmap more suited to 
> relatively small trials of longer running work units)
> In any case, @parallel works fine.
>
> Old pmap code skeleton:
> trialCounts = pmap(MySimulation, [1:numTrials], fill(numIter, numTrials))
> totalCounts = sum(trialCounts)
>
> New @parallel code
> totalCounts = @parallel (+) for trial = 1:numTrials
>     MySimulation(trial, numIter)
> end
>
>
>
> However, I have 2 questions:
>
> 1. When I try to modify @parallel code to assign the result to a variable 
> inside the loop, I get an error.
> I don't understand the @parallel macro, but I'm guessing I can't assign to 
> variable inside loop?
>  
> totalCounts = @parallel (+) for trial = 1:numTrials
>     trialCount = MySimulation(trial, numIter)
>     print(trialCount) # or some other processing with trialCount
> end
>
>
>
> 2. Again, I don't understand the @parallel macro, but it seems to call preduce (see below), 
> which seems to collect results in an array of size numTrials / nworkers().
> If this is so, then memory requirement still has a dependency on the 
> number of trials.
> I was trying to limit the results array to the number of workers, 
> independent of number of trials.
> Is my understanding here correct?
>
> function preduce(reducer, f, N::Int)
>     chunks = splitrange(N, nworkers())
>     results = cell(length(chunks))
>     for i in 1:length(chunks)
>         results[i] = @spawn f(first(chunks[i]), last(chunks[i]))
>     end
>     mapreduce(fetch, reducer, results)
> end
>
>
>
> Greg
>
>
> On Friday, July 10, 2015 at 12:24:16 PM UTC+10, Jameson wrote:
>
> this sounds like you may be looking for the `@parallel reduce_fn for itm = 
> lst; f(itm); end` map-reducer construct (described on the same page)?
>
> On Thu, Jul 9, 2015 at 9:23 PM Greg Plowman <[email protected]> wrote:
>
> I have been using pmap for simulations and find it very useful 
> and convenient.
> However, sometimes I want to run a large number of trials where the 
> results are also large. This requires a lot of memory to hold the returned 
> results.
> If I'm only interested in the final, reduced result, and not concerned with 
> the raw individual trial results, then returning entire array seems 
> unnecessary.
> I want to reduce on the fly, avoiding the need to keep all trial results.
> I want to run more trials than workers for load balancing. (And possibly 
> because I'm interested in summary results of individual trials, not the 
> entire raw results).
>
> With the help of the simplified version of pmap presented in the docs (
> http://julia.readthedocs.org/en/latest/manual/parallel-computing/), I 
> have a tenuous understanding of how pmap works. Although the actual 
> implementation scares me.
> In any case, I was wondering before I progress further, whether a modified 
> version of pmap could be designed to reduce on-the-fly.
> Here are some modifications to the simplified, documentation version.
> Would something like this work? I'm worried about the shared updates to 
> final_result. Will these happen orderly? What else should I consider?
>
>
> * function pmap(f, lst)
>
> * function pmap_reduce(f, lst, reduce_fn)  # extra argument is reduce 
> function 
>     np = nprocs()  # determine the number of processes available
>     n = length(lst)
>
>
> *   results = cell(n)
> *   results = cell(np)  # hold results for currently executing procs only
> *   final_result = cell(1)  # holds the final, reduced result
>
>     i = 1
>     # function to produce the next work item from the queue.
>     # in this case it's just an index.
>     nextidx() = (idx=i; i+=1; idx)
>
>     @sync begin
>         for p=1:np
>             if p != myid() || np == 
>
> ...
