Time for me to ask something...
I've got a scenario where I need to pass data in both directions of a multiprocessing program. Specifically, what I'm working with is a test runner from a homegrown testing harness (an aside, having nothing to do with the question: if it were up to me, I would build this starting with pytest, but the codebase predates pytest and I'm only willing to do so much surgery at the moment!).

The workflow: the main program either uses its arguments to figure out which tests to run, or scans for them, building up a list of files which is then turned into a list of Test class instances. This list is handed to a set of workers through a multiprocessing.Queue. Each worker pulls items off the queue and runs them, which causes the Test instance to be updated with results. This is easy enough and looks just like the threaded version of the runner; since everything is gathered and enqueued before the workers are fired up, detecting when things are done is straightforward. For example, I can use the sentinel trick of writing a None for each worker at the end, so each one in turn will pick up one of those and quit. That doesn't even seem to be necessary.

The mp version (which I'm doing just as a personal exercise) has one difference: the main program wants to go through the results and produce a summary report, which means it needs access to the modified Test instances. Unlike the threaded version, the list of instances is not shared between the processes. I figured I'd send the modified instances back through another queue, which the main process reads.

So now, after all this verbiage, the question: how does the main process, acting as a consumer on the resultq, read this queue effectively in the presence of multiple producers? If it does q.get in a loop, it will eventually block when the queue is empty.
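For reference, the "one None per worker" sentinel trick described above can be sketched roughly like this. This is a toy stand-in, not the harness's real code: run_one() just doubles a number where the real runner would execute a Test instance and record results.

```python
# Sketch of the sentinel-per-worker pattern: the main process enqueues one
# None per worker after the real work items; each worker exits when it
# pulls a None. run_one() is a placeholder for actually running a test.
from multiprocessing import Process, JoinableQueue, Queue

def run_one(item):
    return item * 2  # stand-in for running a test and recording results

def worker(task_queue, result_queue):
    while True:
        item = task_queue.get()
        if item is None:            # sentinel: no more work for this worker
            task_queue.task_done()
            break
        result_queue.put(run_one(item))
        task_queue.task_done()

def main(jobs=2, items=range(5)):
    task_queue = JoinableQueue()
    result_queue = Queue()
    for i in items:
        task_queue.put(i)
    for _ in range(jobs):
        task_queue.put(None)        # one sentinel per worker
    procs = [Process(target=worker, args=(task_queue, result_queue))
             for _ in range(jobs)]
    for p in procs:
        p.start()
    task_queue.join()               # every item (and sentinel) acknowledged
    for p in procs:
        p.join()
    # Safe to read a known count here, since all producers have exited.
    return sorted(result_queue.get() for _ in range(len(list(items))))
```

(The small payloads here sidestep the documented caveat about joining a process before its queue is drained.)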
If it does q.get_nowait, it _might_ see an empty queue while some tests are still pending that haven't been written to the queue yet (especially since the pickling that takes place on the queue apparently introduces a little delay).

A somewhat simplified view of what I have now (Queue and JoinableQueue imported from multiprocessing):

    queue = JoinableQueue()
    resultq = Queue()
    procs = [RunTest(queue=queue, resultq=resultq) for _ in range(jobs)]
    total_num_tests = len(tests)
    for t in tests:
        queue.put(t)
    for p in procs:
        p.daemon = True
        p.start()
    queue.join()

    # collect back the modified test instances
    tests = []
    for t in iter(resultq.get, None):
        tests.append(t)
        # because q.get() blocks, we need a way to break out;
        # q.empty() is claimed not to be reliable in mp.
        if len(tests) >= total_num_tests:
            break

This actually works, based on knowing how many tests there are and assuming that once we've collected that many, everything is done. But it doesn't feel entirely "clean" to me. What if a test is "lost" due to an error, so its modified Test instance is never written to the result queue? I could use a sentinel, but who writes the sentinel? That seems to suffer from the same problem as the non-blocking get: a worker could write a sentinel when *it* is done, but other workers might still be running their last test, and we'd conclude the queue had been drained a bit too early.

I did play some with setting up a barrier across the workers, so that they're all finished before each writes a sentinel, but that still blocked in the q.get (mea culpa: I'm violating the guidelines by not posting code for that; I didn't save that particular hack).

I've read a little about multiprocessing.Manager and multiprocessing.Pool while looking for other approaches; both seemed to have limitations that made them awkward for this case, probably because I'm failing to understand something. So... what else should I be trying, to make this a little cleaner?
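One possible cleanup (an assumption on my part, not the harness's real code): let *every* worker push a None onto resultq as it exits, and have the main process keep blocking on resultq.get until it has seen one None per worker. Unlike a single sentinel, this can't terminate early while another worker still holds an unreported test, and it doesn't depend on knowing the test count. In this toy sketch, worker() and the t * 10 "result" are stand-ins for RunTest and a modified Test instance:

```python
# Sketch: count one completion marker per worker instead of counting tests.
# A worker only writes its None after draining its share of the task queue,
# so the main loop below cannot conclude "done" while tests are in flight.
from multiprocessing import Process, JoinableQueue, Queue

def worker(queue, resultq):
    while True:
        t = queue.get()
        if t is None:               # task-queue sentinel: shut down
            queue.task_done()
            resultq.put(None)       # this worker's "I'm finished" marker
            break
        resultq.put(t * 10)         # stand-in for a modified Test instance
        queue.task_done()

def collect(jobs=3, tests=range(7)):
    queue, resultq = JoinableQueue(), Queue()
    for t in tests:
        queue.put(t)
    for _ in range(jobs):
        queue.put(None)
    procs = [Process(target=worker, args=(queue, resultq))
             for _ in range(jobs)]
    for p in procs:
        p.start()
    results, done = [], 0
    while done < jobs:              # block until every worker has reported
        t = resultq.get()
        if t is None:
            done += 1
        else:
            results.append(t)
    for p in procs:
        p.join()
    return sorted(results)
```

A test "lost" to a worker crash would still hang this loop, of course; guarding against that probably needs a timeout on the get or a try/except in the worker that reports the failure as a result.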
thanks,
-- mats

_______________________________________________
Tutor maillist - Tutor@python.org
To unsubscribe or change subscription options:
https://mail.python.org/mailman/listinfo/tutor