Looks like this will get what you need.

import multiprocessing as mp

def some_complex_function(x):
    #q is the module-level global that pool_init sets in each worker
    global q
    #stuff using q

def pool_init(q2):
    #runs once in each worker process and stashes the queue in a global
    global q
    q = q2

def main():
    #initialize the Queue
    mp_comm_queue = mp.Queue()

    #Set up a pool to process a bunch of stuff in parallel,
    #handing the queue to every worker via the initializer
    pool = mp.Pool(initializer=pool_init, initargs=(mp_comm_queue,))
    ...
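For completeness, the rest of main() might continue along the lines of the sketch below; the pool.imap() call mirrors your original example, and the drain loop at the end is just one (approximate) way to read back whatever the workers put on the queue:

    values = range(20)
    data = pool.imap(some_complex_function, values)

    for val in data:
        print(f"**{val}**")

    #drain whatever the workers put on the shared queue
    #(empty() is only a snapshot, but it's fine once the imap results are consumed)
    while not mp_comm_queue.empty():
        print(mp_comm_queue.get())

    pool.close()
    pool.join()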



-----Original Message-----
From: David Raymond 
Sent: Monday, April 6, 2020 4:19 PM
To: python-list@python.org
Subject: RE: Multiprocessing queue sharing and python3.8

Attempting reply as much for my own understanding.

Are you on Mac? I think this is the pertinent bit for you:
Changed in version 3.8: On macOS, the spawn start method is now the default. 
The fork start method should be considered unsafe as it can lead to crashes of 
the subprocess. See bpo-33725.
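If you want to confirm that's what is biting you, you can check or override the start method explicitly. A minimal sketch (forcing fork on macOS carries exactly the crash risk bpo-33725 describes, so this is only for comparison):

import multiprocessing as mp

if __name__ == "__main__":
    print(mp.get_start_method())  #"spawn" on macOS under 3.8, "fork" on earlier versions

    #request the old fork behavior explicitly, just to compare (unsafe on macOS per bpo-33725)
    ctx = mp.get_context("fork")
    pool = ctx.Pool()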

When you start a new process (with the spawn method) it runs the module just 
like it's being imported. So your global "mp_comm_queue2 = mp.Queue()" creates a 
new Queue in each process. Your initialization of mp_comm_queue is also done 
inside the main() function, which doesn't get run in each process. So each 
process in the Pool is going to have mp_comm_queue as None, and have its own 
version of mp_comm_queue2. The IDs being the same or different is a result of 
one or more processes in the Pool being reused for the multiple steps in imap, 
probably because the function the Pool is executing finishes so quickly.

Add a little extra info to the print calls (and/or set up logging to stdout 
with the process name/id included) and you can see some of this. Here are the 
hacked-together changes I made for that.

import multiprocessing as mp
import os

mp_comm_queue = None #Will be initialized in the main function
mp_comm_queue2 = mp.Queue() #Test pre-initialized as well

def some_complex_function(x):
    print("proc id", os.getpid())
    print("mp_comm_queue", mp_comm_queue)
    print("queue2 id", id(mp_comm_queue2))
    mp_comm_queue2.put(x)
    print("queue size", mp_comm_queue2.qsize())
    print("x", x)
    return x * 2

def main():
    global mp_comm_queue
    #initalize the Queue
    mp_comm_queue = mp.Queue()
    
    #Set up a pool to process a bunch of stuff in parallel
    pool = mp.Pool()
    values = range(20)
    data = pool.imap(some_complex_function, values)
    
    for val in data:
        print(f"**{val}**")
    print("final queue2 size", mp_comm_queue2.qsize())
    
if __name__ == "__main__":
    main()



When making your own Process object and starting it, then yes, the Queue should 
be passed into the function as an argument. The error text seems to be part 
of the Pool implementation, which I'm not familiar enough with to know the 
best way to handle it. (Probably something using the "initializer" and 
"initargs" arguments for Pool)(maybe)



-----Original Message-----
From: Python-list <python-list-bounces+david.raymond=tomtom....@python.org> On 
Behalf Of Israel Brewster
Sent: Monday, April 6, 2020 1:24 PM
To: Python <python-list@python.org>
Subject: Multiprocessing queue sharing and python3.8

Under python 3.7 (and all previous versions I have used), the following code 
works properly, and produces the expected output:

import multiprocessing as mp

mp_comm_queue = None #Will be initialized in the main function
mp_comm_queue2=mp.Queue() #Test pre-initialized as well

def some_complex_function(x):
    print(id(mp_comm_queue2))
    assert(mp_comm_queue is not None)
    print(x)    
    return x*2
        
def main():
    global mp_comm_queue
    #initalize the Queue
    mp_comm_queue=mp.Queue()
    
    #Set up a pool to process a bunch of stuff in parallel
    pool=mp.Pool()
    values=range(20)
    data=pool.imap(some_complex_function,values)
    
    for val in data:
        print(f"**{val}**")
        
if __name__=="__main__":
    main()

Under 3.7, mp_comm_queue2 has the same ID for all iterations of some_complex_function, 
and the assert passes (mp_comm_queue is not None). However, under Python 3.8 
it fails: mp_comm_queue2 is a *different* object for each iteration, and the 
assert fails.

So what am I doing wrong with the above example block? Assuming that it broke 
in 3.8 because I wasn’t sharing the Queue properly, what is the proper way to 
share a Queue object among multiple processes for the purposes of inter-process 
communication?

The documentation 
(https://docs.python.org/3.8/library/multiprocessing.html#exchanging-objects-between-processes) 
appears to indicate that I should pass the queue as an argument to the 
function to be executed in parallel; however, that fails as well (on ALL 
versions of Python I have tried) with the error:

Traceback (most recent call last):
  File "test_multi.py", line 32, in <module>
    main()
  File "test_multi.py", line 28, in main
    for val in data:
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 748, in next
    raise value
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/pool.py", line 431, in _handle_tasks
    put(task)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/queues.py", line 58, in __getstate__
    context.assert_spawning(self)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/context.py", line 356, in assert_spawning
    ' through inheritance' % type(obj).__name__
RuntimeError: Queue objects should only be shared between processes through 
inheritance

after I add the following to the code to try passing the queue rather than 
having it global:

#Try by passing queue
values=[(x,mp_comm_queue) for x in range(20)]
data=pool.imap(some_complex_function,values)
for val in data:
    print(f"**{val}**")   

So if I can’t pass it as an argument, and having it global is incorrect (at 
least starting with 3.8), what is the proper method of getting multiprocessing 
queues to child processes?

---
Israel Brewster
Software Engineer
Alaska Volcano Observatory 
Geophysical Institute - UAF 
2156 Koyukuk Drive 
Fairbanks AK 99775-7320
Work: 907-474-5172
cell:  907-328-9145

-- 
https://mail.python.org/mailman/listinfo/python-list