Tim Peters <t...@python.org> added the comment:

First thing:  the code uses the global name `outputer` for two different 
things, as the name of a module function and as the global name given to the 
Process object running that function.  At least on Windows under Python 3.6.4 
that confusion prevents the program from running.  So rename one of them.

Then comes the pain ;-)  A multiprocessing queue is a rather complex object 
under the covers, and the docs don't really spell out all the details.  Maybe 
they should.

The docs do vaguely sketch that a "feeder thread" is created in each process 
using an mp.Queue, which feeds objects you .put() from an internal buffer into 
an interprocess pipe.  The internal buffer is needed in case you .put() so many 
objects so fast that feeding them into a pipe directly would cause the OS pipe 
functions to fail.
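
For concreteness, here's a tiny sketch (not from your code) of what that 
means in practice:  .put() returns almost immediately even though nothing is 
reading the other end of the pipe, because the objects just pile up in the 
internal buffer and the feeder thread drains them in the background:

    import multiprocessing as mp
    import time

    if __name__ == "__main__":
        q = mp.Queue()
        t0 = time.perf_counter()
        for i in range(100000):
            q.put(i)   # appends to an internal buffer; returns at once
        elapsed = time.perf_counter() - t0
        print("100000 .put() calls took %.3f seconds" % elapsed)
        # The feeder thread is still moving items into the pipe at this
        # point, and will block once the pipe fills.  Tell the queue not
        # to hold up interpreter exit waiting for that to finish:
        q.cancel_join_thread()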

And that happens in your case:  you have 10 producers running at full speed 
overwhelming a single slow consumer.  _Most_ of the data enqueued by 
output_queue.put(i+1) is sitting in those internal buffers most of the time, 
and the base interprocess pipe doesn't know anything about them for the 
duration.

The practical consequence:  while the queue always reflects the order in which 
objects were .put() within a single process, there's no guarantee about 
ordering _across_ processes.  Objects are fed from internal buffers into the 
shared pipe whenever a process's feeder thread happens to wake up and see that 
the pipe isn't "too full".  task_queue.task_done() only records that an object 
has been taken off of task_queue and _given_ to output_queue.put(i+1); most of 
the time, the latter just sticks i+1 into an internal buffer because 
output_queue's shared pipe is too full to accept another object.
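
Concretely, I'm picturing a worker loop along these lines (the names and the 
sentinel handling are guesses based on the snippets above, not your exact 
code):

    def worker(task_queue, output_queue):
        while True:
            i = task_queue.get()
            if i is None:
                task_queue.task_done()
                break
            output_queue.put(i + 1)   # usually lands in output_queue's
                                      # feeder buffer, not yet in the pipe
            task_queue.task_done()    # recorded right away regardless

So .task_done() routinely runs long before i+1 is actually visible to the 
consumer process.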

Given that this is how things actually work, what you can do instead is add:

    for w in workers:
        w.join()

somewhere before output_queue.put(None).  A worker process doesn't end until 
its feeder thread(s) complete feeding all the internal buffer objects into 
pipes, so .join()'ing a worker is the one "obvious" way to guarantee that all 
the worker's .put() actions have wholly completed.

In which case, there's no point to using a JoinableQueue at all:  .task_done() 
no longer serves any real purpose in the code.
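
Putting it all together, a stripped-down version of what I have in mind looks 
roughly like this.  I'm guessing at details of your setup (task count, how 
the consumer is started, and so on), and I renamed the consumer function 
`outputter` to dodge the name clash mentioned at the top:

    import multiprocessing as mp

    def worker(task_queue, output_queue):
        while True:
            i = task_queue.get()
            if i is None:            # sentinel:  this worker is done
                break
            output_queue.put(i + 1)

    def outputter(output_queue):
        while True:
            item = output_queue.get()
            if item is None:         # sentinel:  every worker has finished
                break
            print(item)

    if __name__ == "__main__":
        task_queue = mp.Queue()      # plain Queue:  no .task_done() needed
        output_queue = mp.Queue()

        workers = [mp.Process(target=worker,
                              args=(task_queue, output_queue))
                   for _ in range(10)]
        consumer = mp.Process(target=outputter, args=(output_queue,))
        for w in workers:
            w.start()
        consumer.start()

        for i in range(1000):
            task_queue.put(i)
        for _ in workers:
            task_queue.put(None)     # one sentinel per worker

        for w in workers:
            w.join()                 # returns only after each worker's
                                     # feeder thread has flushed its
                                     # buffered .put()s into the pipe
        output_queue.put(None)       # now guaranteed to be the last item
        consumer.join()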

----------
nosy: +tim.peters

_______________________________________
Python tracker <rep...@bugs.python.org>
<https://bugs.python.org/issue32382>
_______________________________________