https://github.com/python/cpython/commit/598aa7cc98bc1b39f10ec41decddd8dd88799fe1
commit: 598aa7cc98bc1b39f10ec41decddd8dd88799fe1
branch: main
author: Ajay Kamdar <140011370+ogbigg...@users.noreply.github.com>
committer: encukou <encu...@gmail.com>
date: 2025-06-10T13:28:31+02:00
summary:

gh-132969:  Fix error/hang when shutdown(wait=False) and task exited abnormally 
(GH-133222)

When shutdown is called with wait=False, the executor thread keeps running
even after the ProcessPoolExecutor's state is reset. The executor then tries
to replenish the worker processes pool resulting in an error and a potential 
hang
when it comes across a worker that has died. Fixed the issue by having
_adjust_process_count() return without doing anything if the 
ProcessPoolExecutor's
state has been reset.

Added unit tests to validate two scenarios:
max_workers < num_tasks (exception)
max_workers > num_tasks (exception + hang)

files:
A Misc/NEWS.d/next/Library/2025-04-30-19-32-18.gh-issue-132969.EagQ3G.rst
M Lib/concurrent/futures/process.py
M Lib/test/test_concurrent_futures/test_shutdown.py

diff --git a/Lib/concurrent/futures/process.py 
b/Lib/concurrent/futures/process.py
index 76b7b2abe836d8..a14650bf5fa47c 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -755,6 +755,11 @@ def _start_executor_manager_thread(self):
                 self._executor_manager_thread_wakeup
 
     def _adjust_process_count(self):
+        # gh-132969: avoid error when state is reset and executor is still 
running,
+        # which will happen when shutdown(wait=False) is called.
+        if self._processes is None:
+            return
+
         # if there's an idle process, we don't need to spawn a new one.
         if self._idle_worker_semaphore.acquire(blocking=False):
             return
diff --git a/Lib/test/test_concurrent_futures/test_shutdown.py 
b/Lib/test/test_concurrent_futures/test_shutdown.py
index 7a4065afd46fc8..99b315b47e2530 100644
--- a/Lib/test/test_concurrent_futures/test_shutdown.py
+++ b/Lib/test/test_concurrent_futures/test_shutdown.py
@@ -330,6 +330,64 @@ def test_shutdown_no_wait(self):
         # shutdown.
         assert all([r == abs(v) for r, v in zip(res, range(-5, 5))])
 
+    @classmethod
+    def _failing_task_gh_132969(cls, n):
+        raise ValueError("failing task")
+
+    @classmethod
+    def _good_task_gh_132969(cls, n):
+        time.sleep(0.1 * n)
+        return n
+
+    def _run_test_issue_gh_132969(self, max_workers):
+        # max_workers=2 will repro exception
+        # max_workers=4 will repro exception and then hang
+
+        # Repro conditions
+        #   max_tasks_per_child=1
+        #   a task ends abnormally
+        #   shutdown(wait=False) is called
+        start_method = self.get_context().get_start_method()
+        if (start_method == "fork" or
+           (start_method == "forkserver" and sys.platform.startswith("win"))):
+                self.skipTest(f"Skipping test for {start_method = }")
+        executor = futures.ProcessPoolExecutor(
+                max_workers=max_workers,
+                max_tasks_per_child=1,
+                mp_context=self.get_context())
+        f1 = executor.submit(ProcessPoolShutdownTest._good_task_gh_132969, 1)
+        f2 = executor.submit(ProcessPoolShutdownTest._failing_task_gh_132969, 
2)
+        f3 = executor.submit(ProcessPoolShutdownTest._good_task_gh_132969, 3)
+        result = 0
+        try:
+            result += f1.result()
+            result += f2.result()
+            result += f3.result()
+        except ValueError:
+            # stop processing results upon first exception
+            pass
+
+        # Ensure that the executor cleans up after called
+        # shutdown with wait=False
+        executor_manager_thread = executor._executor_manager_thread
+        executor.shutdown(wait=False)
+        time.sleep(0.2)
+        executor_manager_thread.join()
+        return result
+
+    def test_shutdown_gh_132969_case_1(self):
+        # gh-132969: test that exception "object of type 'NoneType' has no 
len()"
+        # is not raised when shutdown(wait=False) is called.
+        result = self._run_test_issue_gh_132969(2)
+        self.assertEqual(result, 1)
+
+    def test_shutdown_gh_132969_case_2(self):
+        # gh-132969: test that process does not hang and
+        # exception "object of type 'NoneType' has no len()" is not raised
+        # when shutdown(wait=False) is called.
+        result = self._run_test_issue_gh_132969(4)
+        self.assertEqual(result, 1)
+
 
 create_executor_tests(globals(), ProcessPoolShutdownTest,
                       executor_mixins=(ProcessPoolForkMixin,
diff --git 
a/Misc/NEWS.d/next/Library/2025-04-30-19-32-18.gh-issue-132969.EagQ3G.rst 
b/Misc/NEWS.d/next/Library/2025-04-30-19-32-18.gh-issue-132969.EagQ3G.rst
new file mode 100644
index 00000000000000..7364c425941233
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2025-04-30-19-32-18.gh-issue-132969.EagQ3G.rst
@@ -0,0 +1,7 @@
+Prevent the :class:`~concurrent.futures.ProcessPoolExecutor` executor thread,
+which remains running when :meth:`shutdown(wait=False)
+<concurrent.futures.Executor.shutdown>`, from
+attempting to adjust the pool's worker processes after the object state has 
already been reset during shutdown.
+A combination of conditions, including a worker process having terminated 
abormally,
+resulted in an exception and a potential hang when the still-running executor 
thread
+attempted to replace dead workers within the pool.

_______________________________________________
Python-checkins mailing list -- python-checkins@python.org
To unsubscribe send an email to python-checkins-le...@python.org
https://mail.python.org/mailman3//lists/python-checkins.python.org
Member address: arch...@mail-archive.com

Reply via email to