https://github.com/python/cpython/commit/f97e4098ff71a6488fd3411f9f9e6fa7a7bb4efe
commit: f97e4098ff71a6488fd3411f9f9e6fa7a7bb4efe
branch: main
author: Charles Machalow <csm10...@gmail.com>
committer: gpshead <g...@krypto.org>
date: 2025-03-02T18:01:45-08:00
summary:

gh-128041: Add `terminate_workers` and `kill_workers` methods to 
ProcessPoolExecutor (GH-128043)

This adds two new methods to `multiprocessing`'s `ProcessPoolExecutor`:
- **`terminate_workers()`**: forcefully terminates worker processes using 
`Process.terminate()`
- **`kill_workers()`**: forcefully kills worker processes using `Process.kill()`

These methods provide users with a direct way to stop worker processes without 
`shutdown()` or relying on implementation details, addressing situations where 
immediate termination is needed.

Co-authored-by: Bénédikt Tran <10796600+picn...@users.noreply.github.com>
Co-authored-by: blurb-it[bot] <43283697+blurb-it[bot]@users.noreply.github.com>
Commit-message-mostly-authored-by: Claude Sonnet 3.7 (because why not -greg)

files:
A Misc/NEWS.d/next/Library/2024-12-17-18-53-21.gh-issue-128041.W96kAr.rst
M Doc/library/concurrent.futures.rst
M Doc/whatsnew/3.14.rst
M Lib/concurrent/futures/process.py
M Lib/test/test_concurrent_futures/test_process_pool.py

diff --git a/Doc/library/concurrent.futures.rst 
b/Doc/library/concurrent.futures.rst
index 5a950081a1c98d..dc613f2f8f00cd 100644
--- a/Doc/library/concurrent.futures.rst
+++ b/Doc/library/concurrent.futures.rst
@@ -415,6 +415,30 @@ to a :class:`ProcessPoolExecutor` will result in deadlock.
       require the *fork* start method for :class:`ProcessPoolExecutor` you must
       explicitly pass ``mp_context=multiprocessing.get_context("fork")``.
 
+   .. method:: terminate_workers()
+
+      Attempt to terminate all living worker processes immediately by calling
+      :meth:`Process.terminate <multiprocessing.Process.terminate>` on each of 
them.
+      Internally, it will also call :meth:`Executor.shutdown` to ensure that 
all
+      other resources associated with the executor are freed.
+
+      After calling this method the caller should no longer submit tasks to the
+      executor.
+
+      .. versionadded:: next
+
+   .. method:: kill_workers()
+
+      Attempt to kill all living worker processes immediately by calling
+      :meth:`Process.kill <multiprocessing.Process.kill>` on each of them.
+      Internally, it will also call :meth:`Executor.shutdown` to ensure that 
all
+      other resources associated with the executor are freed.
+
+      After calling this method the caller should no longer submit tasks to the
+      executor.
+
+      .. versionadded:: next
+
 .. _processpoolexecutor-example:
 
 ProcessPoolExecutor Example
diff --git a/Doc/whatsnew/3.14.rst b/Doc/whatsnew/3.14.rst
index ce75b5fffc0a4c..2875913e4abdf1 100644
--- a/Doc/whatsnew/3.14.rst
+++ b/Doc/whatsnew/3.14.rst
@@ -444,6 +444,11 @@ contextvars
 * Support context manager protocol by :class:`contextvars.Token`.
   (Contributed by Andrew Svetlov in :gh:`129889`.)
 
+* Add :meth:`concurrent.futures.ProcessPoolExecutor.terminate_workers` and
+  :meth:`concurrent.futures.ProcessPoolExecutor.kill_workers` as
+  ways to terminate or kill all living worker processes in the given pool.
+  (Contributed by Charles Machalow in :gh:`128043`.)
+
 
 ctypes
 ------
diff --git a/Lib/concurrent/futures/process.py 
b/Lib/concurrent/futures/process.py
index 42eee72bc1457f..d79d6b959c90d3 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -626,6 +626,14 @@ class BrokenProcessPool(_base.BrokenExecutor):
     while a future was in the running state.
     """
 
+_TERMINATE = "terminate"
+_KILL = "kill"
+
+_SHUTDOWN_CALLBACK_OPERATION = {
+    _TERMINATE,
+    _KILL
+}
+
 
 class ProcessPoolExecutor(_base.Executor):
     def __init__(self, max_workers=None, mp_context=None,
@@ -855,3 +863,66 @@ def shutdown(self, wait=True, *, cancel_futures=False):
         self._executor_manager_thread_wakeup = None
 
     shutdown.__doc__ = _base.Executor.shutdown.__doc__
+
+    def _force_shutdown(self, operation):
+        """Attempts to terminate or kill the executor's workers based off the
+        given operation. Iterates through all of the current processes and
+        performs the relevant task if the process is still alive.
+
+        After terminating workers, the pool will be in a broken state
+        and no longer usable (for instance, new tasks should not be
+        submitted).
+        """
+        if operation not in _SHUTDOWN_CALLBACK_OPERATION:
+            raise ValueError(f"Unsupported operation: {operation!r}")
+
+        processes = {}
+        if self._processes:
+            processes = self._processes.copy()
+
+        # shutdown will invalidate ._processes, so we copy it right before
+        # calling. If we waited here, we would deadlock if a process decides 
not
+        # to exit.
+        self.shutdown(wait=False, cancel_futures=True)
+
+        if not processes:
+            return
+
+        for proc in processes.values():
+            try:
+                if not proc.is_alive():
+                    continue
+            except ValueError:
+                # The process is already exited/closed out.
+                continue
+
+            try:
+                if operation == _TERMINATE:
+                    proc.terminate()
+                elif operation == _KILL:
+                    proc.kill()
+            except ProcessLookupError:
+                # The process just ended before our signal
+                continue
+
+    def terminate_workers(self):
+        """Attempts to terminate the executor's workers.
+        Iterates through all of the current worker processes and terminates
+        each one that is still alive.
+
+        After terminating workers, the pool will be in a broken state
+        and no longer usable (for instance, new tasks should not be
+        submitted).
+        """
+        return self._force_shutdown(operation=_TERMINATE)
+
+    def kill_workers(self):
+        """Attempts to kill the executor's workers.
+        Iterates through all of the current worker processes and kills
+        each one that is still alive.
+
+        After killing workers, the pool will be in a broken state
+        and no longer usable (for instance, new tasks should not be
+        submitted).
+        """
+        return self._force_shutdown(operation=_KILL)
diff --git a/Lib/test/test_concurrent_futures/test_process_pool.py 
b/Lib/test/test_concurrent_futures/test_process_pool.py
index 8b1bdaa33d8f5c..354b7d0a346970 100644
--- a/Lib/test/test_concurrent_futures/test_process_pool.py
+++ b/Lib/test/test_concurrent_futures/test_process_pool.py
@@ -1,13 +1,17 @@
 import os
+import queue
+import signal
 import sys
 import threading
 import time
 import unittest
+import unittest.mock
 from concurrent import futures
 from concurrent.futures.process import BrokenProcessPool
 
 from test import support
 from test.support import hashlib_helper
+from test.test_importlib.metadata.fixtures import parameterize
 
 from .executor import ExecutorTest, mul
 from .util import (
@@ -22,6 +26,19 @@ def __init__(self, mgr):
     def __del__(self):
         self.event.set()
 
+TERMINATE_WORKERS = futures.ProcessPoolExecutor.terminate_workers.__name__
+KILL_WORKERS = futures.ProcessPoolExecutor.kill_workers.__name__
+FORCE_SHUTDOWN_PARAMS = [
+    dict(function_name=TERMINATE_WORKERS),
+    dict(function_name=KILL_WORKERS),
+]
+
+def _put_sleep_put(queue):
+    """ Used as part of test_terminate_workers """
+    queue.put('started')
+    time.sleep(2)
+    queue.put('finished')
+
 
 class ProcessPoolExecutorTest(ExecutorTest):
 
@@ -218,6 +235,86 @@ def mock_start_new_thread(func, *args, **kwargs):
                     list(executor.map(mul, [(2, 3)] * 10))
             executor.shutdown()
 
+    def test_terminate_workers(self):
+        mock_fn = unittest.mock.Mock()
+        with self.executor_type(max_workers=1) as executor:
+            executor._force_shutdown = mock_fn
+            executor.terminate_workers()
+
+        mock_fn.assert_called_once_with(operation=futures.process._TERMINATE)
+
+    def test_kill_workers(self):
+        mock_fn = unittest.mock.Mock()
+        with self.executor_type(max_workers=1) as executor:
+            executor._force_shutdown = mock_fn
+            executor.kill_workers()
+
+        mock_fn.assert_called_once_with(operation=futures.process._KILL)
+
+    def test_force_shutdown_workers_invalid_op(self):
+        with self.executor_type(max_workers=1) as executor:
+            self.assertRaises(ValueError,
+                              executor._force_shutdown,
+                              operation='invalid operation'),
+
+    @parameterize(*FORCE_SHUTDOWN_PARAMS)
+    def test_force_shutdown_workers(self, function_name):
+        manager = self.get_context().Manager()
+        q = manager.Queue()
+
+        with self.executor_type(max_workers=1) as executor:
+            executor.submit(_put_sleep_put, q)
+
+            # We should get started, but not finished since we'll terminate the
+            # workers just after
+            self.assertEqual(q.get(timeout=5), 'started')
+
+            worker_process = list(executor._processes.values())[0]
+            getattr(executor, function_name)()
+            worker_process.join()
+
+            if function_name == TERMINATE_WORKERS or \
+                sys.platform == 'win32':
+                # On windows, kill and terminate both send SIGTERM
+                self.assertEqual(worker_process.exitcode, -signal.SIGTERM)
+            elif function_name == KILL_WORKERS:
+                self.assertEqual(worker_process.exitcode, -signal.SIGKILL)
+            else:
+                self.fail(f"Unknown operation: {function_name}")
+
+            self.assertRaises(queue.Empty, q.get, timeout=1)
+
+    @parameterize(*FORCE_SHUTDOWN_PARAMS)
+    def test_force_shutdown_workers_dead_workers(self, function_name):
+        with self.executor_type(max_workers=1) as executor:
+            future = executor.submit(os._exit, 1)
+            self.assertRaises(BrokenProcessPool, future.result)
+
+            # even though the pool is broken, this shouldn't raise
+            getattr(executor, function_name)()
+
+    @parameterize(*FORCE_SHUTDOWN_PARAMS)
+    def test_force_shutdown_workers_not_started_yet(self, function_name):
+        ctx = self.get_context()
+        with unittest.mock.patch.object(ctx, 'Process') as mock_process:
+            with self.executor_type(max_workers=1, mp_context=ctx) as executor:
+                # The worker has not been started yet, terminate/kill_workers
+                # should basically no-op
+                getattr(executor, function_name)()
+
+            mock_process.return_value.kill.assert_not_called()
+            mock_process.return_value.terminate.assert_not_called()
+
+    @parameterize(*FORCE_SHUTDOWN_PARAMS)
+    def test_force_shutdown_workers_stops_pool(self, function_name):
+        with self.executor_type(max_workers=1) as executor:
+            task = executor.submit(time.sleep, 0)
+            self.assertIsNone(task.result())
+
+            getattr(executor, function_name)()
+
+            self.assertRaises(RuntimeError, executor.submit, time.sleep, 0)
+
 
 create_executor_tests(globals(), ProcessPoolExecutorTest,
                       executor_mixins=(ProcessPoolForkMixin,
diff --git 
a/Misc/NEWS.d/next/Library/2024-12-17-18-53-21.gh-issue-128041.W96kAr.rst 
b/Misc/NEWS.d/next/Library/2024-12-17-18-53-21.gh-issue-128041.W96kAr.rst
new file mode 100644
index 00000000000000..bb9ef96d45eb79
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2024-12-17-18-53-21.gh-issue-128041.W96kAr.rst
@@ -0,0 +1,4 @@
+Add :meth:`concurrent.futures.ProcessPoolExecutor.terminate_workers` and
+:meth:`concurrent.futures.ProcessPoolExecutor.kill_workers` as
+ways to terminate or kill all living worker processes in the given pool.
+(Contributed by Charles Machalow in :gh:`128043`.)

_______________________________________________
Python-checkins mailing list -- python-checkins@python.org
To unsubscribe send an email to python-checkins-le...@python.org
https://mail.python.org/mailman3/lists/python-checkins.python.org/
Member address: arch...@mail-archive.com

Reply via email to