https://github.com/python/cpython/commit/598aa7cc98bc1b39f10ec41decddd8dd88799fe1
commit: 598aa7cc98bc1b39f10ec41decddd8dd88799fe1
branch: main
author: Ajay Kamdar <[email protected]>
committer: encukou <[email protected]>
date: 2025-06-10T13:28:31+02:00
summary:
gh-132969: Fix error/hang when shutdown(wait=False) and task exited abnormally
(GH-133222)
When shutdown is called with wait=False, the executor thread keeps running
even after the ProcessPoolExecutor's state is reset. The executor then tries
to replenish the worker processes pool resulting in an error and a potential
hang
when it comes across a worker that has died. Fixed the issue by having
_adjust_process_count() return without doing anything if the
ProcessPoolExecutor's
state has been reset.
Added unit tests to validate two scenarios:
max_workers < num_tasks (exception)
max_workers > num_tasks (exception + hang)
files:
A Misc/NEWS.d/next/Library/2025-04-30-19-32-18.gh-issue-132969.EagQ3G.rst
M Lib/concurrent/futures/process.py
M Lib/test/test_concurrent_futures/test_shutdown.py
diff --git a/Lib/concurrent/futures/process.py
b/Lib/concurrent/futures/process.py
index 76b7b2abe836d8..a14650bf5fa47c 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -755,6 +755,11 @@ def _start_executor_manager_thread(self):
self._executor_manager_thread_wakeup
def _adjust_process_count(self):
+ # gh-132969: avoid error when state is reset and executor is still
running,
+ # which will happen when shutdown(wait=False) is called.
+ if self._processes is None:
+ return
+
# if there's an idle process, we don't need to spawn a new one.
if self._idle_worker_semaphore.acquire(blocking=False):
return
diff --git a/Lib/test/test_concurrent_futures/test_shutdown.py
b/Lib/test/test_concurrent_futures/test_shutdown.py
index 7a4065afd46fc8..99b315b47e2530 100644
--- a/Lib/test/test_concurrent_futures/test_shutdown.py
+++ b/Lib/test/test_concurrent_futures/test_shutdown.py
@@ -330,6 +330,64 @@ def test_shutdown_no_wait(self):
# shutdown.
assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
+ @classmethod
+ def _failing_task_gh_132969(cls, n):
+ raise ValueError("failing task")
+
+ @classmethod
+ def _good_task_gh_132969(cls, n):
+ time.sleep(0.1 * n)
+ return n
+
+ def _run_test_issue_gh_132969(self, max_workers):
+ # max_workers=2 will repro exception
+ # max_workers=4 will repro exception and then hang
+
+ # Repro conditions
+ # max_tasks_per_child=1
+ # a task ends abnormally
+ # shutdown(wait=False) is called
+ start_method = self.get_context().get_start_method()
+ if (start_method == "fork" or
+ (start_method == "forkserver" and sys.platform.startswith("win"))):
+ self.skipTest(f"Skipping test for {start_method = }")
+ executor = futures.ProcessPoolExecutor(
+ max_workers=max_workers,
+ max_tasks_per_child=1,
+ mp_context=self.get_context())
+ f1 = executor.submit(ProcessPoolShutdownTest._good_task_gh_132969, 1)
+ f2 = executor.submit(ProcessPoolShutdownTest._failing_task_gh_132969,
2)
+ f3 = executor.submit(ProcessPoolShutdownTest._good_task_gh_132969, 3)
+ result = 0
+ try:
+ result += f1.result()
+ result += f2.result()
+ result += f3.result()
+ except ValueError:
+ # stop processing results upon first exception
+ pass
+
+ # Ensure that the executor cleans up after called
+ # shutdown with wait=False
+ executor_manager_thread = executor._executor_manager_thread
+ executor.shutdown(wait=False)
+ time.sleep(0.2)
+ executor_manager_thread.join()
+ return result
+
+ def test_shutdown_gh_132969_case_1(self):
+ # gh-132969: test that exception "object of type 'NoneType' has no
len()"
+ # is not raised when shutdown(wait=False) is called.
+ result = self._run_test_issue_gh_132969(2)
+ self.assertEqual(result, 1)
+
+ def test_shutdown_gh_132969_case_2(self):
+ # gh-132969: test that process does not hang and
+ # exception "object of type 'NoneType' has no len()" is not raised
+ # when shutdown(wait=False) is called.
+ result = self._run_test_issue_gh_132969(4)
+ self.assertEqual(result, 1)
+
create_executor_tests(globals(), ProcessPoolShutdownTest,
executor_mixins=(ProcessPoolForkMixin,
diff --git
a/Misc/NEWS.d/next/Library/2025-04-30-19-32-18.gh-issue-132969.EagQ3G.rst
b/Misc/NEWS.d/next/Library/2025-04-30-19-32-18.gh-issue-132969.EagQ3G.rst
new file mode 100644
index 00000000000000..7364c425941233
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2025-04-30-19-32-18.gh-issue-132969.EagQ3G.rst
@@ -0,0 +1,7 @@
+Prevent the :class:`~concurrent.futures.ProcessPoolExecutor` executor thread,
+which remains running when :meth:`shutdown(wait=False)
+<concurrent.futures.Executor.shutdown>`, from
+attempting to adjust the pool's worker processes after the object state has
already been reset during shutdown.
+A combination of conditions, including a worker process having terminated
abormally,
+resulted in an exception and a potential hang when the still-running executor
thread
+attempted to replace dead workers within the pool.
_______________________________________________
Python-checkins mailing list -- [email protected]
To unsubscribe send an email to [email protected]
https://mail.python.org/mailman3//lists/python-checkins.python.org
Member address: [email protected]