multiprocessing: pool with blocking queue
Hi, I am trying to implement a multiprocessing pool that assigns tasks from a blocking queue. My situation is a pretty classic producer/consumer problem, where the producer can produce much faster than the consumers can consume. The wrinkle in the story is that the producer produces objects that consume large amounts of memory, so I would like the queue to block once it holds, say, twice as many tasks as there are processes in the pool, so that I don't devour my RAM. I have been able to implement this one way, but I am somewhat displeased with the result. Code speaks louder than words, so here is a brief example of the technique I am using, followed by an explanation of why I am unhappy with it:

===
#!/usr/bin/env python3
import time
import random
from multiprocessing import Pool, Manager

def procfunc(queue):
    time.sleep(random.random() * 2)
    return queue.get() * 2

def printrange(n):
    for i in range(n):
        print("generated " + str(i))
        yield i

if __name__ == '__main__':
    sm = Manager()
    pool = Pool()
    queuelen = len(pool._pool) * 2
    queue = sm.Queue(queuelen)

    for i in printrange(100):
        queue.put(i)  # blocks once the managed queue is full
        pool.apply_async(procfunc, (queue,), callback=print)

    pool.close()
    pool.join()
===

The reason I am unhappy with this trick is that if you examine the source code of pool.py, you will note that the Pool class already uses an internal queue, _taskqueue, from which tasks are assigned to the processes in the pool. In particular:

def __init__(self, processes=None, initializer=None, initargs=()):
    self._setup_queues()
    self._taskqueue = queue.Queue()
    ...snip

It seems to me that if I could only subclass Pool and do

queuelen = len(pool._pool) * 2
self._taskqueue = queue.Queue(queuelen)

later in the constructor, once the pool size has been established, I would have a much more elegant, transparent solution to the problem. Unfortunately, the design of the Pool class is such that actually implementing this solution would be very hackish and inelegant.
If only, say, _setup_queues() were called after the _taskqueue assignment, then I could override it. My question, then, is: Is there a more elegant/pythonic way of doing what I am trying to do with the current Pool class? If the verdict is no, I'll be happy to file a bug report.
--
http://mail.python.org/mailman/listinfo/python-list
Re: multiprocessing: pool with blocking queue
masher vertesp...@gmail.com writes:

> My question, then, is: Is there a more elegant/pythonic way of doing
> what I am trying to do with the current Pool class?

Forgive me, I may not fully understand what you are trying to do here (I've never really used multiprocessing all that much)... But couldn't you just assign your own Queue object to the Pool instance?
--
http://mail.python.org/mailman/listinfo/python-list
Re: multiprocessing: pool with blocking queue
On Jul 2, 12:06 pm, J Kenneth King ja...@agentultra.com wrote:

> > My question, then, is: Is there a more elegant/pythonic way of doing
> > what I am trying to do with the current Pool class?
>
> Forgive me, I may not fully understand what you are trying to do here
> (I've never really used multiprocessing all that much)... But couldn't
> you just assign your own Queue object to the Pool instance?

That's basically my question. It does not appear as though there is any straightforward way of doing this, because of the design of Pool's __init__ method, which passes _taskqueue to several functions. Hence, even if I were to reassign _taskqueue after __init__, that wouldn't change anything.
--
http://mail.python.org/mailman/listinfo/python-list
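The point about __init__ handing _taskqueue to other functions can be illustrated with a toy class (names here are hypothetical; Pool's handler threads capture the queue object in the same way):

```python
import queue
import threading
import time

class Holder:
    """Mimics Pool.__init__: a handler thread captures the queue object."""
    def __init__(self):
        self._taskqueue = queue.Queue()  # unbounded, like Pool's
        self.seen = []
        # Like Pool, pass the queue object itself to the handler thread:
        t = threading.Thread(target=self._handle,
                             args=(self._taskqueue,), daemon=True)
        t.start()

    def _handle(self, q):
        # q is a reference to the *original* queue object, so later
        # rebinding self._taskqueue has no effect on this loop
        while True:
            self.seen.append(q.get())

h = Holder()
h._taskqueue = queue.Queue(2)  # rebind after __init__, as proposed
h._taskqueue.put("task")       # lands in the new, bounded queue...
time.sleep(0.2)
# ...but the handler thread is still blocked on the original queue,
# so the task is never picked up
```

Rebinding the attribute only changes what the name points to on the instance; the threads already started in __init__ keep reading the object they were given.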
Re: multiprocessing: pool with blocking queue
masher vertesp...@gmail.com writes:

> > Forgive me, I may not fully understand what you are trying to do here
> > (I've never really used multiprocessing all that much)... But couldn't
> > you just assign your own Queue object to the Pool instance?
>
> That's basically my question. It does not appear as though there is any
> straightforward way of doing this, because of the design of Pool's
> __init__ method, which passes _taskqueue to several functions. Hence,
> even if I were to reassign _taskqueue after __init__, that wouldn't
> change anything.

I think I understand. There are ways to modify the class before instantiating it, but even the most clever or elegant solution will still smell funny. I suppose this might be worth submitting as a feature suggestion to the multiprocessing project. Best of luck.
--
http://mail.python.org/mailman/listinfo/python-list
Re: multiprocessing: pool with blocking queue
On Jul 2, 11:09 am, masher vertesp...@gmail.com wrote:

> My question, then, is: Is there a more elegant/pythonic way of doing
> what I am trying to do with the current Pool class?

Another thing you might try is to subclass Pool and add an apply_async() wrapper which would wait until _taskqueue.qsize() drops below the desired cap. You would probably do this wait in a loop with a small sleep. This approach avoids needing a second Queue, but it also adds some delay to your producer due to the sleep (something you're not currently worried about). The minimum sleep may be something like 1 ms (it's really system dependent), whereas the time it takes for a thread blocked on a mutex to wake up is often more on the order of microseconds, which is what you get with your blocking queue. I doubt this offers you much satisfaction, though.

> If the verdict is no, I'll be happy to file a bug report.

Yeah, I think it's worth a try.
--
http://mail.python.org/mailman/listinfo/python-list