Hi John,
On Oct 20, 2009, at 10:11 AM, John Shea wrote:
Dear All,
(sorry for the long post)
I have been experimenting a bit with GCD in MacRuby 0.5 beta.
Cool!
I have managed to get some code working as I would expect it to
work, and I am wondering if a few of you could provide a sanity
check that everything i am doing is above board and not the result
of unspecified behaviour.
My main concern is that when running a set of tasks and collecting
the results that the collecting itself is not thread safe (in this
case the << operator). And if it is not thread safe - then what is
the best way to collect results from dispatched tasks.
Yes, appending objects to an array from multiple threads is definitely
not going to work as you would expect.
Typically, you would use a Mutex object to synchronize access to the
shared variable, but in the GCD world the most common way I believe is
to use a sequential queue.
ary = []
ary_queue = Queue.new('com.apple.ary')
100.times do
Queue.concurrent.dispatch do
n = heavy_compute
ary_queue.dispatch { ary << n }
end
end
This way, the heavy_compute code is executed in parallel and appending
the result to ary is done sequentially.
But so far it seems to work magically well.
Before I put the code down - here are some initial discoveries
(which may have been obvious to everyone else).
1. When running tests you need wait until the threads are finished -
otherwise the main loop will finish before the output comes back
from other threads.
I did that with a sleep, but also experimented with looping until an
array is filled.
In case you do asynchronous dispatch calls to a sequential queue (that
you created yourself), doing a synchronous call passing an empty block
will block until asynchronous blocks are completed.
q = Queue.new('com.apple.ary')
100.times { q.dispatch { heavy_compute } }
q.dispatch(true) {} # wait
In case you use the concurrent queue, AFAIK the only way to wait is to
use a Group.
g = Dispatch::Group.new
100.times { g.dispatch(Dispatch::Queue.concurrent) { heavy_compute } }
g.wait
2. Scope: Given one of the global system queues :
gcdq = Dispatch::Queue.concurrent(:default)
and looping over a number of dispatches:
5.times do |index|
gcdq.dispatch {puts index}
end
the task dispatched will hold the last value of index - it will not
capture the changing value of index - I assume that the task has a
pointer directly to the index variable.
(an article by Bill Bumgarner http://www.friday.com/bbum/2009/08/29/blocks-tips-tricks/
pointed out something similar in objC - but had a different way of
solving this)
So to capture that changing index, I created a Proc then turned it
back to a block, eg:
5.times do |index|
task = Proc.new {puts index}
gcdq.dispatch &task
end
Good catch, it's actually a problem that needs to be solved. It also
reproduces with normal Thread objects:
100.times { |i| Thread.new { p i } }
The current workaround for Thread is to pass the variables to #new.
100.times { |i| Thread.new(i) { |i| p i } }
For GCD, I think your idea is the best.
I have been considering a few ways to solve this problem, a naive one
would be to copy the dynamic variables upon the block preparation if
the compiler gives us enough hints.
3. You will notice that in the code below in check = a.run-disp2
that the array is returned - I assume empty - and is gradually
filled by the tasks.
class A
def run_disp(an_array)
gcdq = Dispatch::Queue.concurrent(:default)
["f","g","h","i", "j"].each_with_index do |val, index|
task = Proc.new {an_array << do_something(val, index)}
gcdq.dispatch &task
puts "Loaded #{index}"
end
end
def run_disp2
an_array = []
gcdq = Dispatch::Queue.concurrent(:default)
["f","g","h","i", "j"].each_with_index do |val, index|
task = Proc.new {an_array << do_something(val, index)}
gcdq.dispatch &task
puts "Loaded #{index}"
end
an_array
end
def do_something(val, index)
#staggered sleeps so as to prove that various tasks finish first
result = val * (index + 1)
case index
when 0, 1
sleep 2
when 2
sleep 1
else
sleep 0
end
puts "yeah baby! #{index}"
result
end
end
a = A.new
#pass array to be filled
an_array = []
a.run_disp(an_array)
sleep 10
puts "passed array: #{an_array.join(", ")}"
check = a.run_disp2
sleep 10
puts "returned array: #{check.join(", ")}"
#results:
# Loaded 0
# Loaded 1
# Loaded 2
# Loaded 3
# Loaded 4
# yeah baby! 4yeah baby! 3
#
# yeah baby! 2
# yeah baby! 1
# yeah baby! 0
# passed array: iiii, jjjjj, hhh, gg, f
# Loaded 0
# Loaded 1
# Loaded 2
# Loaded 3
# Loaded 4
# yeah baby! 3
# yeah baby! 4
# yeah baby! 2
# yeah baby! 0
# yeah baby! 1
# returned array: iiii, jjjjj, hhh, f, gg
#assume puts is not transactional.
I think you definitely want to synchronize the array access (using a
sequential queue).
As for #puts & IO friends, definitely :)
Thanks for trying it out. Would you be interested to share your
experiments later, in a website article?
Laurent
_______________________________________________
MacRuby-devel mailing list
MacRuby-devel@lists.macosforge.org
http://lists.macosforge.org/mailman/listinfo.cgi/macruby-devel