https://github.com/python/cpython/commit/fd37f1a8ad8e244ddccfc76608b02b6e29e30e16
commit: fd37f1a8ad8e244ddccfc76608b02b6e29e30e16
branch: main
author: Matt Wozniski <mwozni...@bloomberg.net>
committer: pablogsal <pablog...@gmail.com>
date: 2025-05-06T11:44:49+02:00
summary:

gh-133490: Fix syntax highlighting for remote PDB (#133494)

files:
M Lib/_pyrepl/utils.py
M Lib/pdb.py
M Lib/test/test_remote_pdb.py

diff --git a/Lib/_pyrepl/utils.py b/Lib/_pyrepl/utils.py
index dd327d6990234c..38cf6b5a08e892 100644
--- a/Lib/_pyrepl/utils.py
+++ b/Lib/_pyrepl/utils.py
@@ -23,9 +23,9 @@
 BUILTINS = {str(name) for name in dir(builtins) if not name.startswith('_')}
 
 
-def THEME():
+def THEME(**kwargs):
     # Not cached: the user can modify the theme inside the interactive session.
-    return _colorize.get_theme().syntax
+    return _colorize.get_theme(**kwargs).syntax
 
 
 class Span(NamedTuple):
@@ -254,7 +254,10 @@ def is_soft_keyword_used(*tokens: TI | None) -> bool:
 
 
 def disp_str(
-    buffer: str, colors: list[ColorSpan] | None = None, start_index: int = 0
+    buffer: str,
+    colors: list[ColorSpan] | None = None,
+    start_index: int = 0,
+    force_color: bool = False,
 ) -> tuple[CharBuffer, CharWidths]:
     r"""Decompose the input buffer into a printable variant with applied 
colors.
 
@@ -295,7 +298,7 @@ def disp_str(
         # move past irrelevant spans
         colors.pop(0)
 
-    theme = THEME()
+    theme = THEME(force_color=force_color)
     pre_color = ""
     post_color = ""
     if colors and colors[0].span.start < start_index:
diff --git a/Lib/pdb.py b/Lib/pdb.py
index 4efda171b7a813..f89d104fcddb9a 100644
--- a/Lib/pdb.py
+++ b/Lib/pdb.py
@@ -1069,7 +1069,7 @@ def handle_command_def(self, line):
     def _colorize_code(self, code):
         if self.colorize:
             colors = list(_pyrepl.utils.gen_colors(code))
-            chars, _ = _pyrepl.utils.disp_str(code, colors=colors)
+            chars, _ = _pyrepl.utils.disp_str(code, colors=colors, 
force_color=True)
             code = "".join(chars)
         return code
 
diff --git a/Lib/test/test_remote_pdb.py b/Lib/test/test_remote_pdb.py
index 7c77bedb766c53..aef8a6b0129092 100644
--- a/Lib/test/test_remote_pdb.py
+++ b/Lib/test/test_remote_pdb.py
@@ -3,6 +3,7 @@
 import itertools
 import json
 import os
+import re
 import signal
 import socket
 import subprocess
@@ -12,9 +13,9 @@
 import threading
 import unittest
 import unittest.mock
-from contextlib import closing, contextmanager, redirect_stdout, ExitStack
+from contextlib import closing, contextmanager, redirect_stdout, 
redirect_stderr, ExitStack
 from pathlib import Path
-from test.support import is_wasi, os_helper, requires_subprocess, SHORT_TIMEOUT
+from test.support import is_wasi, cpython_only, force_color, 
requires_subprocess, SHORT_TIMEOUT
 from test.support.os_helper import temp_dir, TESTFN, unlink
 from typing import Dict, List, Optional, Tuple, Union, Any
 
@@ -1431,5 +1432,152 @@ def test_multi_line_commands(self):
             self.assertIn("Function returned: 42", stdout)
             self.assertEqual(process.returncode, 0)
 
+
+def _supports_remote_attaching():
+    from contextlib import suppress
+    PROCESS_VM_READV_SUPPORTED = False
+
+    try:
+        from _remote_debugging import PROCESS_VM_READV_SUPPORTED
+    except ImportError:
+        pass
+
+    return PROCESS_VM_READV_SUPPORTED
+
+
+@unittest.skipIf(not sys.is_remote_debug_enabled(), "Remote debugging is not 
enabled")
+@unittest.skipIf(sys.platform != "darwin" and sys.platform != "linux" and 
sys.platform != "win32",
+                    "Test only runs on Linux, Windows and MacOS")
+@unittest.skipIf(sys.platform == "linux" and not _supports_remote_attaching(),
+                    "Testing on Linux requires process_vm_readv support")
+@cpython_only
+@requires_subprocess()
+class PdbAttachTestCase(unittest.TestCase):
+    def setUp(self):
+        # Create a server socket that will wait for the debugger to connect
+        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        self.sock.bind(('127.0.0.1', 0))  # Let OS assign port
+        self.sock.listen(1)
+        self.port = self.sock.getsockname()[1]
+        self._create_script()
+
+    def _create_script(self, script=None):
+        # Create a file for subprocess script
+        script = textwrap.dedent(
+            f"""
+            import socket
+            import time
+
+            def foo():
+                return bar()
+
+            def bar():
+                return baz()
+
+            def baz():
+                x = 1
+                # Trigger attach
+                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+                sock.connect(('127.0.0.1', {self.port}))
+                sock.close()
+                count = 0
+                while x == 1 and count < 100:
+                    count += 1
+                    time.sleep(0.1)
+                return x
+
+            result = foo()
+            print(f"Function returned: {{result}}")
+            """
+        )
+
+        self.script_path = TESTFN + "_connect_test.py"
+        with open(self.script_path, 'w') as f:
+            f.write(script)
+
+    def tearDown(self):
+        self.sock.close()
+        try:
+            unlink(self.script_path)
+        except OSError:
+            pass
+
+    def do_integration_test(self, client_stdin):
+        process = subprocess.Popen(
+            [sys.executable, self.script_path],
+            stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE,
+            text=True
+        )
+        self.addCleanup(process.stdout.close)
+        self.addCleanup(process.stderr.close)
+
+        # Wait for the process to reach our attachment point
+        self.sock.settimeout(10)
+        conn, _ = self.sock.accept()
+        conn.close()
+
+        client_stdin = io.StringIO(client_stdin)
+        client_stdout = io.StringIO()
+        client_stderr = io.StringIO()
+
+        self.addCleanup(client_stdin.close)
+        self.addCleanup(client_stdout.close)
+        self.addCleanup(client_stderr.close)
+        self.addCleanup(process.wait)
+
+        with (
+            unittest.mock.patch("sys.stdin", client_stdin),
+            redirect_stdout(client_stdout),
+            redirect_stderr(client_stderr),
+            unittest.mock.patch("sys.argv", ["pdb", "-p", str(process.pid)]),
+        ):
+            try:
+                pdb.main()
+            except PermissionError:
+                self.skipTest("Insufficient permissions for remote execution")
+
+        process.wait()
+        server_stdout = process.stdout.read()
+        server_stderr = process.stderr.read()
+
+        if process.returncode != 0:
+            print("server failed")
+            print(f"server stdout:\n{server_stdout}")
+            print(f"server stderr:\n{server_stderr}")
+
+        self.assertEqual(process.returncode, 0)
+        return {
+            "client": {
+                "stdout": client_stdout.getvalue(),
+                "stderr": client_stderr.getvalue(),
+            },
+            "server": {
+                "stdout": server_stdout,
+                "stderr": server_stderr,
+            },
+        }
+
+    def test_attach_to_process_without_colors(self):
+        with force_color(False):
+            output = self.do_integration_test("ll\nx=42\n")
+        self.assertEqual(output["client"]["stderr"], "")
+        self.assertEqual(output["server"]["stderr"], "")
+
+        self.assertEqual(output["server"]["stdout"], "Function returned: 42\n")
+        self.assertIn("while x == 1", output["client"]["stdout"])
+        self.assertNotIn("\x1b", output["client"]["stdout"])
+
+    def test_attach_to_process_with_colors(self):
+        with force_color(True):
+            output = self.do_integration_test("ll\nx=42\n")
+        self.assertEqual(output["client"]["stderr"], "")
+        self.assertEqual(output["server"]["stderr"], "")
+
+        self.assertEqual(output["server"]["stdout"], "Function returned: 42\n")
+        self.assertIn("\x1b", output["client"]["stdout"])
+        self.assertNotIn("while x == 1", output["client"]["stdout"])
+        self.assertIn("while x == 1", re.sub("\x1b[^m]*m", "", 
output["client"]["stdout"]))
+
 if __name__ == "__main__":
     unittest.main()

_______________________________________________
Python-checkins mailing list -- python-checkins@python.org
To unsubscribe send an email to python-checkins-le...@python.org
https://mail.python.org/mailman3/lists/python-checkins.python.org/
Member address: arch...@mail-archive.com

Reply via email to