New submission from 白稳平 <[email protected]>:
I used multi-processes to handle cpu intensive task,I have a thread reading
data from stdin and put it to a input_queue, a thread get data from
output_queue and write it to stdout, multiple processes get data from input
queue,then handled the data,and put it to output_queue.But It some times will
block forever,I doubt that it was because inappropriate to use the
multiprocessing Queue,But I don't know how to solved it,can anyone help me?
my code as follows:
import multiprocessing
import sys
import threading
import time
from multiprocessing import Queue
def write_to_stdout(result_queue: Queue):
"""write queue data to stdout"""
while True:
data = result_queue.get()
if data is StopIteration:
break
sys.stdout.write(data)
sys.stdout.flush()
def read_from_stdin(queue):
"""read data from stdin, put it in queue for process handling"""
try:
for line in sys.stdin:
queue.put(line)
finally:
queue.put(StopIteration)
def process_func(input_queue, result_queue):
"""get data from input_queue,handled,put result into result_queue"""
try:
while True:
data = input_queue.get()
if data is StopIteration:
break
# cpu intensive task,use time.sleep instead
# result = compute_something(data)
time.sleep(0.1)
result_queue.put(data)
finally:
# ensure every process end
input_queue.put(StopIteration)
if __name__ == '__main__':
# queue for reading to stdout
input_queue = Queue(1000)
# queue for writing to stdout
result_queue = Queue(1000)
# thread reading data from stdin
input_thread = threading.Thread(target=read_from_stdin,
args=(input_queue,))
input_thread.start()
# thread reading data from stdin
output_thread = threading.Thread(target=write_to_stdout,
args=(result_queue,))
output_thread.start()
processes = []
cpu_count = multiprocessing.cpu_count()
# start multi-process to handle some cpu intensive task
for i in range(cpu_count):
proc = multiprocessing.Process(target=process_func,
args=(input_queue, result_queue))
proc.start()
processes.append(proc)
# joined input thread
input_thread.join()
# joined all task processes
for proc in processes:
proc.join()
# ensure output thread end
result_queue.put(StopIteration)
# joined output thread
output_thread.join()
test environment:
python3.6.5
ubuntu16.04
----------
components: Library (Lib)
messages: 332691
nosy: davin, pitrou, 白稳平
priority: normal
severity: normal
status: open
title: python3 multiprocessing queue deadlock when use thread and process at
same time
versions: Python 3.6
_______________________________________
Python tracker <[email protected]>
<https://bugs.python.org/issue35608>
_______________________________________
_______________________________________________
Python-bugs-list mailing list
Unsubscribe:
https://mail.python.org/mailman/options/python-bugs-list/archive%40mail-archive.com