Hi Greg,

On Friday, August 28, 2015 at 6:40:33 AM UTC+2, Greg Plowman wrote:
>
> Firstly, I hope it's OK to revive this thread.
>
> @parallel doesn't do dynamic load balancing.
> So I'm still interested in a pmap_reduce() or equivalent which does
> dynamic allocation to workers AND reduces on the fly.
>
> Just to be clear:
> The "chunks" of work given to each worker might vary in execution time.
> The workers might vary in their execution speed.
>
> It's difficult to optimally split job into chunks to assign to workers
> upfront.
>

I had the same requirements for my implementation in GaussianMixtures: training in parallel on large amounts of data, in chunks of some natural, heterogeneous size, on a cluster with heterogeneous CPU power per node.

If you have a look at https://github.com/davidavdav/GaussianMixtures.jl/blob/master/src/data.jl#L101-136 there is a function `dmapreduce` that works on chunks of data of type Data. Data is basically a Vector{Matrix}, but in my case each Matrix can be loaded from disk. The length of the vector (the number of chunks) can be very large, but during the reduction there are never more results stored than there are workers.

> Although pmap does assign chunks of work dynamically, it's still not
> optimal if the number of chunks is of the same order of the number of
> workers.
> It's very possible a slow worker will be left running a long "chunk". Or
> simply that only some workers will be required to finish any remaining
> chunks, leaving other workers idle.
>
> It seems more granularity may help. i.e. Splitting job into many chunks so
> that #chunks >> #workers.
> But then pmap becomes unviable because it returns an array of results for
> each chunk, which requires too much memory.
>

This is exactly the reason why I reduce in two places:
 - in the @async loop there is a reducer for each worker
 - there is a reducer at the end.

Before, I was running out of memory on a large machine (128G) while reducing a huge list of chunks, for which the results themselves (a tuple of matrices) were quite sizeable. Now, with `dmapreduce()`, the memory needed for the reduction is bounded by the number of workers, and not by the number of chunks (the data).

Perhaps you can adapt the code a bit for your own purpose, or, if you have your data chunks on disk in a matrix-like format, you can even use the Data type.

Cheers,

---david

> I don't need the intermediate results, so I could reduce on the fly.
>
> Hence my interest in a version of pmap which returns the final reduced
> result only.
> Is it possible to comment on my original question about a modified pmap?
>
> Or have I misunderstood pmap &/or @parallel? Or are there other options?
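
One possible shape for such a pmap_reduce is sketched here. It is a minimal illustration of the "reduce in two places" idea, not the actual `dmapreduce` code. Each worker gets its own @async feeder task on the master, takes the next chunk index whenever it becomes free (so the allocation is dynamic), and folds each result into a per-worker partial; the partials are combined once at the end. The name `pmap_reduce`, its argument order, and the use of current Julia (1.x, where Distributed is a standard library) are assumptions for illustration. The sketch also assumes that `items` is non-empty and 1-indexed, and that `op` is associative and commutative, because the order in which results are combined depends on scheduling.

```julia
using Distributed

# Hypothetical helper (not part of Base or GaussianMixtures.jl):
# map `f` over `items` with dynamic scheduling and fold results with `op`.
function pmap_reduce(f, op, items)
    n = length(items)
    next = 1                                      # next index to hand out
    pids = workers()                              # [1] if no workers were added
    partials = Vector{Any}(undef, length(pids))   # one partial result per worker
    used = falses(length(pids))                   # which slots hold a result
    @sync for (slot, pid) in enumerate(pids)
        @async begin
            while true
                # The feeder tasks are cooperatively scheduled and there is
                # no blocking call between these lines, so each index is
                # handed out exactly once.
                idx = next
                idx > n && break
                next += 1
                r = remotecall_fetch(f, pid, items[idx])   # blocks this task only
                # First reduction: fold into this worker's partial.
                partials[slot] = used[slot] ? op(partials[slot], r) : r
                used[slot] = true
            end
        end
    end
    # Second reduction: combine the per-worker partials.
    return reduce(op, partials[used])
end

# Without added processes this runs entirely on process 1.
total = pmap_reduce(x -> x^2, +, 1:10)   # sum of squares of 1..10
```

With real workers, `f` has to be defined on every process (for example with `@everywhere`). The master then holds at most one partial result per worker, plus whatever result is currently being fetched, independent of the number of chunks.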
>
> Thanks, Greg
>
>
> On Friday, July 17, 2015 at 3:09:03 PM UTC+10, Greg Plowman wrote:
>
> Thanks Jameson.
>
> My error was from the line: trialCounts = MySimulation(trial, numIter)
> Error message: trialCounts not defined
>
> This had something to do with the variable name (used later, perhaps if
> statements guarding scope and now local)
> In any case, if I change variable name, it works: counts = MySimulation(
> trial, numIter)
>
> Thanks again for your help.
>
> Greg
>
>
> On Friday, July 17, 2015 at 2:48:44 PM UTC+10, Jameson wrote:
>
> i believe that length(chunks) will be <= nworkers()
>
> the last statement of the for loop should be the "return" value from that
> iteration. (for example: the variable name `trialCount`).
>
> On Fri, Jul 17, 2015 at 12:12 AM Greg Plowman wrote:
>
> OK thanks.
> I didn't consider @parallel (probably because I considered it for only
> large trials of small work units, whereas I considered pmap more suited to
> relatively small trials of longer running work units)
> In any case, @parallel works fine.
>
> Old pmap code skeleton:
> trialCounts = pmap(MySimulation, [1:numTrials], fill(numIter, numTrials))
> totalCounts = sum(trialCounts)
>
> New @parallel code
> totalCounts = @parallel (+) for trial = 1:numTrials
>     MySimulation(trial, numIter)
> end
>
>
> However, I have 2 questions:
>
> 1. When I try to modify @parallel code to assign the result to a variable
> inside the loop, I get an error.
> I don't understand the @parallel macro, but I'm guessing I can't assign to
> variable inside loop?
>
> totalCounts = @parallel (+) for trial = 1:numTrials
>     trialCount = MySimulation(trial, numPlays)
>     print(trialCount) # or some other processing with trialCount
> end
>
>
> 2. Again, I don't understand the @parallel macro, but it seems to call
> preduce (see below),
> which seems to collect results in an array of size numTrials / nworkers().
> If this is so, then memory requirement still has a dependency on the
> number of trials.
> I was trying to limit the results array to the number of workers,
> independent of number of trials.
> Is my understanding here correct?
>
> function preduce(reducer, f, N::Int)
>     chunks = splitrange(N, nworkers())
>     results = cell(length(chunks))
>     for i in 1:length(chunks)
>         results[i] = @spawn f(first(chunks[i]), last(chunks[i]))
>     end
>     mapreduce(fetch, reducer, results)
> end
>
>
> Greg
>
>
> On Friday, July 10, 2015 at 12:24:16 PM UTC+10, Jameson wrote:
>
> this sounds like you may be looking for the `@parallel reduce_fn for itm =
> lst; f(itm); end` map-reducer construct (described on the same page)?
>
> On Thu, Jul 9, 2015 at 9:23 PM Greg Plowman wrote:
>
> I have been using pmap for simulations and find it very useful
> and convenient.
> However, sometimes I want to run a large number of trials where the
> results are also large. This requires a lot of memory to hold the returned
> results.
> If I'm only interested in the final, reduced result, and not concerned with
> the raw individual trial results, then returning entire array seems
> unnecessary.
> I want to reduce on the fly, avoiding the need to keep all trial results.
> I want to run more trials than workers for load balancing. (And possibly
> because I'm interested in summary results of individual trials, not the
> entire raw results).
>
> With the help of the simplified version of pmap presented in the docs (
> http://julia.readthedocs.org/en/latest/manual/parallel-computing/), I
> have a tenuous understanding of how pmap works. Although the actual
> implementation scares me.
> In any case, I was wondering before I progress further, whether a modified
> version of pmap could be designed to reduce on-the-fly.
> Here are some modifications to the simplified, documentation version.
> Would something like this work? I'm worried about the shared updates to
> final_result.
> Will these happen orderly? What else should I consider?
>
>
> * function pmap(f, lst)
>
> * function pmap_reduce(f, lst, reduce_fn) # extra argument is reduce function
>     np = nprocs() # determine the number of processes available
>     n = length(lst)
>
>
> *   results = cell(n)
> *   results = cell(np) # hold results for currently executing procs only
> *   final_result = cell(1) # holds the final, reduced result
>
>     i = 1
>     # function to produce the next work item from the queue.
>     # in this case it's just an index.
>     nextidx() = (idx=i; i+=1; idx)
>
>     @sync begin
>         for p=1:np
>             if p != myid() || np ==
>
> ...
