This seems to be a safer implementation, but I'm still not confident it is
optimally written. Opinions welcome!
function subdivide(v0,v1,filter::Function,middle::Function,doer::Function)
    c = Channel(1000000) # queue of (left, right) data pairs still to examine
    results = Any[doer(v0),doer(v1)]
    # filter compares data points (doer outputs), not raw arguments
    filter(results[1],results[2]) || return results # nothing to subdivide
    put!(c,(results[1],results[2]))
    np = nprocs() # determine the number of processes available
    idle = falses(np) # idle[p]: the task driving process p has no work in hand
    idle[myid()] = true # the master gets no task of its own unless np == 1
    @sync begin
        for p=1:np
            if p != myid() || np == 1
                @async begin
                    while true
                        if isready(c)
                            idle[p] = false
                            (d0,d1) = take!(c)
                            v = middle(d0,d1)
                            info("Doing task $v on process $p, queue $c...\n")
                            d = remotecall_fetch(p,doer,v)
                            push!(results,d)
                            filter(d0,d) && put!(c,(d0,d))
                            filter(d,d1) && put!(c,(d,d1))
                        else
                            idle[p] = true
                            # stop only when no task still holds work in flight
                            if all(idle) break end
                            yield() # busy-waits until another task queues work
                        end
                    end
                end
            end
        end
    end
    results
end
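One way to drop the polling loop (the `yield()` busy-wait) might be to count outstanding intervals and close the channel when that count reaches zero, so every `for ... in c` loop ends on its own. This is only a sketch: it assumes Julia 1.x with the Distributed standard library (where `remotecall_fetch` takes the function first), `subdivide_counted` is a hypothetical name, and it relies on all worker-driving tasks being `@async` tasks on the master, so `pending` is never updated concurrently.

```julia
using Distributed

# Sketch: counter-based termination instead of polling with yield().
# `pending` counts intervals that are queued or being processed; once it drops
# to zero no new work can appear, so the channel is closed and every
# `for ... in c` loop below ends.
function subdivide_counted(v0, v1, filter::Function, middle::Function, doer::Function)
    results = Any[doer(v0), doer(v1)]
    filter(results[1], results[2]) || return results  # nothing to subdivide
    c = Channel{Any}(Inf)              # unbounded queue of (left, right) pairs
    pending = 1
    put!(c, (results[1], results[2]))
    @sync for p in workers()           # workers() == [1] when none were added
        @async for (d0, d1) in c       # ends once c is closed and drained
            d = remotecall_fetch(doer, p, middle(d0, d1))
            push!(results, d)
            # queue the children before retiring this interval, so pending
            # cannot reach zero while work remains
            if filter(d0, d)
                pending += 1
                put!(c, (d0, d))
            end
            if filter(d, d1)
                pending += 1
                put!(c, (d, d1))
            end
            pending -= 1
            pending == 0 && close(c)
        end
    end
    results
end

r = subdivide_counted(0, 10, (x,y)->x[1]<y[1]-0.7, (x,y)->(x[1]+y[1])/2, v->[v,string(v)])
length(r)  # 17, matching the 17-element output in the original post
```

The design choice is that a task never decides to stop by looking at the queue; it stops only when the channel is closed, and the channel is closed only by the task that retires the last outstanding interval.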
On Friday, 4 September 2015 09:30:50 UTC+2, Laurent Bartholdi wrote:
>
> Hello world,
> I'm completely new to Julia, and advice/comments are welcome. I'd like to
> run calculations (about 1 second per job) distributed across the cores of
> my CPU. Each calculation may trigger new calculations.
> More specifically, each calculation works on a number in an interval, and
> may trigger a subdivision of the interval.
>
> I thought about the following implementation:
>
> function subdivide(v0::Number,v1::Number,filter::Function,middle::Function,doer::Function)
>     c = Channel(1000)
>     results = Any[doer(v0),doer(v1)]
>     put!(c,(results[1],results[2]))
>     np = nprocs() # determine the number of processes available
>     @sync begin
>         for p=1:np
>             if p != myid() || np == 1
>                 @async begin
>                     for (d0,d1)=c
>                         v = middle(d0,d1)
>                         #println("Doing $v on process $p")
>                         d = remotecall_fetch(p,doer,v)
>                         push!(results,d)
>                         filter(d0,d) && put!(c,(d0,d))
>                         filter(d,d1) && put!(c,(d,d1))
>                         isready(c) || break
>                     end
>                 end
>             end
>         end
>     end
>     results
> end
>
> Typical use:
>
> julia> subdivide(0,10,(x,y)->x[1]<y[1]-0.7,(x,y)->(x[1]+y[1])/2,v->[v,string(v)])
> 17-element Array{Any,1}:
> Any[0,"0"]
> Any[10,"10"]
> ⋮
> Any[8.125,"8.125"]
> Any[9.375,"9.375"]
>
> As you can see, the function "subdivide" receives a "doer", which computes
> a piece of data; a "filter", which decides whether a midpoint is needed
> between two data points; and a "middle", which computes the argument of
> that midpoint.
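For reference, what `subdivide` computes can be pinned down with a plain sequential work-list loop. This is a minimal sketch with no channels or processes; `subdivide_serial` is a hypothetical name, and the order of the returned results differs from the parallel versions.

```julia
# Sequential sketch of the subdivide semantics; no channels or processes.
function subdivide_serial(v0, v1, filter::Function, middle::Function, doer::Function)
    results = Any[doer(v0), doer(v1)]
    stack = Any[(results[1], results[2])]   # intervals still to examine
    while !isempty(stack)
        d0, d1 = pop!(stack)
        filter(d0, d1) || continue          # close enough: no midpoint needed
        d = doer(middle(d0, d1))
        push!(results, d)
        push!(stack, (d0, d), (d, d1))      # both halves may need splitting
    end
    results
end

r = subdivide_serial(0, 10, (x,y)->x[1]<y[1]-0.7, (x,y)->(x[1]+y[1])/2, v->[v,string(v)])
length(r)  # 17
```

With this filter, intervals are halved until their width drops below 0.7, i.e. down to 0.625, which gives 16 intervals and hence 17 data points.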
>
> My worry is that there will certainly be race conditions: when a
> calculation has been removed from the channel but its follow-up
> calculations have not yet been put in, one of the cores may stop
> prematurely. Conversely, the final "isready(c)" could return true, yet
> the next iteration of "for (d0,d1)=c" could block because another (green)
> thread removed a calculation in the meantime.
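The first race can be reproduced without any worker processes. In this sketch (assuming Julia 1.x; `demo_premature_exit` is a hypothetical name, and `sleep` stands in for the yielding `remotecall_fetch`), task B inspects the channel while task A still holds the only job, so B would `break` out of its loop even though A is about to queue more work.

```julia
# Two cooperative tasks sharing one channel: B looks at the queue while A is
# "working" on the only job, the window in which `isready(c) || break`
# fires too early.
function demo_premature_exit()
    c = Channel{Int}(10)
    put!(c, 1)
    saw_empty = false
    @sync begin
        @async begin                  # task A
            job = take!(c)
            sleep(0.1)                # yields, as remotecall_fetch would
            put!(c, job + 1)          # follow-up job arrives after B has looked
        end
        @async begin                  # task B
            yield()                   # let A take the job first
            saw_empty = !isready(c)   # B would break out of its loop here
        end
    end
    saw_empty
end

demo_premature_exit()  # true
```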
>
> How should I program this safely? It seems like a useful primitive, so
> maybe it already exists...
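One existing primitive that sidesteps both races is `pmap` from the Distributed standard library, at the cost of subdividing one level at a time. This is a swapped-in technique rather than a drop-in replacement: a minimal sketch assuming Julia 1.x, with `subdivide_rounds` as a hypothetical name.

```julia
using Distributed

# Level-synchronous sketch: each round hands every pending midpoint to pmap
# (which spreads the calls over the available workers and returns only when
# all of them are done), then builds the next round from the new data points.
function subdivide_rounds(v0, v1, filter::Function, middle::Function, doer::Function)
    results = Any[doer(v0), doer(v1)]
    frontier = Any[(results[1], results[2])]
    while true
        todo = [(a, b) for (a, b) in frontier if filter(a, b)]
        isempty(todo) && break        # no interval needs a midpoint
        mids = pmap(ab -> doer(middle(ab...)), todo)
        append!(results, mids)
        frontier = Any[]
        for ((a, b), d) in zip(todo, mids)
            push!(frontier, (a, d), (d, b))
        end
    end
    results
end

r = subdivide_rounds(0, 10, (x,y)->x[1]<y[1]-0.7, (x,y)->(x[1]+y[1])/2, v->[v,string(v)])
length(r)  # 17
```

Each round waits for its slowest midpoint before the next one starts, so cores can sit idle when job durations vary.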
>
> Thanks in advance, Laurent
>