https://github.com/python/cpython/commit/4172644d78d58189e46424af0aea302b1d78e2de
commit: 4172644d78d58189e46424af0aea302b1d78e2de
branch: main
author: Petr Viktorin <[email protected]>
committer: encukou <[email protected]>
date: 2025-12-03T12:59:14Z
summary:
gh-142206: multiprocessing.resource_tracker: Decode messages using older
protocol (GH-142215)
files:
A Misc/NEWS.d/next/Library/2025-12-03-09-36-29.gh-issue-142206.ilwegH.rst
M Lib/multiprocessing/resource_tracker.py
M Lib/test/_test_multiprocessing.py
diff --git a/Lib/multiprocessing/resource_tracker.py b/Lib/multiprocessing/resource_tracker.py
index b0f9099f4a59f3..3606d1effb495b 100644
--- a/Lib/multiprocessing/resource_tracker.py
+++ b/Lib/multiprocessing/resource_tracker.py
@@ -68,6 +68,13 @@ def __init__(self):
self._exitcode = None
self._reentrant_messages = deque()
+ # True to use colon-separated lines, rather than JSON lines,
+ # for internal communication. (Mainly for testing).
+ # Filenames not supported by the simple format will always be sent
+ # using JSON.
+ # The reader should understand all formats.
+ self._use_simple_format = False
+
def _reentrant_call_error(self):
# gh-109629: this happens if an explicit call to the ResourceTracker
# gets interrupted by a garbage collection, invoking a finalizer (*)
@@ -200,7 +207,9 @@ def _launch(self):
os.close(r)
def _make_probe_message(self):
- """Return a JSON-encoded probe message."""
+ """Return a probe message."""
+ if self._use_simple_format:
+ return b'PROBE:0:noop\n'
return (
json.dumps(
{"cmd": "PROBE", "rtype": "noop"},
@@ -267,6 +276,15 @@ def _write(self, msg):
assert nbytes == len(msg), f"{nbytes=} != {len(msg)=}"
def _send(self, cmd, name, rtype):
+ if self._use_simple_format and '\n' not in name:
+ msg = f"{cmd}:{name}:{rtype}\n".encode("ascii")
+ if len(msg) > 512:
+ # posix guarantees that writes to a pipe of less than PIPE_BUF
+ # bytes are atomic, and that PIPE_BUF >= 512
+ raise ValueError('msg too long')
+ self._ensure_running_and_write(msg)
+ return
+
# POSIX guarantees that writes to a pipe of less than PIPE_BUF (512 on
Linux)
# bytes are atomic. Therefore, we want the message to be shorter than
512 bytes.
# POSIX shm_open() and sem_open() require the name, including its
leading slash,
@@ -286,6 +304,7 @@ def _send(self, cmd, name, rtype):
# The entire JSON message is guaranteed < PIPE_BUF (512 bytes) by
construction.
assert len(msg) <= 512, f"internal error: message too long ({len(msg)}
bytes)"
+ assert msg.startswith(b'{')
self._ensure_running_and_write(msg)
@@ -296,6 +315,30 @@ def _send(self, cmd, name, rtype):
getfd = _resource_tracker.getfd
+def _decode_message(line):
+ if line.startswith(b'{'):
+ try:
+ obj = json.loads(line.decode('ascii'))
+ except Exception as e:
+ raise ValueError("malformed resource_tracker message: %r" %
(line,)) from e
+
+ cmd = obj["cmd"]
+ rtype = obj["rtype"]
+ b64 = obj.get("base64_name", "")
+
+ if not isinstance(cmd, str) or not isinstance(rtype, str) or not
isinstance(b64, str):
+ raise ValueError("malformed resource_tracker fields: %r" % (obj,))
+
+ try:
+ name = base64.urlsafe_b64decode(b64).decode('utf-8',
'surrogateescape')
+ except ValueError as e:
+ raise ValueError("malformed resource_tracker base64_name: %r" %
(b64,)) from e
+ else:
+ cmd, rest = line.strip().decode('ascii').split(':', maxsplit=1)
+ name, rtype = rest.rsplit(':', maxsplit=1)
+ return cmd, rtype, name
+
+
def main(fd):
'''Run resource tracker.'''
# protect the process from ^C and "killall python" etc
@@ -318,23 +361,7 @@ def main(fd):
with open(fd, 'rb') as f:
for line in f:
try:
- try:
- obj = json.loads(line.decode('ascii'))
- except Exception as e:
- raise ValueError("malformed resource_tracker message:
%r" % (line,)) from e
-
- cmd = obj["cmd"]
- rtype = obj["rtype"]
- b64 = obj.get("base64_name", "")
-
- if not isinstance(cmd, str) or not isinstance(rtype, str)
or not isinstance(b64, str):
- raise ValueError("malformed resource_tracker fields:
%r" % (obj,))
-
- try:
- name = base64.urlsafe_b64decode(b64).decode('utf-8',
'surrogateescape')
- except ValueError as e:
- raise ValueError("malformed resource_tracker
base64_name: %r" % (b64,)) from e
-
+ cmd, rtype, name = _decode_message(line)
cleanup_func = _CLEANUP_FUNCS.get(rtype, None)
if cleanup_func is None:
raise ValueError(
diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py
index d718a27231897f..d03eb1dfb253ec 100644
--- a/Lib/test/_test_multiprocessing.py
+++ b/Lib/test/_test_multiprocessing.py
@@ -39,6 +39,7 @@
from test.support import socket_helper
from test.support import threading_helper
from test.support import warnings_helper
+from test.support import subTests
from test.support.script_helper import assert_python_failure, assert_python_ok
# Skip tests if _multiprocessing wasn't built.
@@ -4383,6 +4384,19 @@ def test_copy(self):
self.assertEqual(bar.z, 2 ** 33)
+def resource_tracker_format_subtests(func):
+ """Run given test using both resource tracker communication formats"""
+ def _inner(self, *args, **kwargs):
+ tracker = resource_tracker._resource_tracker
+ for use_simple_format in False, True:
+ with (
+ self.subTest(use_simple_format=use_simple_format),
+ unittest.mock.patch.object(
+ tracker, '_use_simple_format', use_simple_format)
+ ):
+ func(self, *args, **kwargs)
+ return _inner
+
@unittest.skipUnless(HAS_SHMEM, "requires multiprocessing.shared_memory")
@hashlib_helper.requires_hashdigest('sha256')
class _TestSharedMemory(BaseTestCase):
@@ -4662,6 +4676,7 @@ def test_shared_memory_SharedMemoryServer_ignores_sigint(self):
smm.shutdown()
@unittest.skipIf(os.name != "posix", "resource_tracker is posix only")
+ @resource_tracker_format_subtests
def test_shared_memory_SharedMemoryManager_reuses_resource_tracker(self):
# bpo-36867: test that a SharedMemoryManager uses the
# same resource_tracker process as its parent.
@@ -4913,6 +4928,7 @@ def test_shared_memory_cleaned_after_process_termination(self):
"shared_memory objects to clean up at shutdown", err)
@unittest.skipIf(os.name != "posix", "resource_tracker is posix only")
+ @resource_tracker_format_subtests
def test_shared_memory_untracking(self):
# gh-82300: When a separate Python process accesses shared memory
# with track=False, it must not cause the memory to be deleted
@@ -4940,6 +4956,7 @@ def test_shared_memory_untracking(self):
mem.close()
@unittest.skipIf(os.name != "posix", "resource_tracker is posix only")
+ @resource_tracker_format_subtests
def test_shared_memory_tracking(self):
# gh-82300: When a separate Python process accesses shared memory
# with track=True, it must cause the memory to be deleted when
@@ -7353,13 +7370,18 @@ def test_forkpty(self):
@unittest.skipUnless(HAS_SHMEM, "requires multiprocessing.shared_memory")
class TestSharedMemoryNames(unittest.TestCase):
- def
test_that_shared_memory_name_with_colons_has_no_resource_tracker_errors(self):
+ @subTests('use_simple_format', (True, False))
+ def
test_that_shared_memory_name_with_colons_has_no_resource_tracker_errors(
+ self, use_simple_format):
# Test script that creates and cleans up shared memory with colon in
name
test_script = textwrap.dedent("""
import sys
from multiprocessing import shared_memory
+ from multiprocessing import resource_tracker
import time
+ resource_tracker._resource_tracker._use_simple_format = %s
+
# Test various patterns of colons in names
test_names = [
"a:b",
@@ -7387,7 +7409,7 @@ def test_that_shared_memory_name_with_colons_has_no_resource_tracker_errors(self
sys.exit(1)
print("SUCCESS")
- """)
+ """ % use_simple_format)
rc, out, err = assert_python_ok("-c", test_script)
self.assertIn(b"SUCCESS", out)
diff --git a/Misc/NEWS.d/next/Library/2025-12-03-09-36-29.gh-issue-142206.ilwegH.rst b/Misc/NEWS.d/next/Library/2025-12-03-09-36-29.gh-issue-142206.ilwegH.rst
new file mode 100644
index 00000000000000..90e4dd96985979
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2025-12-03-09-36-29.gh-issue-142206.ilwegH.rst
@@ -0,0 +1,4 @@
+The resource tracker in the :mod:`multiprocessing` module can now understand
+messages from older versions of itself. This avoids issues with upgrading
+Python while it is running. (Note that such 'in-place' upgrades are not
+tested.)
_______________________________________________
Python-checkins mailing list -- [email protected]
To unsubscribe send an email to [email protected]
https://mail.python.org/mailman3//lists/python-checkins.python.org
Member address: [email protected]