https://github.com/python/cpython/commit/efadc5874cdecc0420926afd5540b9b25c5e97fe commit: efadc5874cdecc0420926afd5540b9b25c5e97fe branch: main author: Sam Gross <colesb...@gmail.com> committer: colesbury <colesb...@gmail.com> date: 2025-03-04T11:19:06-05:00 summary:
Revert "gh-128041: Add `terminate_workers` and `kill_workers` methods to ProcessPoolExecutor (GH-128043)" (#130838) The test_concurrent_futures.test_process_pool test is failing in CI. This reverts commit f97e4098ff71a6488fd3411f9f9e6fa7a7bb4efe. files: D Misc/NEWS.d/next/Library/2024-12-17-18-53-21.gh-issue-128041.W96kAr.rst M Doc/library/concurrent.futures.rst M Doc/whatsnew/3.14.rst M Lib/concurrent/futures/process.py M Lib/test/test_concurrent_futures/test_process_pool.py diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst index dc613f2f8f00cd..5a950081a1c98d 100644 --- a/Doc/library/concurrent.futures.rst +++ b/Doc/library/concurrent.futures.rst @@ -415,30 +415,6 @@ to a :class:`ProcessPoolExecutor` will result in deadlock. require the *fork* start method for :class:`ProcessPoolExecutor` you must explicitly pass ``mp_context=multiprocessing.get_context("fork")``. - .. method:: terminate_workers() - - Attempt to terminate all living worker processes immediately by calling - :meth:`Process.terminate <multiprocessing.Process.terminate>` on each of them. - Internally, it will also call :meth:`Executor.shutdown` to ensure that all - other resources associated with the executor are freed. - - After calling this method the caller should no longer submit tasks to the - executor. - - .. versionadded:: next - - .. method:: kill_workers() - - Attempt to kill all living worker processes immediately by calling - :meth:`Process.kill <multiprocessing.Process.kill>` on each of them. - Internally, it will also call :meth:`Executor.shutdown` to ensure that all - other resources associated with the executor are freed. - - After calling this method the caller should no longer submit tasks to the - executor. - - .. versionadded:: next - .. _processpoolexecutor-example: ProcessPoolExecutor Example diff --git a/Doc/whatsnew/3.14.rst b/Doc/whatsnew/3.14.rst index 0721ea5cb8d5d1..aa802faae50b12 100644 --- a/Doc/whatsnew/3.14.rst +++ b/Doc/whatsnew/3.14.rst @@ -444,11 +444,6 @@ contextvars * Support context manager protocol by :class:`contextvars.Token`. (Contributed by Andrew Svetlov in :gh:`129889`.) -* Add :meth:`concurrent.futures.ProcessPoolExecutor.terminate_workers` and - :meth:`concurrent.futures.ProcessPoolExecutor.kill_workers` as - ways to terminate or kill all living worker processes in the given pool. - (Contributed by Charles Machalow in :gh:`128043`.) - ctypes ------ diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index d79d6b959c90d3..42eee72bc1457f 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -626,14 +626,6 @@ class BrokenProcessPool(_base.BrokenExecutor): while a future was in the running state. """ -_TERMINATE = "terminate" -_KILL = "kill" - -_SHUTDOWN_CALLBACK_OPERATION = { - _TERMINATE, - _KILL -} - class ProcessPoolExecutor(_base.Executor): def __init__(self, max_workers=None, mp_context=None, @@ -863,66 +855,3 @@ def shutdown(self, wait=True, *, cancel_futures=False): self._executor_manager_thread_wakeup = None shutdown.__doc__ = _base.Executor.shutdown.__doc__ - - def _force_shutdown(self, operation): - """Attempts to terminate or kill the executor's workers based off the - given operation. Iterates through all of the current processes and - performs the relevant task if the process is still alive. - - After terminating workers, the pool will be in a broken state - and no longer usable (for instance, new tasks should not be - submitted). - """ - if operation not in _SHUTDOWN_CALLBACK_OPERATION: - raise ValueError(f"Unsupported operation: {operation!r}") - - processes = {} - if self._processes: - processes = self._processes.copy() - - # shutdown will invalidate ._processes, so we copy it right before - # calling. If we waited here, we would deadlock if a process decides not - # to exit. - self.shutdown(wait=False, cancel_futures=True) - - if not processes: - return - - for proc in processes.values(): - try: - if not proc.is_alive(): - continue - except ValueError: - # The process is already exited/closed out. - continue - - try: - if operation == _TERMINATE: - proc.terminate() - elif operation == _KILL: - proc.kill() - except ProcessLookupError: - # The process just ended before our signal - continue - - def terminate_workers(self): - """Attempts to terminate the executor's workers. - Iterates through all of the current worker processes and terminates - each one that is still alive. - - After terminating workers, the pool will be in a broken state - and no longer usable (for instance, new tasks should not be - submitted). - """ - return self._force_shutdown(operation=_TERMINATE) - - def kill_workers(self): - """Attempts to kill the executor's workers. - Iterates through all of the current worker processes and kills - each one that is still alive. - - After killing workers, the pool will be in a broken state - and no longer usable (for instance, new tasks should not be - submitted). - """ - return self._force_shutdown(operation=_KILL) diff --git a/Lib/test/test_concurrent_futures/test_process_pool.py b/Lib/test/test_concurrent_futures/test_process_pool.py index 354b7d0a346970..8b1bdaa33d8f5c 100644 --- a/Lib/test/test_concurrent_futures/test_process_pool.py +++ b/Lib/test/test_concurrent_futures/test_process_pool.py @@ -1,17 +1,13 @@ import os -import queue -import signal import sys import threading import time import unittest -import unittest.mock from concurrent import futures from concurrent.futures.process import BrokenProcessPool from test import support from test.support import hashlib_helper -from test.test_importlib.metadata.fixtures import parameterize from .executor import ExecutorTest, mul from .util import ( @@ -26,19 +22,6 @@ def __init__(self, mgr): def __del__(self): self.event.set() -TERMINATE_WORKERS = futures.ProcessPoolExecutor.terminate_workers.__name__ -KILL_WORKERS = futures.ProcessPoolExecutor.kill_workers.__name__ -FORCE_SHUTDOWN_PARAMS = [ - dict(function_name=TERMINATE_WORKERS), - dict(function_name=KILL_WORKERS), -] - -def _put_sleep_put(queue): - """ Used as part of test_terminate_workers """ - queue.put('started') - time.sleep(2) - queue.put('finished') - class ProcessPoolExecutorTest(ExecutorTest): @@ -235,86 +218,6 @@ def mock_start_new_thread(func, *args, **kwargs): list(executor.map(mul, [(2, 3)] * 10)) executor.shutdown() - def test_terminate_workers(self): - mock_fn = unittest.mock.Mock() - with self.executor_type(max_workers=1) as executor: - executor._force_shutdown = mock_fn - executor.terminate_workers() - - mock_fn.assert_called_once_with(operation=futures.process._TERMINATE) - - def test_kill_workers(self): - mock_fn = unittest.mock.Mock() - with self.executor_type(max_workers=1) as executor: - executor._force_shutdown = mock_fn - executor.kill_workers() - - mock_fn.assert_called_once_with(operation=futures.process._KILL) - - def test_force_shutdown_workers_invalid_op(self): - with self.executor_type(max_workers=1) as executor: - self.assertRaises(ValueError, - executor._force_shutdown, - operation='invalid operation'), - - @parameterize(*FORCE_SHUTDOWN_PARAMS) - def test_force_shutdown_workers(self, function_name): - manager = self.get_context().Manager() - q = manager.Queue() - - with self.executor_type(max_workers=1) as executor: - executor.submit(_put_sleep_put, q) - - # We should get started, but not finished since we'll terminate the - # workers just after - self.assertEqual(q.get(timeout=5), 'started') - - worker_process = list(executor._processes.values())[0] - getattr(executor, function_name)() - worker_process.join() - - if function_name == TERMINATE_WORKERS or \ - sys.platform == 'win32': - # On windows, kill and terminate both send SIGTERM - self.assertEqual(worker_process.exitcode, -signal.SIGTERM) - elif function_name == KILL_WORKERS: - self.assertEqual(worker_process.exitcode, -signal.SIGKILL) - else: - self.fail(f"Unknown operation: {function_name}") - - self.assertRaises(queue.Empty, q.get, timeout=1) - - @parameterize(*FORCE_SHUTDOWN_PARAMS) - def test_force_shutdown_workers_dead_workers(self, function_name): - with self.executor_type(max_workers=1) as executor: - future = executor.submit(os._exit, 1) - self.assertRaises(BrokenProcessPool, future.result) - - # even though the pool is broken, this shouldn't raise - getattr(executor, function_name)() - - @parameterize(*FORCE_SHUTDOWN_PARAMS) - def test_force_shutdown_workers_not_started_yet(self, function_name): - ctx = self.get_context() - with unittest.mock.patch.object(ctx, 'Process') as mock_process: - with self.executor_type(max_workers=1, mp_context=ctx) as executor: - # The worker has not been started yet, terminate/kill_workers - # should basically no-op - getattr(executor, function_name)() - - mock_process.return_value.kill.assert_not_called() - mock_process.return_value.terminate.assert_not_called() - - @parameterize(*FORCE_SHUTDOWN_PARAMS) - def test_force_shutdown_workers_stops_pool(self, function_name): - with self.executor_type(max_workers=1) as executor: - task = executor.submit(time.sleep, 0) - self.assertIsNone(task.result()) - - getattr(executor, function_name)() - - self.assertRaises(RuntimeError, executor.submit, time.sleep, 0) - create_executor_tests(globals(), ProcessPoolExecutorTest, executor_mixins=(ProcessPoolForkMixin, diff --git a/Misc/NEWS.d/next/Library/2024-12-17-18-53-21.gh-issue-128041.W96kAr.rst b/Misc/NEWS.d/next/Library/2024-12-17-18-53-21.gh-issue-128041.W96kAr.rst deleted file mode 100644 index bb9ef96d45eb79..00000000000000 --- a/Misc/NEWS.d/next/Library/2024-12-17-18-53-21.gh-issue-128041.W96kAr.rst +++ /dev/null @@ -1,4 +0,0 @@ -Add :meth:`concurrent.futures.ProcessPoolExecutor.terminate_workers` and -:meth:`concurrent.futures.ProcessPoolExecutor.kill_workers` as -ways to terminate or kill all living worker processes in the given pool. -(Contributed by Charles Machalow in :gh:`128043`.) _______________________________________________ Python-checkins mailing list -- python-checkins@python.org To unsubscribe send an email to python-checkins-le...@python.org https://mail.python.org/mailman3/lists/python-checkins.python.org/ Member address: arch...@mail-archive.com