Hi David,

Thanks for your reply.
This seems to be exactly what I had in mind (as per my original post).


My original concern was whether shared updates to a common variable within 
`@async` <`@async> would happen in an orderly way.
In my case, its actually quite convenient to return results for each worker.
Just curious as to why you reduce for each worker and then reduce the 
workers at the end. Why not reduce into a final result inside the `@async` 
loop?
That way, if you scale up the number workers, you don't need to keep a data 
chunk for each worker, just one total reduced result.

Coincidently, I read a reply to a query about "threads and processes 
<https://groups.google.com/forum/#!topic/julia-users/n8G08xory9o>" by 
@stevengj yesterday which helped me understand parallel concepts a little 
better, but I'm still unsure about some stuff.
Apparently, `@async <`@async>` runs in green threads which are cooperative 
threads? Does this mean switching between threads only occurs when current 
thread blocks for I/O? As opposed to operating system threads which are 
pre-emptive meaning switching could happen at any time in thread execution?
Since `pmap()` uses `@async`, does this mean switching between threads only 
occurs at `remotecall_fetch()`? In which case it's safe to reduce to a 
single variable inside `@async`? Is this correct? Presumably this wouldn't 
be safe with pre-emptive threads? 


The actual pmap() code seems to be made more robust than the simplified 
version in the documentation. It seems to handle errors, etc.
My modified version of pmap_reduce(), returns an array of reduced results 
for each worker.
The modifications were surprising simple (although I don't really 
understand the error recovery part so I'm not sure what to do in case of 
error)
In any case here it is:

#function pmap(f, lsts...; err_retry=true, err_stop=false)
function pmap_reduce(f, op, lsts...; err_retry=true, err_stop=false)
    len = length(lsts)

    results = Dict{Int,Any}()

    retryqueue = {}
    task_in_err = false
    is_task_in_error() = task_in_err
    set_task_in_error() = (task_in_err = true)

    nextidx = 0
    getnextidx() = (nextidx += 1)

    states = [start(lsts[idx]) for idx in 1:len]
    function getnext_tasklet()
        if is_task_in_error() && err_stop
            return nothing
        elseif !any(idx->done(lsts[idx],states[idx]), 1:len)
            nxts = [next(lsts[idx],states[idx]) for idx in 1:len]
            for idx in 1:len; states[idx] = nxts[idx][2]; end
            nxtvals = [x[1] for x in nxts]
            return (getnextidx(), nxtvals)
        elseif !isempty(retryqueue)
            return shift!(retryqueue)
        else
            return nothing
        end
    end

    @sync begin
        for wpid in workers()
            @async begin
                tasklet = getnext_tasklet()
                while (tasklet != nothing)
                    (idx, fvals) = tasklet
                    try
                        result = remotecall_fetch(wpid, f, fvals...)
                        if isa(result, Exception)
                            ((wpid == myid()) ? rethrow(result) : 
throw(result))
                        else
                            #results[idx] = result
                            if haskey(results, wpid)
                                results[wpid] = op(results[wpid], result)
                            else
                                results[wpid] = result
                            end
                        end
                    catch ex
                        if err_retry
                            push!(retryqueue, (idx, fvals, ex))
                        else
                            #results[idx] = ex
                            # Not really sure what to do here
                            results[0] = ex
                        end
                        set_task_in_error()
                        break # remove this worker from accepting any more 
tasks
                    end
                    tasklet = getnext_tasklet()
                end
            end
        end
    end

    for failure in retryqueue
        #results[failure[1]] = failure[3]
        # Not really sure what to do here
        results[0] = failure[3]
    end

    #[results[x] for x in 1:nextidx]
    [ results[x] for x in sort(collect(keys(results))) ]
end







On Wednesday, September 2, 2015 at 5:30:34 PM UTC+10, David van Leeuwen 
wrote:

> Hi Greg,
>
> On Friday, August 28, 2015 at 6:40:33 AM UTC+2, Greg Plowman wrote:
>>
>> Firstly, I hope it's OK to revive this thread.
>>
>> @parallel doesn't do dynamic load balancing.
>> So I'm still interested in a pmap_reduce() or equivalent which does 
>> dynamic allocation to workers AND reduces on the fly.
>>
>> Just to be clear:
>> The "chunks" of work given to each worker might vary in execution time.
>> The workers might vary in their execution speed.
>>
>> It's difficult to optimally split job into chunks to assign to workers 
>> upfront.
>>
>> I had the same demands for my implementation in GaussianMixtures, 
> training parallel on large amounts of data in chunks of some natural 
> heterogeneous size, on a cluster with heterogeneous CPU power per node. 
>
> If you have a look at 
> https://github.com/davidavdav/GaussianMixtures.jl/blob/master/src/data.jl#L101-136
>  
> there is a function `dmapreduce` that works on chucks of data of type Data. 
>  Data is basically a Vector{Matrix}, but the Matrix can be loaded from disc 
> in my case.  The length of the vector---the number of chunks---can be very 
> large, but in reducing there are never more than the number of workers 
> results stored.  
>  
>
>> Although pmap does assign chunks of work dynamically, it's still not 
>> optimal if the number of chunks is of the same order of the number of 
>> workers.
>> It's very possible a slow worker will be left running a long "chunk". Or 
>> simply that only some workers will be required to finish any remaining 
>> chunks, leaving other workers idle.
>>
>> It seems more granularity may help. i.e. Splitting job into many chunks 
>> so that #chunks >> #workers.
>> But then pmap becomes unviable because it returns an array of results for 
>> each chunk, which requires too much memory.
>>
>
> This is exactly the reason why I reduce in two places: 
>  - in the @async loop there is a reduced for each worker
>  - there is a reducer at the end. 
>
> before, I was running out of memory on a large machine (128G) while 
> reducing a huge list of chunks, for which the results themselves (a tuple 
> of matrices) were quite sizeable.  Now with the `dmapreduce()` the 
> reduction size is limited by the number of workers, and not by the number 
> of chunks (the data). 
>
> Perhaps you can adapt the code a bit for your own purpose, or, if you have 
> your data chunks on disk in a matrix-like format, you can even use the Data 
> type.  
>
> Cheers, 
>
> ---david
>  
>
>> I don't need the intermediate results, so I could reduce on the fly.
>>
>> Hence my interest in a version of pmap which returns the final reduced 
>> result only.
>> Is it possible to comment on my original question about a modified pmap?
>>
>> Or have I misunderstood pmap &/or @parallel? Or are there other options?
>>
>> Thanks, Greg
>>
>>
>>
>> On Friday, July 17, 2015 at 3:09:03 PM UTC+10, Greg Plowman wrote:
>>
>> Thanks Jameson.
>>
>> My error was from the line: trialCounts = MySimulation(trial, numIter)
>> Error message: trialCounts not defined
>>
>> This had something to do with the variable name (used later, perhaps if 
>> statements guarding scope and now local)
>> In any case, if I change variable name, it works: counts = MySimulation(
>> trial, numIter)
>>
>>
>> Thanks again for your help.
>>
>> Greg
>>
>>
>>
>> On Friday, July 17, 2015 at 2:48:44 PM UTC+10, Jameson wrote:
>>
>> i believe that length(chunks) will be <= nworkers()
>>
>> the last statement of the for loop should be the "return" value from that 
>> iteration. (for example: the variable name `trialCount`).
>>
>> On Fri, Jul 17, 2015 at 12:12 AM Greg Plowman <[email protected]> 
>> wrote:
>>
>> OK thanks.
>> I didn't consider @parallel (probably because I considered it for only 
>> large trials of small work units, whereas I considered pmap more suited to 
>> relatively small trials of longer running work units)
>> In any case, @parallel works fine.
>>
>> Old pmap code skeleton:
>> trialCounts = pmap(MySimulation, [1:numTrials], fill(numIter, numTrials))
>> totalCounts = sum(trialCounts)
>>
>> New @parallel code
>> totalCounts = @parallel (+) for trial = 1:numTrials
>>     MySimulation(trial, numIter)
>> end
>>
>>
>>
>> However, I have 2 questions:
>>
>> 1. When I try to modify @parallel code to assign the result to a variable 
>> inside the loop, I get an error.
>> I don't understand the @parallel macro, but I'm guessing I can't assign 
>> to variable inside loop?
>>  
>> totalCounts = @parallel (+) for trial = 1:numTrials
>>     trialCount = MySimulation(trial, numPlays)
>>     print(trialCount) # or some other processing with trialCount
>> end
>>
>>
>>
>> 2. Again I don't @parallel macro but it seems to call preduce (see 
>> below), which seems to collect results in an array of size numTrials / 
>> nworkers().
>> If this is so, then memory requirement still has a dependency on the 
>> number of trials.
>> I was trying to limit the results array to the number of workers, 
>> independent of number of trials.
>> Is my understanding here correct?
>>
>> function preduce(reducer, f, N::Int)
>>     chunks = splitrange(N, nworkers())
>>     results = cell(length(chunks))
>>     for i in 1:length(chunks)
>>         results[i] = @spawn f(first(chunks[i]), last(chunks[i]))
>>     end
>>     mapreduce(fetch, reducer, results)
>> end
>>
>>
>>
>> Greg
>>
>>
>>
>>
>>
>>
>>
>>
>>  
>>
>> On Friday, July 10, 2015 at 12:24:16 PM UTC+10, Jameson wrote:
>>
>> this sounds like you may be looking for the `@parallel reduce_fn for itm 
>> = lst; f(itm); end` map-reducer construct (described on the same page)?
>>
>> On Thu, Jul 9, 2015 at 9:23 PM Greg Plowman <[email protected]> wrote:
>>
>> I have been using pmap for simulations and find it very useful 
>> and convenient.
>> However, sometimes I want to run a large number of trials where the 
>> results are also large. This requires a lot of memory to hold the returned 
>> results.
>> If I'm only interested the final, reduced result, and not concerned with 
>> the raw individual trial results, then returning entire array seems 
>> unnecessary.
>> I want to reduce on the fly, avoiding the need to keep all trial results.
>> I want to run more trials than workers for load balancing. (And possibly 
>> because I'm interested in summary results of individual trials, not the 
>> entire raw results).
>>
>> With the help of the simplified version of pmap presented in the docs (
>> http://julia.readthedocs.org/en/latest/manual/parallel-computing/), I 
>> have a tenuous understanding of how pmap works. Although the actual 
>> implementation scares me.
>> In any case, I was wondering before I progress further, whether a 
>> modified version of pmap could be designed to reduce on-the-fly.
>> Here are some modifications to the simplified, documentation version.
>> Would something like this work? I'm worried about the shared updates to 
>> final_result. Will these happen orderly? What else should I consider?
>>
>>
>> * function pmap(f, lst)
>>
>> * function pmap_reduce(f, lst, reduce_fn)  # extra argument is reduce 
>> function 
>>     np = nprocs()  # determine the number of processes available
>>     n = length(lst)
>>
>>
>> *   results = cell(n)
>> *   results = cell(np)  # hold results for currently executing procs only
>> *   final_result = cell(1)  # holds the final, reduced result
>>
>>     i = 1
>>     # function to produce the next work item from the queue.
>>     # in this case it's just an index.
>>     nextidx() = (idx=i; i+=1; idx)
>>
>>     @sync begin
>>         for p=1:np
>>             if p != myid() || np == 
>>
>> ...
>
>

Reply via email to