Re: Q: multiprocessing.Queue size limitations or bug...

2009-08-27 Thread ryles
On Aug 26, 4:56 am, Michael Riedel mrie...@inova-semiconductors.de
wrote:
 Sorry for not being more specific, but I'm not absolutely certain whether
 I encountered a bug or did something wrong:

 The (stupid) code below either stalls forever at 'p0.join()' or runs to
 completion, depending on the value of TROUBLE_MAKER.

 Any help, thoughts, comments?

 Thank you for your time.

 Michael

 # --

 from multiprocessing import Process, Queue

 # bit vector size
 BVS=8

 #
 TROUBLE_MAKER=12  # for greater values p0.join() is never satisfied...

 def evaluate(q, id, start=0, stop=2**BVS):

     cmin = {0: []}

     for mask0 in range(start, stop):
         for mask1 in range(0, 2**BVS):
             for mask2 in range(mask1, TROUBLE_MAKER):
                 cmin[0].append((mask0, mask1, mask2))

     print 'process %d finished (dict layout: %d/%d)...' % (
         id, len(cmin), len(cmin[0]))
     q.put(cmin.copy())
     q.close()

 if __name__ == '__main__':

     q0 = Queue()
     q1 = Queue()
     q2 = Queue()
     q3 = Queue()

     part = 2**BVS/4
     p0 = Process(target=evaluate, args=(q0, 0, 0*part, 1*part),
                  name='worker_0')
     p1 = Process(target=evaluate, args=(q1, 1, 1*part, 2*part),
                  name='worker_1')
     p2 = Process(target=evaluate, args=(q2, 2, 2*part, 3*part),
                  name='worker_2')
     p3 = Process(target=evaluate, args=(q3, 3, 3*part, 4*part),
                  name='worker_3')
     p0.start()
     print 'process 0 started...'
     p1.start()
     print 'process 1 started...'
     p2.start()
     print 'process 2 started...'
     p3.start()
     print 'process 3 started...'
     # main process stalls at p0.join() for bigger TROUBLE_MAKER
     p0.join()
     p1.join()
     p2.join()
     p3.join()
     res0 = q0.get()
     res1 = q1.get()
     res2 = q2.get()
     res3 = q3.get()
     print 'results fetched...'

 # --


There is a warning related to this in the documentation:

http://docs.python.org/library/multiprocessing.html#pipes-and-queues

Basically, you should reverse the order of the get() and join() calls.
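
For your program that means draining the queues before joining the
workers, something like this (an untested sketch; only the tail of the
__main__ block changes):

    res0 = q0.get()   # get() first: this unblocks the feeder thread
    res1 = q1.get()   # in the corresponding child process
    res2 = q2.get()
    res3 = q3.get()
    p0.join()         # now each child can flush its pipe and exit,
    p1.join()         # so join() returns
    p2.join()
    p3.join()
    print 'results fetched...'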

multiprocessing does a pretty nice job of abstracting away the low-
level details of IPC, but there are still some gotchas. As you've
noticed, your program will deadlock when there is a large enough
amount of data being put into the queue. This is related to a hidden
thread that exists inside each of your child processes. The thread is
responsible for taking your queue items from an internal buffer and
then writing them into a pipe that your parent process will read from
when get() is called. The pipe mechanism is what allows the two
processes to pass information, and is supported directly by the
operating system. However, the pipe has a limited capacity, and when
it is full, the writer thread is stuck waiting for the reader to read
enough from the pipe so that it can finish its write. The problem is
that your parent process (reader) is not actually calling get() to
drain the pipe. Instead it's stuck in join() waiting for the writer to
complete.
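
You can reproduce the deadlock with a much smaller program (a minimal
sketch; the 1 MB payload is an arbitrary size assumed to exceed the
pipe's buffer):

    from multiprocessing import Process, Queue

    def worker(q):
        # Pickling and writing a large item fills the pipe, so the
        # hidden feeder thread blocks until someone reads from it.
        q.put('x' * (1 << 20))

    if __name__ == '__main__':
        q = Queue()
        p = Process(target=worker, args=(q,))
        p.start()
        p.join()       # deadlock: child waits for a reader, parent
                       # waits for the child to exit
        print q.get()  # never reached; call get() before join()
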
-- 
http://mail.python.org/mailman/listinfo/python-list


Re: Q: multiprocessing.Queue size limitations or bug...

2009-08-27 Thread Michael
On Aug 27, 8:56 am, ryles ryle...@gmail.com wrote:
 [...]
 The problem is that your parent process (reader) is not actually
 calling get() to drain the pipe. Instead it's stuck in join() waiting
 for the writer to complete.

I see. I really appreciate your valuable feedback.
-- 
http://mail.python.org/mailman/listinfo/python-list


Q: multiprocessing.Queue size limitations or bug...

2009-08-26 Thread Michael Riedel
Sorry for not being more specific, but I'm not absolutely certain whether
I encountered a bug or did something wrong:

The (stupid) code below either stalls forever at 'p0.join()' or runs to
completion, depending on the value of TROUBLE_MAKER.

Any help, thoughts, comments?

Thank you for your time.

Michael

# --

from multiprocessing import Process, Queue

# bit vector size
BVS=8

#
TROUBLE_MAKER=12  # for greater values p0.join() is never satisfied...

def evaluate(q, id, start=0, stop=2**BVS):

    cmin = {0: []}

    for mask0 in range(start, stop):
        for mask1 in range(0, 2**BVS):
            for mask2 in range(mask1, TROUBLE_MAKER):
                cmin[0].append((mask0, mask1, mask2))

    print 'process %d finished (dict layout: %d/%d)...' % (
        id, len(cmin), len(cmin[0]))
    q.put(cmin.copy())
    q.close()


if __name__ == '__main__':

    q0 = Queue()
    q1 = Queue()
    q2 = Queue()
    q3 = Queue()

    part = 2**BVS/4
    p0 = Process(target=evaluate, args=(q0, 0, 0*part, 1*part),
                 name='worker_0')
    p1 = Process(target=evaluate, args=(q1, 1, 1*part, 2*part),
                 name='worker_1')
    p2 = Process(target=evaluate, args=(q2, 2, 2*part, 3*part),
                 name='worker_2')
    p3 = Process(target=evaluate, args=(q3, 3, 3*part, 4*part),
                 name='worker_3')
    p0.start()
    print 'process 0 started...'
    p1.start()
    print 'process 1 started...'
    p2.start()
    print 'process 2 started...'
    p3.start()
    print 'process 3 started...'
    # main process stalls at p0.join() for bigger TROUBLE_MAKER
    p0.join()
    p1.join()
    p2.join()
    p3.join()
    res0 = q0.get()
    res1 = q1.get()
    res2 = q2.get()
    res3 = q3.get()
    print 'results fetched...'

# --

-- 
http://mail.python.org/mailman/listinfo/python-list