Hi John,

On Oct 20, 2009, at 10:11 AM, John Shea wrote:

Dear All,

(sorry for the long post)

I have been experimenting a bit with GCD in MacRuby 0.5 beta.

Cool!

I have managed to get some code working as I would expect it to, and I am wondering if a few of you could provide a sanity check that everything I am doing is above board and not the result of unspecified behaviour.

My main concern is that, when running a set of tasks and collecting the results, the collecting itself (in this case the << operator) is not thread safe. And if it is not thread safe, what is the best way to collect results from dispatched tasks?

Yes, appending objects to an array from multiple threads is definitely not going to work as you would expect.

Typically, you would use a Mutex object to synchronize access to the shared variable, but in the GCD world the most common way I believe is to use a sequential queue.

ary = []
ary_queue = Dispatch::Queue.new('com.apple.ary') # sequential queue guarding ary

100.times do
  Dispatch::Queue.concurrent.dispatch do
    n = heavy_compute
    ary_queue.dispatch { ary << n }
  end
end

This way, the heavy_compute code is executed in parallel and appending the result to ary is done sequentially.
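For comparison, the Mutex version (plain Ruby threads, no GCD) would look something like this; just a sketch, with heavy_compute standing in for the real work as above:

ary = []
mutex = Mutex.new

threads = (1..100).map do
  Thread.new do
    n = heavy_compute
    mutex.synchronize { ary << n } # only one thread appends at a time
  end
end
threads.each { |t| t.join }

The sequential queue achieves the same thing without explicit locking, which is why I prefer it in GCD code.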

But so far it seems to work magically well.

Before I put the code down - here are some initial discoveries (which may have been obvious to everyone else).

1. When running tests you need to wait until the threads are finished - otherwise the main loop will finish before the output comes back from the other threads. I did that with a sleep, but I also experimented with looping until an array is filled.

If you do asynchronous dispatch calls to a sequential queue (one that you created yourself), dispatching an empty block synchronously will block until the asynchronous blocks have completed.

q = Dispatch::Queue.new('com.apple.ary')
100.times { q.dispatch { heavy_compute } }
q.dispatch(true) {} # wait

If you use the concurrent queue, AFAIK the only way to wait is to use a Group.

g = Dispatch::Group.new
100.times { g.dispatch(Dispatch::Queue.concurrent) { heavy_compute } }
g.wait

2. Scope: Given one of the global system queues:
gcdq = Dispatch::Queue.concurrent(:default)

and looping over a number of dispatches:
5.times do |index|
  gcdq.dispatch { puts index }
end
the dispatched task will hold the last value of index; it will not capture the changing value of index. I assume the task has a pointer directly to the index variable. (An article by Bill Bumgarner, http://www.friday.com/bbum/2009/08/29/blocks-tips-tricks/, pointed out something similar in Objective-C, but had a different way of solving it.)

So to capture the changing index, I created a Proc and then turned it back into a block, e.g.:
5.times do |index|
  task = Proc.new { puts index }
  gcdq.dispatch(&task)
end

Good catch; it's actually a problem that needs to be solved. It also reproduces with normal Thread objects:

100.times { |i| Thread.new { p i } }

The current workaround for Thread is to pass the variables to #new.

100.times { |i| Thread.new(i) { |i| p i } }

For GCD, I think your idea is the best.

I have been considering a few ways to solve this problem; a naive one would be to copy the dynamic variables at block-preparation time, if the compiler gives us enough hints.

3. You will notice in the code below that in check = a.run_disp2 the array is returned (I assume empty) and is gradually filled by the tasks.

class A
  def run_disp(an_array)
    gcdq = Dispatch::Queue.concurrent(:default)
    ["f","g","h","i", "j"].each_with_index do |val, index|
       task = Proc.new {an_array  << do_something(val, index)}
       gcdq.dispatch &task
      puts "Loaded #{index}"
    end
  end

  def run_disp2
    an_array = []
    gcdq = Dispatch::Queue.concurrent(:default)
    ["f","g","h","i", "j"].each_with_index do |val, index|
       task = Proc.new {an_array  << do_something(val, index)}
       gcdq.dispatch &task
      puts "Loaded #{index}"
    end
    an_array
  end

  def do_something(val, index)
    # staggered sleeps, to show that the tasks finish in a different order than dispatched
    result = val * (index + 1)
    case index
    when 0, 1
      sleep 2
    when 2
      sleep 1
    else
      sleep 0
    end
    puts "yeah baby! #{index}"
    result
  end
end

a = A.new
#pass array to be filled
an_array = []
a.run_disp(an_array)
sleep 10
puts "passed array: #{an_array.join(", ")}"

check = a.run_disp2
sleep 10
puts "returned array: #{check.join(", ")}"

#results:
#     Loaded 0
#     Loaded 1
#     Loaded 2
#     Loaded 3
#     Loaded 4
#     yeah baby! 4yeah baby! 3
#
#     yeah baby! 2
#     yeah baby! 1
#     yeah baby! 0
#     passed array: iiii, jjjjj, hhh, gg, f
#     Loaded 0
#     Loaded 1
#     Loaded 2
#     Loaded 3
#     Loaded 4
#     yeah baby! 3
#     yeah baby! 4
#     yeah baby! 2
#     yeah baby! 0
#     yeah baby! 1
#     returned array: iiii, jjjjj, hhh, f, gg

#assume puts is not transactional.

I think you definitely want to synchronize the array access (using a sequential queue).
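If it helps, here is how run_disp2 could be restructured (just a sketch combining the sequential result queue and the Group idea from above; I am assuming Dispatch::Queue.new is the sequential queue constructor used earlier, and the queue label is arbitrary):

def run_disp2
  an_array = []
  results_queue = Dispatch::Queue.new('com.apple.ary_results') # guards an_array
  group = Dispatch::Group.new
  gcdq = Dispatch::Queue.concurrent(:default)
  ["f", "g", "h", "i", "j"].each_with_index do |val, index|
    group.dispatch(gcdq) do
      result = do_something(val, index)
      results_queue.dispatch { an_array << result } # appends happen sequentially
    end
  end
  group.wait                      # wait for all the computations
  results_queue.dispatch(true) {} # drain any pending appends
  an_array
end

The group.wait also removes the need for the sleep 10 in the test code.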

As for #puts & IO friends, definitely :)

Thanks for trying it out. Would you be interested in sharing your experiments later, in a website article?

Laurent
_______________________________________________
MacRuby-devel mailing list
MacRuby-devel@lists.macosforge.org
http://lists.macosforge.org/mailman/listinfo.cgi/macruby-devel
