https://github.com/python/cpython/commit/ba05a4ebcb67506b4e6d65ea11e78d06f57dc23b commit: ba05a4ebcb67506b4e6d65ea11e78d06f57dc23b branch: main author: Charles Machalow <csm10...@gmail.com> committer: gpshead <g...@krypto.org> date: 2025-03-05T14:31:42-08:00 summary:
gh-128041: Add `terminate_workers` and `kill_workers` methods to ProcessPoolExecutor (GH-130849) This adds two new methods to `multiprocessing`'s `ProcessPoolExecutor`: - **`terminate_workers()`**: forcefully terminates worker processes using `Process.terminate()` - **`kill_workers()`**: forcefully kills worker processes using `Process.kill()` These methods provide users with a direct way to stop worker processes without `shutdown()` or relying on implementation details, addressing situations where immediate termination is needed. Co-authored-by: Bénédikt Tran <10796600+picn...@users.noreply.github.com> Co-authored-by: blurb-it[bot] <43283697+blurb-it[bot]@users.noreply.github.com> Co-authored-by: Sam Gross @colesbury Commit-message-mostly-authored-by: Claude Sonnet 3.7 (because why not -greg) files: A Misc/NEWS.d/next/Library/2024-12-17-18-53-21.gh-issue-128041.W96kAr.rst M Doc/library/concurrent.futures.rst M Doc/whatsnew/3.14.rst M Lib/concurrent/futures/process.py M Lib/test/test_concurrent_futures/test_process_pool.py diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst index 5a950081a1c98d..dc613f2f8f00cd 100644 --- a/Doc/library/concurrent.futures.rst +++ b/Doc/library/concurrent.futures.rst @@ -415,6 +415,30 @@ to a :class:`ProcessPoolExecutor` will result in deadlock. require the *fork* start method for :class:`ProcessPoolExecutor` you must explicitly pass ``mp_context=multiprocessing.get_context("fork")``. + .. method:: terminate_workers() + + Attempt to terminate all living worker processes immediately by calling + :meth:`Process.terminate <multiprocessing.Process.terminate>` on each of them. + Internally, it will also call :meth:`Executor.shutdown` to ensure that all + other resources associated with the executor are freed. + + After calling this method the caller should no longer submit tasks to the + executor. + + .. versionadded:: next + + .. method:: kill_workers() + + Attempt to kill all living worker processes immediately by calling + :meth:`Process.kill <multiprocessing.Process.kill>` on each of them. + Internally, it will also call :meth:`Executor.shutdown` to ensure that all + other resources associated with the executor are freed. + + After calling this method the caller should no longer submit tasks to the + executor. + + .. versionadded:: next + .. _processpoolexecutor-example: ProcessPoolExecutor Example diff --git a/Doc/whatsnew/3.14.rst b/Doc/whatsnew/3.14.rst index b457ccf66571b0..7d879d8b36669b 100644 --- a/Doc/whatsnew/3.14.rst +++ b/Doc/whatsnew/3.14.rst @@ -444,6 +444,11 @@ contextvars * Support context manager protocol by :class:`contextvars.Token`. (Contributed by Andrew Svetlov in :gh:`129889`.) +* Add :meth:`concurrent.futures.ProcessPoolExecutor.terminate_workers` and + :meth:`concurrent.futures.ProcessPoolExecutor.kill_workers` as + ways to terminate or kill all living worker processes in the given pool. + (Contributed by Charles Machalow in :gh:`130849`.) + ctypes ------ diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 42eee72bc1457f..d79d6b959c90d3 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -626,6 +626,14 @@ class BrokenProcessPool(_base.BrokenExecutor): while a future was in the running state. """ +_TERMINATE = "terminate" +_KILL = "kill" + +_SHUTDOWN_CALLBACK_OPERATION = { + _TERMINATE, + _KILL +} + class ProcessPoolExecutor(_base.Executor): def __init__(self, max_workers=None, mp_context=None, @@ -855,3 +863,66 @@ def shutdown(self, wait=True, *, cancel_futures=False): self._executor_manager_thread_wakeup = None shutdown.__doc__ = _base.Executor.shutdown.__doc__ + + def _force_shutdown(self, operation): + """Attempts to terminate or kill the executor's workers based off the + given operation. Iterates through all of the current processes and + performs the relevant task if the process is still alive. + + After terminating workers, the pool will be in a broken state + and no longer usable (for instance, new tasks should not be + submitted). + """ + if operation not in _SHUTDOWN_CALLBACK_OPERATION: + raise ValueError(f"Unsupported operation: {operation!r}") + + processes = {} + if self._processes: + processes = self._processes.copy() + + # shutdown will invalidate ._processes, so we copy it right before + # calling. If we waited here, we would deadlock if a process decides not + # to exit. + self.shutdown(wait=False, cancel_futures=True) + + if not processes: + return + + for proc in processes.values(): + try: + if not proc.is_alive(): + continue + except ValueError: + # The process is already exited/closed out. + continue + + try: + if operation == _TERMINATE: + proc.terminate() + elif operation == _KILL: + proc.kill() + except ProcessLookupError: + # The process just ended before our signal + continue + + def terminate_workers(self): + """Attempts to terminate the executor's workers. + Iterates through all of the current worker processes and terminates + each one that is still alive. + + After terminating workers, the pool will be in a broken state + and no longer usable (for instance, new tasks should not be + submitted). + """ + return self._force_shutdown(operation=_TERMINATE) + + def kill_workers(self): + """Attempts to kill the executor's workers. + Iterates through all of the current worker processes and kills + each one that is still alive. + + After killing workers, the pool will be in a broken state + and no longer usable (for instance, new tasks should not be + submitted). + """ + return self._force_shutdown(operation=_KILL) diff --git a/Lib/test/test_concurrent_futures/test_process_pool.py b/Lib/test/test_concurrent_futures/test_process_pool.py index 8b1bdaa33d8f5c..62503bfa55774b 100644 --- a/Lib/test/test_concurrent_futures/test_process_pool.py +++ b/Lib/test/test_concurrent_futures/test_process_pool.py @@ -1,13 +1,17 @@ import os +import queue +import signal import sys import threading import time import unittest +import unittest.mock from concurrent import futures from concurrent.futures.process import BrokenProcessPool from test import support from test.support import hashlib_helper +from test.test_importlib.metadata.fixtures import parameterize from .executor import ExecutorTest, mul from .util import ( @@ -22,6 +26,21 @@ def __init__(self, mgr): def __del__(self): self.event.set() +TERMINATE_WORKERS = futures.ProcessPoolExecutor.terminate_workers.__name__ +KILL_WORKERS = futures.ProcessPoolExecutor.kill_workers.__name__ +FORCE_SHUTDOWN_PARAMS = [ + dict(function_name=TERMINATE_WORKERS), + dict(function_name=KILL_WORKERS), +] + +def _put_wait_put(queue, event): + """ Used as part of test_terminate_workers """ + queue.put('started') + event.wait() + + # We should never get here since the event will not get set + queue.put('finished') + class ProcessPoolExecutorTest(ExecutorTest): @@ -218,6 +237,107 @@ def mock_start_new_thread(func, *args, **kwargs): list(executor.map(mul, [(2, 3)] * 10)) executor.shutdown() + def test_terminate_workers(self): + mock_fn = unittest.mock.Mock() + with self.executor_type(max_workers=1) as executor: + executor._force_shutdown = mock_fn + executor.terminate_workers() + + mock_fn.assert_called_once_with(operation=futures.process._TERMINATE) + + def test_kill_workers(self): + mock_fn = unittest.mock.Mock() + with self.executor_type(max_workers=1) as executor: + executor._force_shutdown = mock_fn + executor.kill_workers() + + mock_fn.assert_called_once_with(operation=futures.process._KILL) + + def test_force_shutdown_workers_invalid_op(self): + with self.executor_type(max_workers=1) as executor: + self.assertRaises(ValueError, + executor._force_shutdown, + operation='invalid operation'), + + @parameterize(*FORCE_SHUTDOWN_PARAMS) + def test_force_shutdown_workers(self, function_name): + manager = self.get_context().Manager() + q = manager.Queue() + e = manager.Event() + + with self.executor_type(max_workers=1) as executor: + executor.submit(_put_wait_put, q, e) + + # We should get started, but not finished since we'll terminate the + # workers just after and never set the event. + self.assertEqual(q.get(timeout=support.SHORT_TIMEOUT), 'started') + + worker_process = list(executor._processes.values())[0] + + Mock = unittest.mock.Mock + worker_process.terminate = Mock(wraps=worker_process.terminate) + worker_process.kill = Mock(wraps=worker_process.kill) + + getattr(executor, function_name)() + worker_process.join() + + if function_name == TERMINATE_WORKERS: + worker_process.terminate.assert_called() + elif function_name == KILL_WORKERS: + worker_process.kill.assert_called() + else: + self.fail(f"Unknown operation: {function_name}") + + self.assertRaises(queue.Empty, q.get, timeout=0.01) + + @parameterize(*FORCE_SHUTDOWN_PARAMS) + def test_force_shutdown_workers_dead_workers(self, function_name): + with self.executor_type(max_workers=1) as executor: + future = executor.submit(os._exit, 1) + self.assertRaises(BrokenProcessPool, future.result) + + # even though the pool is broken, this shouldn't raise + getattr(executor, function_name)() + + @parameterize(*FORCE_SHUTDOWN_PARAMS) + def test_force_shutdown_workers_not_started_yet(self, function_name): + ctx = self.get_context() + with unittest.mock.patch.object(ctx, 'Process') as mock_process: + with self.executor_type(max_workers=1, mp_context=ctx) as executor: + # The worker has not been started yet, terminate/kill_workers + # should basically no-op + getattr(executor, function_name)() + + mock_process.return_value.kill.assert_not_called() + mock_process.return_value.terminate.assert_not_called() + + @parameterize(*FORCE_SHUTDOWN_PARAMS) + def test_force_shutdown_workers_stops_pool(self, function_name): + with self.executor_type(max_workers=1) as executor: + task = executor.submit(time.sleep, 0) + self.assertIsNone(task.result()) + + worker_process = list(executor._processes.values())[0] + getattr(executor, function_name)() + + self.assertRaises(RuntimeError, executor.submit, time.sleep, 0) + + # A signal sent, is not a signal reacted to. + # So wait a moment here for the process to die. + # If we don't, every once in a while we may get an ENV CHANGE + # error since the process would be alive immediately after the + # test run.. and die a moment later. + worker_process.join(support.SHORT_TIMEOUT) + + # Oddly enough, even though join completes, sometimes it takes a + # moment for the process to actually be marked as dead. + # ... that seems a bit buggy. + # We need it dead before ending the test to ensure it doesn't + # get marked as an ENV CHANGE due to living child process. + for _ in support.sleeping_retry(support.SHORT_TIMEOUT): + if not worker_process.is_alive(): + break + create_executor_tests(globals(), ProcessPoolExecutorTest, executor_mixins=(ProcessPoolForkMixin, diff --git a/Misc/NEWS.d/next/Library/2024-12-17-18-53-21.gh-issue-128041.W96kAr.rst b/Misc/NEWS.d/next/Library/2024-12-17-18-53-21.gh-issue-128041.W96kAr.rst new file mode 100644 index 00000000000000..cd1ea7e9bc8635 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2024-12-17-18-53-21.gh-issue-128041.W96kAr.rst @@ -0,0 +1,4 @@ +Add :meth:`concurrent.futures.ProcessPoolExecutor.terminate_workers` and +:meth:`concurrent.futures.ProcessPoolExecutor.kill_workers` as +ways to terminate or kill all living worker processes in the given pool. +(Contributed by Charles Machalow in :gh:`130849`.) _______________________________________________ Python-checkins mailing list -- python-checkins@python.org To unsubscribe send an email to python-checkins-le...@python.org https://mail.python.org/mailman3/lists/python-checkins.python.org/ Member address: arch...@mail-archive.com