[issue35608] python3 multiprocessing queue deadlock when use thread and process at same time

2020-10-20 Thread ruozeng.w

ruozeng.w added the comment:

I ran into the same issue.
I'm using Ansible to deliver thousands of remote tasks. One TaskQueueManager 
starts multiple worker processes; each worker process executes a remote task 
and sends the task result data back to the TaskQueueManager through a 
multiprocessing.Queue, so there is 1 consumer and thousands of producers 
(every producer should exit after sending its task result). In high 
concurrency scenarios this MAY happen, and many worker processes will never 
exit.

Example:
sending an Ansible ping task (which executes very quickly and returns a very 
short result) to 2 targets.

Environment
==
python 3.6.5
oel 7.4

gdb debug info of worker process
==
(gdb) py-list
1067            # If the lock is acquired, the C code is done, and self._stop() is
1068            # called.  That sets ._is_stopped to True, and ._tstate_lock to None.
1069            lock = self._tstate_lock
1070            if lock is None:  # already determined that the C code is done
1071                assert self._is_stopped
>1072           elif lock.acquire(block, timeout):
1073                lock.release()
1074                self._stop()
1075
1076        @property
1077        def name(self):
(gdb) py-bt
Traceback (most recent call first):
  File "/usr/local/lib/python3.6/threading.py", line 1072, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
  File "/usr/local/lib/python3.6/threading.py", line 1056, in join
    self._wait_for_tstate_lock()
  File "/usr/local/lib/python3.6/multiprocessing/queues.py", line 191, in _finalize_join
    thread.join()
  File "/usr/local/lib/python3.6/multiprocessing/util.py", line 186, in __call__
    res = self._callback(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.6/multiprocessing/util.py", line 262, in _run_finalizers
    finalizer()
  File "/usr/local/lib/python3.6/multiprocessing/util.py", line 322, in _exit_function
    _run_finalizers()
  File "/usr/local/lib/python3.6/multiprocessing/process.py", line 261, in _bootstrap
    util._exit_function()
  File "/usr/local/lib/python3.6/multiprocessing/popen_fork.py", line 73, in _launch
    code = process_obj._bootstrap()
  File "/usr/local/lib/python3.6/multiprocessing/popen_fork.py", line 19, in __init__
    self._launch(process_obj)
  File "/usr/local/lib/python3.6/multiprocessing/context.py", line 277, in _Popen
    return Popen(process_obj)
  File "/usr/local/lib/python3.6/multiprocessing/context.py", line 223, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "/usr/local/lib/python3.6/multiprocessing/process.py", line 105, in start
    self._popen = self._Popen(self)
  File "/usr/local/lib/python3.6/site-packages/ansible-2.7.0.post0-py3.6.egg/ansible/plugins/strategy/__init__.py", line 328, in _queue_task
    worker_prc.start()
  File "/usr/local/lib/python3.6/site-packages/ansible-2.7.0.post0-py3.6.egg/ansible/plugins/strategy/telinear.py", line 301, in run
    self._queue_task(host, task, task_vars, play_context)
  File "/usr/local/lib/python3.6/site-packages/ansible-2.7.0.post0-py3.6.egg/ansible/executor/task_queue_manager.py", line 308, in run
    play_return = strategy.run(iterator, play_context)
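
For reference, this traceback matches the "joining processes that use queues" 
pitfall described in the multiprocessing docs: a process that has put items on 
a Queue will not terminate until its feeder thread has flushed all buffered 
data to the pipe, and the atexit finalizer (_finalize_join above) joins that 
feeder thread. A minimal sketch of the pattern (hypothetical payload size, not 
the Ansible code):

import multiprocessing as mp

def worker(q):
    # Bigger than the pipe buffer, so the feeder thread stays blocked
    # until someone reads from the queue.
    q.put('x' * (1 << 22))

if __name__ == '__main__':
    q = mp.Queue()
    p = mp.Process(target=worker, args=(q,))
    p.start()
    p.join()        # may hang: the child cannot exit while its feeder blocks
    data = q.get()  # draining the queue BEFORE join() avoids the hang

Where losing buffered results is acceptable, calling q.cancel_join_thread() in 
the child makes it exit without waiting for the feeder thread at all.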



Can anyone help? Thanks!

--
nosy: +dukewrz




[issue35608] python3 multiprocessing queue deadlock when use thread and process at same time

2019-01-01 Thread beruhan


beruhan added the comment:

I also tested on Windows 10, and it worked normally. But when I run it under 
Ubuntu 16.04, it blocks. My Python version is 3.6.5.

--




[issue35608] python3 multiprocessing queue deadlock when use thread and process at same time

2018-12-29 Thread Tim Peters


Tim Peters added the comment:

Antoine, alas, it's subtler than that.  The worker process (process_func()) 
puts _another_ `StopIteration` on the input queue in its `finally` clause.  So 
the first worker process to finish adds back a sentinel for the next worker to 
see, and so on.  At the end, one StopIteration is left on input_queue (added by 
the last worker to finish).

Everything shuts down cleanly for me on 64-bit Win10 Py 3.7.2 no matter what 
input I tried, so I can't reproduce.

The OP really needs to identify _where_ it's hanging, and when (after all input 
has been read?  somewhere "random in the middle"? ...), and if at all possible 
supply an input on which it does hang.

The OP should also try 3.7.2.
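
One way to pin down where it's hanging without gdb is faulthandler — a minimal 
sketch, assuming it's installed in every process (e.g. at the top of the worker 
function as well as in the main script):

import faulthandler

# Dump every thread's stack to stderr every 60 seconds for as long as
# the program is still running; exit=False keeps the process alive.
faulthandler.dump_traceback_later(60, repeat=True, exit=False)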

--
nosy: +tim.peters




[issue35608] python3 multiprocessing queue deadlock when use thread and process at same time

2018-12-29 Thread Antoine Pitrou


Antoine Pitrou added the comment:

Your input_thread puts StopIteration once into the queue.  But there are 
several worker processes popping from that queue, and only one of them will see 
the StopIteration.  So I'm not surprised the other worker processes would be 
stuck waiting in their loop.
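
A sketch of the usual way to make the shutdown explicit (one sentinel per 
worker; n_workers is an added parameter, not in the posted code):

def read_from_stdin(queue, n_workers):
    try:
        for line in sys.stdin:
            queue.put(line)
    finally:
        # one sentinel per worker, so no worker is left blocked in get()
        for _ in range(n_workers):
            queue.put(StopIteration)

(As Tim notes above, the posted code instead re-puts the sentinel in each 
worker's finally clause, so every worker should still see one.)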

--




[issue35608] python3 multiprocessing queue deadlock when use thread and process at same time

2018-12-29 Thread beruhan


beruhan added the comment:

Debug messages as follows:
[DEBUG/MainProcess] created semlock with handle 140059486064640
[DEBUG/MainProcess] created semlock with handle 140059486060544
[DEBUG/MainProcess] created semlock with handle 140059486056448
[DEBUG/MainProcess] Queue._after_fork()
[DEBUG/MainProcess] created semlock with handle 140059486052352
[DEBUG/MainProcess] created semlock with handle 140059486048256
[DEBUG/MainProcess] created semlock with handle 140059486044160
[DEBUG/MainProcess] Queue._after_fork()
[DEBUG/MainProcess] Queue._start_thread()
[DEBUG/MainProcess] doing self._thread.start()
[DEBUG/MainProcess] starting thread to feed data to pipe
[DEBUG/MainProcess] ... done self._thread.start()
^CTraceback (most recent call last):
  File "main_simple.py", line 76, in <module>
    proc.join()
  File "/usr/lib/python3.6/multiprocessing/process.py", line 124, in join
    res = self._popen.wait(timeout)
  File "/usr/lib/python3.6/multiprocessing/popen_fork.py", line 50, in wait
    return self.poll(os.WNOHANG if timeout == 0.0 else 0)
  File "/usr/lib/python3.6/multiprocessing/popen_fork.py", line 28, in poll
    pid, sts = os.waitpid(self.pid, flag)
KeyboardInterrupt
^CException ignored in: 
Traceback (most recent call last):
  File "/usr/lib/python3.6/threading.py", line 1294, in _shutdown
    t.join()
  File "/usr/lib/python3.6/threading.py", line 1056, in join
    self._wait_for_tstate_lock()
  File "/usr/lib/python3.6/threading.py", line 1072, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
KeyboardInterrupt
[INFO/MainProcess] process shutting down
[DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0
[DEBUG/MainProcess] telling queue thread to quit
[INFO/MainProcess] calling join() for process Process-3
^CError in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/popen_fork.py", line 28, in poll
    pid, sts = os.waitpid(self.pid, flag)
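
The second traceback shows interpreter shutdown stuck in threading._shutdown 
joining a non-daemon thread, presumably the write_to_stdout thread still 
blocked in result_queue.get(). A sketch of one way to keep Ctrl-C from hanging 
there (an assumption on my part, not a fix for the underlying deadlock) is to 
make the helper threads daemons so shutdown does not wait on them:

output_thread = threading.Thread(target=write_to_stdout,
                                 args=(result_queue,), daemon=True)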

--




[issue35608] python3 multiprocessing queue deadlock when use thread and process at same time

2018-12-28 Thread Emmanuel Arias


Emmanuel Arias added the comment:

> data = result_queue.get()

And this is where it blocks.

--




[issue35608] python3 multiprocessing queue deadlock when use thread and process at same time

2018-12-28 Thread Emmanuel Arias


Emmanuel Arias added the comment:

Hi

>def write_to_stdout(result_queue: Queue):

I think you may need to add a sleep here. IMO this is what blocks everything.

--
nosy: +eamanu




[issue35608] python3 multiprocessing queue deadlock when use thread and process at same time

2018-12-28 Thread 白稳平

Change by 白稳平:


--
type:  -> behavior




[issue35608] python3 multiprocessing queue deadlock when use thread and process at same time

2018-12-28 Thread 白稳平

New submission from 白稳平:

I am using multiple processes to handle a CPU-intensive task. I have a thread 
that reads data from stdin and puts it into input_queue, a thread that gets 
data from result_queue and writes it to stdout, and multiple processes that get 
data from the input queue, handle it, and put the results into result_queue. 
But it sometimes blocks forever. I suspect I am using the multiprocessing Queue 
inappropriately, but I don't know how to solve it. Can anyone help me?
My code is as follows:

import multiprocessing
import sys
import threading
import time
from multiprocessing import Queue


def write_to_stdout(result_queue: Queue):
    """Write queue data to stdout."""
    while True:
        data = result_queue.get()
        if data is StopIteration:
            break
        sys.stdout.write(data)
        sys.stdout.flush()


def read_from_stdin(queue):
    """Read data from stdin, put it in the queue for the processes to handle."""
    try:
        for line in sys.stdin:
            queue.put(line)
    finally:
        queue.put(StopIteration)


def process_func(input_queue, result_queue):
    """Get data from input_queue, handle it, put the result into result_queue."""
    try:
        while True:
            data = input_queue.get()
            if data is StopIteration:
                break
            # cpu intensive task, use time.sleep instead
            # result = compute_something(data)
            time.sleep(0.1)
            result_queue.put(data)
    finally:
        # re-put the sentinel to ensure every worker process ends
        input_queue.put(StopIteration)


if __name__ == '__main__':
    # queue for reading from stdin
    input_queue = Queue(1000)

    # queue for writing to stdout
    result_queue = Queue(1000)

    # thread reading data from stdin
    input_thread = threading.Thread(target=read_from_stdin,
                                    args=(input_queue,))
    input_thread.start()

    # thread writing data to stdout
    output_thread = threading.Thread(target=write_to_stdout,
                                     args=(result_queue,))
    output_thread.start()

    processes = []
    cpu_count = multiprocessing.cpu_count()
    # start multiple processes to handle the cpu intensive task
    for i in range(cpu_count):
        proc = multiprocessing.Process(target=process_func,
                                       args=(input_queue, result_queue))
        proc.start()
        processes.append(proc)

    # join input thread
    input_thread.join()

    # join all task processes
    for proc in processes:
        proc.join()

    # ensure the output thread ends
    result_queue.put(StopIteration)

    # join output thread
    output_thread.join()

test environment:  

python3.6.5
ubuntu16.04

--
components: Library (Lib)
messages: 332691
nosy: davin, pitrou, 白稳平
priority: normal
severity: normal
status: open
title: python3 multiprocessing queue deadlock when use thread and process at same time
versions: Python 3.6
