https://github.com/python/cpython/commit/f24a012350f71141648cbd61081a25a458dd7fff
commit: f24a012350f71141648cbd61081a25a458dd7fff
branch: main
author: Thomas Grainger <tagr...@gmail.com>
committer: gpshead <68491+gpsh...@users.noreply.github.com>
date: 2025-08-13T20:00:23Z
summary:

gh-131788: make resource_tracker re-entrant safe (GH-131787)

* make resource_tracker re-entrant safe
* Update Lib/multiprocessing/resource_tracker.py
* trim trailing whitespace
* use f-string and args = [x, *y, z]
* raise self._reentrant_call_error

---------

Co-authored-by: blurb-it[bot] <43283697+blurb-it[bot]@users.noreply.github.com>
Co-authored-by: Gregory P. Smith <g...@krypto.org>

files:
A Misc/NEWS.d/next/Library/2025-03-27-08-13-32.gh-issue-131788.0RWiFc.rst
M Lib/multiprocessing/resource_tracker.py

diff --git a/Lib/multiprocessing/resource_tracker.py b/Lib/multiprocessing/resource_tracker.py
index c4d0ca81e7034a..c53092f6e34b32 100644
--- a/Lib/multiprocessing/resource_tracker.py
+++ b/Lib/multiprocessing/resource_tracker.py
@@ -20,6 +20,7 @@
 import sys
 import threading
 import warnings
+from collections import deque
 
 from . import spawn
 from . import util
@@ -62,6 +63,7 @@ def __init__(self):
         self._fd = None
         self._pid = None
         self._exitcode = None
+        self._reentrant_messages = deque()
 
     def _reentrant_call_error(self):
         # gh-109629: this happens if an explicit call to the ResourceTracker
@@ -98,7 +100,7 @@ def _stop_locked(
         # This shouldn't happen (it might when called by a finalizer)
         # so we check for it anyway.
         if self._lock._recursion_count() > 1:
-            return self._reentrant_call_error()
+            raise self._reentrant_call_error()
         if self._fd is None:
             # not running
             return
@@ -128,69 +130,99 @@ def ensure_running(self):
 
         This can be run from any process.  Usually a child process will use
         the resource created by its parent.'''
+        return self._ensure_running_and_write()
+
+    def _teardown_dead_process(self):
+        os.close(self._fd)
+
+        # Clean-up to avoid dangling processes.
+        try:
+            # _pid can be None if this process is a child from another
+            # python process, which has started the resource_tracker.
+            if self._pid is not None:
+                os.waitpid(self._pid, 0)
+        except ChildProcessError:
+            # The resource_tracker has already been terminated.
+            pass
+        self._fd = None
+        self._pid = None
+        self._exitcode = None
+
+        warnings.warn('resource_tracker: process died unexpectedly, '
+                      'relaunching.  Some resources might leak.')
+
+    def _launch(self):
+        fds_to_pass = []
+        try:
+            fds_to_pass.append(sys.stderr.fileno())
+        except Exception:
+            pass
+        r, w = os.pipe()
+        try:
+            fds_to_pass.append(r)
+            # process will out live us, so no need to wait on pid
+            exe = spawn.get_executable()
+            args = [
+                exe,
+                *util._args_from_interpreter_flags(),
+                '-c',
+                f'from multiprocessing.resource_tracker import main;main({r})',
+            ]
+            # bpo-33613: Register a signal mask that will block the signals.
+            # This signal mask will be inherited by the child that is going
+            # to be spawned and will protect the child from a race condition
+            # that can make the child die before it registers signal handlers
+            # for SIGINT and SIGTERM. The mask is unregistered after spawning
+            # the child.
+            prev_sigmask = None
+            try:
+                if _HAVE_SIGMASK:
+                    prev_sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, _IGNORED_SIGNALS)
+                pid = util.spawnv_passfds(exe, args, fds_to_pass)
+            finally:
+                if prev_sigmask is not None:
+                    signal.pthread_sigmask(signal.SIG_SETMASK, prev_sigmask)
+        except:
+            os.close(w)
+            raise
+        else:
+            self._fd = w
+            self._pid = pid
+        finally:
+            os.close(r)
+
+    def _ensure_running_and_write(self, msg=None):
         with self._lock:
             if self._lock._recursion_count() > 1:
                 # The code below is certainly not reentrant-safe, so bail out
-                return self._reentrant_call_error()
+                if msg is None:
+                    raise self._reentrant_call_error()
+                return self._reentrant_messages.append(msg)
+
             if self._fd is not None:
                 # resource tracker was launched before, is it still running?
-                if self._check_alive():
-                    # => still alive
-                    return
-                # => dead, launch it again
-                os.close(self._fd)
-
-                # Clean-up to avoid dangling processes.
+                if msg is None:
+                    to_send = b'PROBE:0:noop\n'
+                else:
+                    to_send = msg
                 try:
-                    # _pid can be None if this process is a child from another
-                    # python process, which has started the resource_tracker.
-                    if self._pid is not None:
-                        os.waitpid(self._pid, 0)
-                except ChildProcessError:
-                    # The resource_tracker has already been terminated.
-                    pass
-                self._fd = None
-                self._pid = None
-                self._exitcode = None
+                    self._write(to_send)
+                except OSError:
+                    self._teardown_dead_process()
+                    self._launch()
 
-                warnings.warn('resource_tracker: process died unexpectedly, '
-                              'relaunching.  Some resources might leak.')
+                msg = None  # message was sent in probe
+            else:
+                self._launch()
 
-            fds_to_pass = []
+        while True:
             try:
-                fds_to_pass.append(sys.stderr.fileno())
-            except Exception:
-                pass
-            cmd = 'from multiprocessing.resource_tracker import main;main(%d)'
-            r, w = os.pipe()
-            try:
-                fds_to_pass.append(r)
-                # process will out live us, so no need to wait on pid
-                exe = spawn.get_executable()
-                args = [exe] + util._args_from_interpreter_flags()
-                args += ['-c', cmd % r]
-                # bpo-33613: Register a signal mask that will block the signals.
-                # This signal mask will be inherited by the child that is going
-                # to be spawned and will protect the child from a race condition
-                # that can make the child die before it registers signal handlers
-                # for SIGINT and SIGTERM. The mask is unregistered after spawning
-                # the child.
-                prev_sigmask = None
-                try:
-                    if _HAVE_SIGMASK:
-                        prev_sigmask = signal.pthread_sigmask(signal.SIG_BLOCK, _IGNORED_SIGNALS)
-                    pid = util.spawnv_passfds(exe, args, fds_to_pass)
-                finally:
-                    if prev_sigmask is not None:
-                        signal.pthread_sigmask(signal.SIG_SETMASK, prev_sigmask)
-            except:
-                os.close(w)
-                raise
-            else:
-                self._fd = w
-                self._pid = pid
-            finally:
-                os.close(r)
+                reentrant_msg = self._reentrant_messages.popleft()
+            except IndexError:
+                break
+            self._write(reentrant_msg)
+        if msg is not None:
+            self._write(msg)
 
     def _check_alive(self):
         '''Check that the pipe has not been closed by sending a probe.'''
@@ -211,27 +243,18 @@ def unregister(self, name, rtype):
         '''Unregister name of resource with resource tracker.'''
         self._send('UNREGISTER', name, rtype)
 
+    def _write(self, msg):
+        nbytes = os.write(self._fd, msg)
+        assert nbytes == len(msg), f"{nbytes=} != {len(msg)=}"
+
     def _send(self, cmd, name, rtype):
-        try:
-            self.ensure_running()
-        except ReentrantCallError:
-            # The code below might or might not work, depending on whether
-            # the resource tracker was already running and still alive.
-            # Better warn the user.
-            # (XXX is warnings.warn itself reentrant-safe? :-)
-            warnings.warn(
-                f"ResourceTracker called reentrantly for resource cleanup, "
-                f"which is unsupported. "
-                f"The {rtype} object {name!r} might leak.")
-        msg = '{0}:{1}:{2}\n'.format(cmd, name, rtype).encode('ascii')
+        msg = f"{cmd}:{name}:{rtype}\n".encode("ascii")
         if len(msg) > 512:
             # posix guarantees that writes to a pipe of less than PIPE_BUF
             # bytes are atomic, and that PIPE_BUF >= 512
             raise ValueError('msg too long')
-        nbytes = os.write(self._fd, msg)
-        assert nbytes == len(msg), "nbytes {0:n} but len(msg) {1:n}".format(
-            nbytes, len(msg))
 
+        self._ensure_running_and_write(msg)
 
 _resource_tracker = ResourceTracker()
 ensure_running = _resource_tracker.ensure_running
diff --git a/Misc/NEWS.d/next/Library/2025-03-27-08-13-32.gh-issue-131788.0RWiFc.rst b/Misc/NEWS.d/next/Library/2025-03-27-08-13-32.gh-issue-131788.0RWiFc.rst
new file mode 100644
index 00000000000000..525802405bd8bd
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2025-03-27-08-13-32.gh-issue-131788.0RWiFc.rst
@@ -0,0 +1 @@
+Make ``ResourceTracker.send`` from :mod:`multiprocessing` re-entrant safe

_______________________________________________
Python-checkins mailing list -- python-checkins@python.org
To unsubscribe send an email to python-checkins-le...@python.org
https://mail.python.org/mailman3//lists/python-checkins.python.org
Member address: arch...@mail-archive.com

Reply via email to