New submission from Steve Lorimer <steve.lori...@gmail.com>:
There is a race condition when closing/joining a pool with maxtasksperchild=1.

Illustrative example:

```
#!/usr/bin/env python3
import os
import time
import logging
import multiprocessing.pool

def run_task(i):
    print(f'[{os.getpid()}] task({i}) complete')

if __name__ == '__main__':
    multiprocessing.log_to_stderr(logging.DEBUG)

    tasks = iter(range(10))
    processes = 4
    pool = multiprocessing.pool.Pool(processes=processes, maxtasksperchild=1)
    running = []

    while True:
        try:
            running = [f for f in running if not f.ready()]
            avail = processes - len(running)
            if avail:
                for _ in range(avail):
                    i = next(tasks)
                    print(f'[{os.getpid()}] add task({i})')
                    future = pool.apply_async(run_task, (i,))
                    running.append(future)
            else:
                time.sleep(0.1)
        except StopIteration:
            print(f'[{os.getpid()}] all tasks scheduled')
            break

    print(f'[{os.getpid()}] close and join pool')
    pool.close()
    pool.join()
    print(f'[{os.getpid()}] all done')
```

Example output:

```
[DEBUG/MainProcess] created semlock with handle 140042193375232
[DEBUG/MainProcess] created semlock with handle 140042193371136
[DEBUG/MainProcess] created semlock with handle 140042193367040
[DEBUG/MainProcess] created semlock with handle 140042193362944
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-1] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-2] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-3] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-4] child process calling self.run()
[7150] add task(0)
[7150] add task(1)
[7150] add task(2)
[7150] add task(3)
[7151] task(0) complete
[7152] task(1) complete
[7153] task(2) complete
[7154] task(3) complete
[DEBUG/ForkPoolWorker-1] worker exiting after 1 tasks
[INFO/ForkPoolWorker-1] process shutting down
[DEBUG/ForkPoolWorker-2] worker exiting after 1 tasks
[DEBUG/ForkPoolWorker-1] running all "atexit" finalizers with priority >= 0
[DEBUG/ForkPoolWorker-3] worker exiting after 1 tasks
[INFO/ForkPoolWorker-2] process shutting down
<snip>
[DEBUG/MainProcess] cleaning up worker 2
[DEBUG/MainProcess] cleaning up worker 1
[7150] add task(8)
[7150] add task(9)
[DEBUG/MainProcess] cleaning up worker 0
[7150] all tasks scheduled
[DEBUG/MainProcess] added worker
[7150] close and join pool
[DEBUG/MainProcess] closing pool
[DEBUG/MainProcess] joining pool
[INFO/ForkPoolWorker-9] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-10] child process calling self.run()
[DEBUG/MainProcess] added worker
[7164] task(9) complete
[DEBUG/ForkPoolWorker-10] worker exiting after 1 tasks
[INFO/ForkPoolWorker-10] process shutting down
[DEBUG/ForkPoolWorker-10] running all "atexit" finalizers with priority >= 0
[DEBUG/ForkPoolWorker-10] running the remaining "atexit" finalizers
[INFO/ForkPoolWorker-10] process exiting with exitcode 0
[INFO/ForkPoolWorker-11] child process calling self.run()
[DEBUG/MainProcess] cleaning up worker 2
[DEBUG/MainProcess] added worker
[INFO/ForkPoolWorker-12] child process calling self.run()
```

The process hangs forever.
Interrupting the process then produces the following output (the main-thread traceback is interleaved with log lines and tracebacks from the remaining workers):

```
^CTraceback (most recent call last):
[INFO/ForkPoolWorker-11] process shutting down
  File "./test.py", line 36, in <module>
[INFO/ForkPoolWorker-12] process shutting down
[DEBUG/ForkPoolWorker-11] running all "atexit" finalizers with priority >= 0
[DEBUG/ForkPoolWorker-12] running all "atexit" finalizers with priority >= 0
[DEBUG/ForkPoolWorker-11] running the remaining "atexit" finalizers
Process ForkPoolWorker-11:
[DEBUG/ForkPoolWorker-12] running the remaining "atexit" finalizers
Process ForkPoolWorker-12:
    pool.join()
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 546, in join
    self._worker_handler.join()
  File "/usr/lib/python3.6/threading.py", line 1056, in join
    self._wait_for_tstate_lock()
  File "/usr/lib/python3.6/threading.py", line 1072, in _wait_for_tstate_lock
Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 108, in worker
    task = get()
  File "/usr/lib/python3.6/multiprocessing/queues.py", line 334, in get
    with self._rlock:
  File "/usr/lib/python3.6/multiprocessing/synchronize.py", line 95, in __enter__
    return self._semlock.__enter__()
KeyboardInterrupt
[INFO/ForkPoolWorker-12] process exiting with exitcode 1
    elif lock.acquire(block, timeout):
KeyboardInterrupt
Traceback (most recent call last):
  File "/usr/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/lib/python3.6/multiprocessing/pool.py", line 108, in worker
    task = get()
  File "/usr/lib/python3.6/multiprocessing/queues.py", line 335, in get
    res = self._reader.recv_bytes()
  File "/usr/lib/python3.6/multiprocessing/connection.py", line 216, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File "/usr/lib/python3.6/multiprocessing/connection.py", line 407, in _recv_bytes
    buf = self._recv(4)
  File "/usr/lib/python3.6/multiprocessing/connection.py", line 379, in _recv
    chunk = read(handle, remaining)
KeyboardInterrupt
[INFO/ForkPoolWorker-11] process exiting with exitcode 1
[INFO/MainProcess] process shutting down
[DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0
[DEBUG/MainProcess] finalizing pool
[DEBUG/MainProcess] helping task handler/workers to finish
[DEBUG/MainProcess] removing tasks from inqueue until task handler finished
[DEBUG/MainProcess] joining worker handler
[DEBUG/MainProcess] result handler found thread._state=TERMINATE
[DEBUG/MainProcess] ensuring that outqueue is not full
[DEBUG/MainProcess] result handler exiting: len(cache)=2, thread._state=2
[DEBUG/MainProcess] worker handler exiting
[DEBUG/MainProcess] terminating workers
[DEBUG/MainProcess] task handler got sentinel
[DEBUG/MainProcess] task handler sending sentinel to result handler
[DEBUG/MainProcess] task handler sending sentinel to workers
[DEBUG/MainProcess] joining task handler
[DEBUG/MainProcess] task handler exiting
[DEBUG/MainProcess] joining result handler
[DEBUG/MainProcess] joining pool workers
[DEBUG/MainProcess] running the remaining "atexit" finalizers
```

Since this is a race condition, the example code may need to be run several times to cause it to hang:

```
$ for i in {1..100}; do ./test.py; done
```

Notably, removing maxtasksperchild stops the process from hanging.
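For reference, that workaround is a one-line change to the pool construction (a minimal sketch, assuming the rest of the test script above is unchanged):

```
# Workaround sketch: construct the pool without maxtasksperchild.
# Workers are then reused for the lifetime of the pool instead of
# exiting after one task, and the hang in pool.join() no longer
# reproduces with the test script above.
pool = multiprocessing.pool.Pool(processes=processes)
```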
Additionally, changing the sleep to a busy wait causes the issue to go away:

```
wait = time.time() + 0.1
while time.time() < wait:
    pass
```

----------
components: Library (Lib)
messages: 356609
nosy: steve.lori...@gmail.com
priority: normal
severity: normal
status: open
title: race condition in multiprocessing.Pool with maxtasksperchild=1
type: behavior
versions: Python 3.6

_______________________________________
Python tracker <rep...@bugs.python.org>
<https://bugs.python.org/issue38799>
_______________________________________