https://github.com/python/cpython/commit/797b29b1b52a697010f64cdfdecd0c6aed22d3bd
commit: 797b29b1b52a697010f64cdfdecd0c6aed22d3bd
branch: main
author: Matt Wozniski <mwozni...@bloomberg.net>
committer: pablogsal <pablog...@gmail.com>
date: 2025-04-25T01:43:23+01:00
summary:

gh-131591: Allow pdb to attach to a running process (#132451)

Co-authored-by: Pablo Galindo <pablog...@gmail.com>

files:
A Lib/test/test_remote_pdb.py
A Misc/NEWS.d/next/Library/2025-04-22-19-45-46.gh-issue-132451.eIzMvE.rst
M Doc/whatsnew/3.14.rst
M Lib/pdb.py
M Lib/test/test_pyclbr.py

diff --git a/Doc/whatsnew/3.14.rst b/Doc/whatsnew/3.14.rst
index 5b03bd9e5a8caf..379632bb62a179 100644
--- a/Doc/whatsnew/3.14.rst
+++ b/Doc/whatsnew/3.14.rst
@@ -99,7 +99,9 @@ PEP 768: Safe external debugger interface for CPython
 
 :pep:`768` introduces a zero-overhead debugging interface that allows debuggers and profilers
 to safely attach to running Python processes. This is a significant enhancement to Python's
-debugging capabilities allowing debuggers to forego unsafe alternatives.
+debugging capabilities allowing debuggers to forego unsafe alternatives. See
+:ref:`below <whatsnew314-remote-pdb>` for how this feature is leveraged to
+implement the :mod:`pdb` module's new remote attaching capabilities.
 
 The new interface provides safe execution points for attaching debugger code without modifying
 the interpreter's normal execution path or adding runtime overhead. This enables tools to
@@ -149,6 +151,32 @@ See :pep:`768` for more details.
 
 (Contributed by Pablo Galindo Salgado, Matt Wozniski, and Ivona Stojanovic in :gh:`131591`.)
 
+
+.. _whatsnew314-remote-pdb:
+
+Remote attaching to a running Python process with PDB
+-----------------------------------------------------
+
+The :mod:`pdb` module now supports remote attaching to a running Python process
+using a new ``-p PID`` command-line option:
+
+.. code-block:: sh
+
+   python -m pdb -p 1234
+
+This will connect to the Python process with the given PID and allow you to
+debug it interactively. Note that, due to how the Python interpreter works,
+attaching to a remote process that is blocked in a system call or waiting for
+I/O will only work once the next bytecode instruction is executed or when the
+process receives a signal.
+
+This feature leverages :pep:`768` and the :func:`sys.remote_exec` function
+to attach to the remote process and send the PDB commands to it.
+
+
+(Contributed by Matt Wozniski and Pablo Galindo in :gh:`131591`.)
+
+
 .. _whatsnew314-pep758:
 
 PEP 758 – Allow except and except* expressions without parentheses
diff --git a/Lib/pdb.py b/Lib/pdb.py
index 38a2fbceccc82b..380c6a56db72e5 100644
--- a/Lib/pdb.py
+++ b/Lib/pdb.py
@@ -74,13 +74,19 @@
 import dis
 import code
 import glob
+import json
 import token
 import types
 import codeop
 import pprint
 import signal
+import socket
+import typing
 import asyncio
 import inspect
+import weakref
+import builtins
+import tempfile
 import textwrap
 import tokenize
 import itertools
@@ -88,6 +94,7 @@
 import linecache
 import _colorize
 
+from contextlib import closing
 from contextlib import contextmanager
 from rlcompleter import Completer
 from types import CodeType
@@ -918,7 +925,7 @@ def handle_command_def(self, line):
         if cmd == 'end':
             return True  # end of cmd list
         elif cmd == 'EOF':
-            print('')
+            self.message('')
             return True  # end of cmd list
         cmdlist = self.commands[self.commands_bnum]
         if cmd == 'silent':
@@ -1458,6 +1465,13 @@ def do_ignore(self, arg):
 
     complete_ignore = _complete_bpnumber
 
+    def _prompt_for_confirmation(self, prompt, default):
+        try:
+            reply = input(prompt)
+        except EOFError:
+            reply = default
+        return reply.strip().lower()
+
     def do_clear(self, arg):
         """cl(ear) [filename:lineno | bpnumber ...]
 
@@ -1467,11 +1481,10 @@ def do_clear(self, arg):
         clear all breaks at that line in that file.
         """
         if not arg:
-            try:
-                reply = input('Clear all breaks? ')
-            except EOFError:
-                reply = 'no'
-            reply = reply.strip().lower()
+            reply = self._prompt_for_confirmation(
+                'Clear all breaks? ',
+                default='no',
+            )
             if reply in ('y', 'yes'):
                 bplist = [bp for bp in bdb.Breakpoint.bpbynumber if bp]
                 self.clear_all_breaks()
@@ -1775,6 +1788,9 @@ def do_jump(self, arg):
                 self.error('Jump failed: %s' % e)
     do_j = do_jump
 
+    def _create_recursive_debugger(self):
+        return Pdb(self.completekey, self.stdin, self.stdout)
+
     def do_debug(self, arg):
         """debug code
 
@@ -1788,7 +1804,7 @@ def do_debug(self, arg):
         self.stop_trace()
         globals = self.curframe.f_globals
         locals = self.curframe.f_locals
-        p = Pdb(self.completekey, self.stdin, self.stdout)
+        p = self._create_recursive_debugger()
         p.prompt = "(%s) " % self.prompt.strip()
         self.message("ENTERING RECURSIVE DEBUGGER")
         try:
@@ -2485,6 +2501,581 @@ def set_trace(*, header=None, commands=None):
         pdb.message(header)
     pdb.set_trace(sys._getframe().f_back, commands=commands)
 
+# Remote PDB
+
+class _PdbServer(Pdb):
+    def __init__(self, sockfile, owns_sockfile=True, **kwargs):
+        self._owns_sockfile = owns_sockfile
+        self._interact_state = None
+        self._sockfile = sockfile
+        self._command_name_cache = []
+        self._write_failed = False
+        super().__init__(**kwargs)
+
+    @staticmethod
+    def protocol_version():
+        # By default, assume a client and server are compatible if they run
+        # the same Python major.minor version. We'll try to keep backwards
+        # compatibility between patch versions of a minor version if possible.
+        # If we do need to change the protocol in a patch version, we'll change
+        # `revision` to the patch version where the protocol changed.
+        # We can ignore compatibility for pre-release versions; sys.remote_exec
+        # can't attach to a pre-release version except from that same version.
+        v = sys.version_info
+        revision = 0
+        return int(f"{v.major:02X}{v.minor:02X}{revision:02X}F0", 16)
+
+    def _ensure_valid_message(self, msg):
+        # Ensure the message conforms to our protocol.
+        # If anything needs to be changed here for a patch release of Python,
+        # the 'revision' in protocol_version() should be updated.
+        match msg:
+            case {"message": str(), "type": str()}:
+                # Have the client show a message. The client chooses how to
+                # format the message based on its type. The currently defined
+                # types are "info" and "error". If a message has a type the
+                # client doesn't recognize, it must be treated as "info".
+                pass
+            case {"help": str()}:
+                # Have the client show the help for a given argument.
+                pass
+            case {"prompt": str(), "state": str()}:
+                # Have the client display the given prompt and wait for a reply
+                # from the user. If the client recognizes the state it may
+                # enable mode-specific features like multi-line editing.
+                # If it doesn't recognize the state it must prompt for a single
+                # line only and send it directly to the server. A server won't
+                # progress until it gets a "reply" or "signal" message, but can
+                # process "complete" requests while waiting for the reply.
+                pass
+            case {
+                "completions": list(completions)
+            } if all(isinstance(c, str) for c in completions):
+                # Return valid completions for a client's "complete" request.
+                pass
+            case {
+                "command_list": list(command_list)
+            } if all(isinstance(c, str) for c in command_list):
+                # Report the list of legal PDB commands to the client.
+                # Due to aliases this list is not static, but the client
+                # needs to know it for multi-line editing.
+                pass
+            case _:
+                raise AssertionError(
+                    f"PDB message doesn't follow the schema! {msg}"
+                )
+
+    def _send(self, **kwargs):
+        self._ensure_valid_message(kwargs)
+        json_payload = json.dumps(kwargs)
+        try:
+            self._sockfile.write(json_payload.encode() + b"\n")
+            self._sockfile.flush()
+        except OSError:
+            # This means that the client has abruptly disconnected, but we'll
+            # handle that the next time we try to read from the client instead
+            # of trying to handle it from everywhere _send() may be called.
+            # Track this with a flag rather than assuming readline() will ever
+            # return an empty string because the socket may be half-closed.
+            self._write_failed = True
+
+    @typing.override
+    def message(self, msg, end="\n"):
+        self._send(message=str(msg) + end, type="info")
+
+    @typing.override
+    def error(self, msg):
+        self._send(message=str(msg), type="error")
+
+    def _get_input(self, prompt, state) -> str:
+        # Before displaying a (Pdb) prompt, send the list of PDB commands
+        # unless we've already sent an up-to-date list.
+        if state == "pdb" and not self._command_name_cache:
+            self._command_name_cache = self.completenames("", "", 0, 0)
+            self._send(command_list=self._command_name_cache)
+        self._send(prompt=prompt, state=state)
+        return self._read_reply()
+
+    def _read_reply(self):
+        # Loop until we get a 'reply' or 'signal' from the client,
+        # processing out-of-band 'complete' requests as they arrive.
+        while True:
+            if self._write_failed:
+                raise EOFError
+
+            msg = self._sockfile.readline()
+            if not msg:
+                raise EOFError
+
+            try:
+                payload = json.loads(msg)
+            except json.JSONDecodeError:
+                self.error(f"Disconnecting: client sent invalid JSON {msg}")
+                raise EOFError
+
+            match payload:
+                case {"reply": str(reply)}:
+                    return reply
+                case {"signal": str(signal)}:
+                    if signal == "INT":
+                        raise KeyboardInterrupt
+                    elif signal == "EOF":
+                        raise EOFError
+                    else:
+                        self.error(
+                            f"Received unrecognized signal: {signal}"
+                        )
+                        # Our best hope of recovering is to pretend we
+                        # got an EOF to exit whatever mode we're in.
+                        raise EOFError
+                case {
+                    "complete": {
+                        "text": str(text),
+                        "line": str(line),
+                        "begidx": int(begidx),
+                        "endidx": int(endidx),
+                    }
+                }:
+                    items = self._complete_any(text, line, begidx, endidx)
+                    self._send(completions=items)
+                    continue
+            # Valid JSON, but doesn't meet the schema.
+            self.error(f"Ignoring invalid message from client: {msg}")
+
+    def _complete_any(self, text, line, begidx, endidx):
+        if begidx == 0:
+            return self.completenames(text, line, begidx, endidx)
+
+        cmd = self.parseline(line)[0]
+        if cmd:
+            compfunc = getattr(self, "complete_" + cmd, self.completedefault)
+        else:
+            compfunc = self.completedefault
+        return compfunc(text, line, begidx, endidx)
+
+    def cmdloop(self, intro=None):
+        self.preloop()
+        if intro is not None:
+            self.intro = intro
+        if self.intro:
+            self.message(str(self.intro))
+        stop = None
+        while not stop:
+            if self._interact_state is not None:
+                try:
+                    reply = self._get_input(prompt=">>> ", state="interact")
+                except KeyboardInterrupt:
+                    # Match how KeyboardInterrupt is handled in a REPL
+                    self.message("\nKeyboardInterrupt")
+                except EOFError:
+                    self.message("\n*exit from pdb interact command*")
+                    self._interact_state = None
+                else:
+                    self._run_in_python_repl(reply)
+                continue
+
+            if not self.cmdqueue:
+                try:
+                    state = "commands" if self.commands_defining else "pdb"
+                    reply = self._get_input(prompt=self.prompt, state=state)
+                except EOFError:
+                    reply = "EOF"
+
+                self.cmdqueue.append(reply)
+
+            line = self.cmdqueue.pop(0)
+            line = self.precmd(line)
+            stop = self.onecmd(line)
+            stop = self.postcmd(stop, line)
+        self.postloop()
+
+    def postloop(self):
+        super().postloop()
+        if self.quitting:
+            self.detach()
+
+    def detach(self):
+        # Detach the debugger and close the socket without raising BdbQuit
+        self.quitting = False
+        if self._owns_sockfile:
+            # Don't try to reuse this instance, it's not valid anymore.
+            Pdb._last_pdb_instance = None
+            try:
+                self._sockfile.close()
+            except OSError:
+                # close() can fail if the connection was broken unexpectedly.
+                pass
+
+    def do_debug(self, arg):
+        # Clear our cached list of valid commands; the recursive debugger might
+        # send its own differing list, and so ours needs to be re-sent.
+        self._command_name_cache = []
+        return super().do_debug(arg)
+
+    def do_alias(self, arg):
+        # Clear our cached list of valid commands; one might be added.
+        self._command_name_cache = []
+        return super().do_alias(arg)
+
+    def do_unalias(self, arg):
+        # Clear our cached list of valid commands; one might be removed.
+        self._command_name_cache = []
+        return super().do_unalias(arg)
+
+    def do_help(self, arg):
+        # Tell the client to render the help, since it might need a pager.
+        self._send(help=arg)
+
+    do_h = do_help
+
+    def _interact_displayhook(self, obj):
+        # Like the default `sys.displayhook` except sending a socket message.
+        if obj is not None:
+            self.message(repr(obj))
+            builtins._ = obj
+
+    def _run_in_python_repl(self, lines):
+        # Run one 'interact' mode code block against an existing namespace.
+        assert self._interact_state
+        save_displayhook = sys.displayhook
+        try:
+            sys.displayhook = self._interact_displayhook
+            code_obj = self._interact_state["compiler"](lines + "\n")
+            if code_obj is None:
+                raise SyntaxError("Incomplete command")
+            exec(code_obj, self._interact_state["ns"])
+        except:
+            self._error_exc()
+        finally:
+            sys.displayhook = save_displayhook
+
+    def do_interact(self, arg):
+        # Prepare to run 'interact' mode code blocks, and trigger the client
+        # to start treating all input as Python commands, not PDB ones.
+        self.message("*pdb interact start*")
+        self._interact_state = dict(
+            compiler=codeop.CommandCompiler(),
+            ns={**self.curframe.f_globals, **self.curframe.f_locals},
+        )
+
+    @typing.override
+    def _create_recursive_debugger(self):
+        return _PdbServer(self._sockfile, owns_sockfile=False)
+
+    @typing.override
+    def _prompt_for_confirmation(self, prompt, default):
+        try:
+            return self._get_input(prompt=prompt, state="confirm")
+        except (EOFError, KeyboardInterrupt):
+            return default
+
+    def do_run(self, arg):
+        self.error("remote PDB cannot restart the program")
+
+    do_restart = do_run
+
+    def _error_exc(self):
+        if self._interact_state and isinstance(sys.exception(), SystemExit):
+            # If we get a SystemExit in 'interact' mode, exit the REPL.
+            self._interact_state = None
+            ret = super()._error_exc()
+            self.message("*exit from pdb interact command*")
+            return ret
+        else:
+            return super()._error_exc()
+
+    def default(self, line):
+        # Unlike Pdb, don't prompt for more lines of a multi-line command.
+        # The remote needs to send us the whole block in one go.
+        try:
+            candidate = line.removeprefix("!") + "\n"
+            if codeop.compile_command(candidate, "<stdin>", "single") is None:
+                raise SyntaxError("Incomplete command")
+            return super().default(candidate)
+        except:
+            self._error_exc()
+
+
+class _PdbClient:
+    def __init__(self, pid, sockfile, interrupt_script):
+        self.pid = pid
+        self.sockfile = sockfile
+        self.interrupt_script = interrupt_script
+        self.pdb_instance = Pdb()
+        self.pdb_commands = set()
+        self.completion_matches = []
+        self.state = "dumb"
+        self.write_failed = False
+
+    def _ensure_valid_message(self, msg):
+        # Ensure the message conforms to our protocol.
+        # If anything needs to be changed here for a patch release of Python,
+        # the 'revision' in protocol_version() should be updated.
+        match msg:
+            case {"reply": str()}:
+                # Send input typed by a user at a prompt to the remote PDB.
+                pass
+            case {"signal": "EOF"}:
+                # Tell the remote PDB that the user pressed ^D at a prompt.
+                pass
+            case {"signal": "INT"}:
+                # Tell the remote PDB that the user pressed ^C at a prompt.
+                pass
+            case {
+                "complete": {
+                    "text": str(),
+                    "line": str(),
+                    "begidx": int(),
+                    "endidx": int(),
+                }
+            }:
+                # Ask the remote PDB what completions are valid for the given
+                # parameters, using readline's completion protocol.
+                pass
+            case _:
+                raise AssertionError(
+                    f"PDB message doesn't follow the schema! {msg}"
+                )
+
+    def _send(self, **kwargs):
+        self._ensure_valid_message(kwargs)
+        json_payload = json.dumps(kwargs)
+        try:
+            self.sockfile.write(json_payload.encode() + b"\n")
+            self.sockfile.flush()
+        except OSError:
+            # This means that the server has abruptly disconnected, but we'll
+            # handle that the next time we try to read from the server instead
+            # of trying to handle it from everywhere _send() may be called.
+            # Track this with a flag rather than assuming readline() will ever
+            # return an empty string because the socket may be half-closed.
+            self.write_failed = True
+
+    def read_command(self, prompt):
+        reply = input(prompt)
+
+        if self.state == "dumb":
+            # No logic applied whatsoever, just pass the raw reply back.
+            return reply
+
+        prefix = ""
+        if self.state == "pdb":
+            # PDB command entry mode
+            cmd = self.pdb_instance.parseline(reply)[0]
+            if cmd in self.pdb_commands or reply.strip() == "":
+                # Recognized PDB command, or blank line repeating last command
+                return reply
+
+            # Otherwise, explicit or implicit exec command
+            if reply.startswith("!"):
+                prefix = "!"
+                reply = reply.removeprefix(prefix).lstrip()
+
+        if codeop.compile_command(reply + "\n", "<stdin>", "single") is not None:
+            # Valid single-line statement
+            return prefix + reply
+
+        # Otherwise, valid first line of a multi-line statement
+        continue_prompt = "...".ljust(len(prompt))
+        while codeop.compile_command(reply, "<stdin>", "single") is None:
+            reply += "\n" + input(continue_prompt)
+
+        return prefix + reply
+
+    @contextmanager
+    def readline_completion(self, completer):
+        try:
+            import readline
+        except ImportError:
+            yield
+            return
+
+        old_completer = readline.get_completer()
+        try:
+            readline.set_completer(completer)
+            if readline.backend == "editline":
+                # libedit uses "^I" instead of "tab"
+                command_string = "bind ^I rl_complete"
+            else:
+                command_string = "tab: complete"
+            readline.parse_and_bind(command_string)
+            yield
+        finally:
+            readline.set_completer(old_completer)
+
+    def cmdloop(self):
+        with self.readline_completion(self.complete):
+            while not self.write_failed:
+                try:
+                    if not (payload_bytes := self.sockfile.readline()):
+                        break
+                except KeyboardInterrupt:
+                    self.send_interrupt()
+                    continue
+
+                try:
+                    payload = json.loads(payload_bytes)
+                except json.JSONDecodeError:
+                    print(
+                        f"*** Invalid JSON from remote: {payload_bytes}",
+                        flush=True,
+                    )
+                    continue
+
+                self.process_payload(payload)
+
+    def send_interrupt(self):
+        print(
+            "\n*** Program will stop at the next bytecode instruction."
+            " (Use 'cont' to resume)."
+        )
+        sys.remote_exec(self.pid, self.interrupt_script)
+
+    def process_payload(self, payload):
+        match payload:
+            case {
+                "command_list": command_list
+            } if all(isinstance(c, str) for c in command_list):
+                self.pdb_commands = set(command_list)
+            case {"message": str(msg), "type": str(msg_type)}:
+                if msg_type == "error":
+                    print("***", msg, flush=True)
+                else:
+                    print(msg, end="", flush=True)
+            case {"help": str(arg)}:
+                self.pdb_instance.do_help(arg)
+            case {"prompt": str(prompt), "state": str(state)}:
+                if state not in ("pdb", "interact"):
+                    state = "dumb"
+                self.state = state
+                self.prompt_for_reply(prompt)
+            case _:
+                raise RuntimeError(f"Unrecognized payload {payload}")
+
+    def prompt_for_reply(self, prompt):
+        while True:
+            try:
+                payload = {"reply": self.read_command(prompt)}
+            except EOFError:
+                payload = {"signal": "EOF"}
+            except KeyboardInterrupt:
+                payload = {"signal": "INT"}
+            except Exception as exc:
+                msg = traceback.format_exception_only(exc)[-1].strip()
+                print("***", msg, flush=True)
+                continue
+
+            self._send(**payload)
+            return
+
+    def complete(self, text, state):
+        import readline
+
+        if state == 0:
+            self.completion_matches = []
+            if self.state not in ("pdb", "interact"):
+                return None
+
+            origline = readline.get_line_buffer()
+            line = origline.lstrip()
+            stripped = len(origline) - len(line)
+            begidx = readline.get_begidx() - stripped
+            endidx = readline.get_endidx() - stripped
+
+            msg = {
+                "complete": {
+                    "text": text,
+                    "line": line,
+                    "begidx": begidx,
+                    "endidx": endidx,
+                }
+            }
+
+            self._send(**msg)
+            if self.write_failed:
+                return None
+
+            payload = self.sockfile.readline()
+            if not payload:
+                return None
+
+            payload = json.loads(payload)
+            if "completions" not in payload:
+                raise RuntimeError(
+                    f"Failed to get valid completions. Got: {payload}"
+                )
+
+            self.completion_matches = payload["completions"]
+        try:
+            return self.completion_matches[state]
+        except IndexError:
+            return None
+
+
+def _connect(host, port, frame, commands, version):
+    with closing(socket.create_connection((host, port))) as conn:
+        sockfile = conn.makefile("rwb")
+
+    remote_pdb = _PdbServer(sockfile)
+    weakref.finalize(remote_pdb, sockfile.close)
+
+    if Pdb._last_pdb_instance is not None:
+        remote_pdb.error("Another PDB instance is already attached.")
+    elif version != remote_pdb.protocol_version():
+        target_ver = f"0x{remote_pdb.protocol_version():08X}"
+        attach_ver = f"0x{version:08X}"
+        remote_pdb.error(
+            f"The target process is running a Python version that is"
+            f" incompatible with this PDB module."
+            f"\nTarget process pdb protocol version: {target_ver}"
+            f"\nLocal pdb module's protocol version: {attach_ver}"
+        )
+    else:
+        remote_pdb.rcLines.extend(commands.splitlines())
+        remote_pdb.set_trace(frame=frame)
+
+
+def attach(pid, commands=()):
+    """Attach to a running process with the given PID."""
+    with closing(socket.create_server(("localhost", 0))) as server:
+        port = server.getsockname()[1]
+
+        with tempfile.NamedTemporaryFile("w", delete_on_close=False) as connect_script:
+            connect_script.write(
+                textwrap.dedent(
+                    f"""
+                    import pdb, sys
+                    pdb._connect(
+                        host="localhost",
+                        port={port},
+                        frame=sys._getframe(1),
+                        commands={json.dumps("\n".join(commands))},
+                        version={_PdbServer.protocol_version()},
+                    )
+                    """
+                )
+            )
+            connect_script.close()
+            sys.remote_exec(pid, connect_script.name)
+
+            # TODO Add a timeout? Or don't bother since the user can ^C?
+            client_sock, _ = server.accept()
+
+            with closing(client_sock):
+                sockfile = client_sock.makefile("rwb")
+
+            with closing(sockfile):
+                with tempfile.NamedTemporaryFile("w", delete_on_close=False) as interrupt_script:
+                    interrupt_script.write(
+                        'import pdb, sys\n'
+                        'if inst := pdb.Pdb._last_pdb_instance:\n'
+                        '    inst.set_trace(sys._getframe(1))\n'
+                    )
+                    interrupt_script.close()
+
+                    _PdbClient(pid, sockfile, interrupt_script.name).cmdloop()
+
+
 # Post-Mortem interface
 
 def post_mortem(t=None):
@@ -2554,7 +3145,7 @@ def help():
 def main():
     import argparse
 
-    parser = argparse.ArgumentParser(usage="%(prog)s [-h] [-c command] (-m module | pyfile) [args ...]",
+    parser = argparse.ArgumentParser(usage="%(prog)s [-h] [-c command] (-m module | -p pid | pyfile) [args ...]",
                                      description=_usage,
                                      formatter_class=argparse.RawDescriptionHelpFormatter,
                                      allow_abbrev=False)
@@ -2565,6 +3156,7 @@ def main():
     parser.add_argument('-c', '--command', action='append', default=[], metavar='command', dest='commands',
                         help='pdb commands to execute as if given in a .pdbrc file')
     parser.add_argument('-m', metavar='module', dest='module')
+    parser.add_argument('-p', '--pid', type=int, help="attach to the specified PID", default=None)
 
     if len(sys.argv) == 1:
         # If no arguments were given (python -m pdb), print the whole help message.
@@ -2574,7 +3166,15 @@ def main():
 
     opts, args = parser.parse_known_args()
 
-    if opts.module:
+    if opts.pid:
+        # If attaching to a remote pid, unrecognized arguments are not allowed.
+        # This will raise an error if there are extra unrecognized arguments.
+        opts = parser.parse_args()
+        if opts.module:
+            parser.error("argument -m: not allowed with argument --pid")
+        attach(opts.pid, opts.commands)
+        return
+    elif opts.module:
         # If a module is being debugged, we consider the arguments after "-m module" to
         # be potential arguments to the module itself. We need to parse the arguments
         # before "-m" to check if there is any invalid argument.
diff --git a/Lib/test/test_pyclbr.py b/Lib/test/test_pyclbr.py
index a9ac13395a8fac..df05cd07d7e249 100644
--- a/Lib/test/test_pyclbr.py
+++ b/Lib/test/test_pyclbr.py
@@ -253,7 +253,8 @@ def test_others(self):
             cm(
                 'pdb',
                 # pyclbr does not handle elegantly `typing` or properties
-                ignore=('Union', '_ModuleTarget', '_ScriptTarget', '_ZipTarget', 'curframe_locals'),
+                ignore=('Union', '_ModuleTarget', '_ScriptTarget', '_ZipTarget', 'curframe_locals',
+                        '_InteractState'),
             )
         cm('pydoc', ignore=('input', 'output',))  # properties
 
diff --git a/Lib/test/test_remote_pdb.py b/Lib/test/test_remote_pdb.py
new file mode 100644
index 00000000000000..cc0ada12814afd
--- /dev/null
+++ b/Lib/test/test_remote_pdb.py
@@ -0,0 +1,687 @@
+import io
+import json
+import os
+import signal
+import socket
+import subprocess
+import sys
+import tempfile
+import textwrap
+import threading
+import unittest
+import unittest.mock
+from contextlib import contextmanager
+from pathlib import Path
+from test.support import is_wasi, os_helper
+from test.support.os_helper import temp_dir, TESTFN, unlink
+from typing import Dict, List, Optional, Tuple, Union, Any
+
+import pdb
+from pdb import _PdbServer, _PdbClient
+
+
+class MockSocketFile:
+    """Mock socket file for testing _PdbServer without actual socket connections."""
+
+    def __init__(self):
+        self.input_queue = []
+        self.output_buffer = []
+
+    def write(self, data: bytes) -> None:
+        """Simulate write to socket."""
+        self.output_buffer.append(data)
+
+    def flush(self) -> None:
+        """No-op flush implementation."""
+        pass
+
+    def readline(self) -> bytes:
+        """Read a line from the prepared input queue."""
+        if not self.input_queue:
+            return b""
+        return self.input_queue.pop(0)
+
+    def close(self) -> None:
+        """Close the mock socket file."""
+        pass
+
+    def add_input(self, data: dict) -> None:
+        """Add input that will be returned by readline."""
+        self.input_queue.append(json.dumps(data).encode() + b"\n")
+
+    def get_output(self) -> List[dict]:
+        """Get the output that was written by the object being tested."""
+        results = []
+        for data in self.output_buffer:
+            if isinstance(data, bytes) and data.endswith(b"\n"):
+                try:
+                    results.append(json.loads(data.decode().strip()))
+                except json.JSONDecodeError:
+                    pass  # Ignore non-JSON output
+        self.output_buffer = []
+        return results
+
+
+class RemotePdbTestCase(unittest.TestCase):
+    """Tests for the _PdbServer class."""
+
+    def setUp(self):
+        self.sockfile = MockSocketFile()
+        self.pdb = _PdbServer(self.sockfile)
+
+        # Mock some Bdb attributes that are lazily created when tracing starts
+        self.pdb.botframe = None
+        self.pdb.quitting = False
+
+        # Create a frame for testing
+        self.test_globals = {'a': 1, 'b': 2, '__pdb_convenience_variables': {'x': 100}}
+        self.test_locals = {'c': 3, 'd': 4}
+
+        # Create a simple test frame
+        frame_info = unittest.mock.Mock()
+        frame_info.f_globals = self.test_globals
+        frame_info.f_locals = self.test_locals
+        frame_info.f_lineno = 42
+        frame_info.f_code = unittest.mock.Mock()
+        frame_info.f_code.co_filename = "test_file.py"
+        frame_info.f_code.co_name = "test_function"
+
+        self.pdb.curframe = frame_info
+
+    def test_message_and_error(self):
+        """Test message and error methods send correct JSON."""
+        self.pdb.message("Test message")
+        self.pdb.error("Test error")
+
+        outputs = self.sockfile.get_output()
+        self.assertEqual(len(outputs), 2)
+        self.assertEqual(outputs[0], {"message": "Test message\n", "type": "info"})
+        self.assertEqual(outputs[1], {"message": "Test error", "type": "error"})
+
+    def test_read_command(self):
+        """Test reading commands from the socket."""
+        # Add test input
+        self.sockfile.add_input({"reply": "help"})
+
+        # Read the command
+        cmd = self.pdb._read_reply()
+        self.assertEqual(cmd, "help")
+
+    def test_read_command_EOF(self):
+        """Test reading EOF command."""
+        # Simulate socket closure
+        self.pdb._write_failed = True
+        with self.assertRaises(EOFError):
+            self.pdb._read_reply()
+
+    def test_completion(self):
+        """Test handling completion requests."""
+        # Mock completenames to return specific values
+        with unittest.mock.patch.object(self.pdb, 'completenames',
+                                       return_value=["continue", "clear"]):
+
+            # Add a completion request
+            self.sockfile.add_input({
+                "complete": {
+                    "text": "c",
+                    "line": "c",
+                    "begidx": 0,
+                    "endidx": 1
+                }
+            })
+
+            # Add a regular command to break the loop
+            self.sockfile.add_input({"reply": "help"})
+
+            # Read command - this should process the completion request first
+            cmd = self.pdb._read_reply()
+
+            # Verify completion response was sent
+            outputs = self.sockfile.get_output()
+            self.assertEqual(len(outputs), 1)
+            self.assertEqual(outputs[0], {"completions": ["continue", "clear"]})
+
+            # The actual command should be returned
+            self.assertEqual(cmd, "help")
+
+    def test_do_help(self):
+        """Test that do_help sends the help message."""
+        self.pdb.do_help("break")
+
+        outputs = self.sockfile.get_output()
+        self.assertEqual(len(outputs), 1)
+        self.assertEqual(outputs[0], {"help": "break"})
+
+    def test_interact_mode(self):
+        """Test interaction mode setup and execution."""
+        # First set up interact mode
+        self.pdb.do_interact("")
+
+        # Verify _interact_state is properly initialized
+        self.assertIsNotNone(self.pdb._interact_state)
+        self.assertIsInstance(self.pdb._interact_state, dict)
+
+        # Test running code in interact mode
+        with unittest.mock.patch.object(self.pdb, '_error_exc') as mock_error:
+            self.pdb._run_in_python_repl("print('test')")
+            mock_error.assert_not_called()
+
+            # Test with syntax error
+            self.pdb._run_in_python_repl("if:")
+            mock_error.assert_called_once()
+
+    def test_registering_commands(self):
+        """Test registering breakpoint commands."""
+        # Mock get_bpbynumber
+        with unittest.mock.patch.object(self.pdb, 'get_bpbynumber'):
+            # Queue up some input to send
+            self.sockfile.add_input({"reply": "commands 1"})
+            self.sockfile.add_input({"reply": "silent"})
+            self.sockfile.add_input({"reply": "print('hi')"})
+            self.sockfile.add_input({"reply": "end"})
+            self.sockfile.add_input({"signal": "EOF"})
+
+            # Run the PDB command loop
+            self.pdb.cmdloop()
+
+            outputs = self.sockfile.get_output()
+            self.assertIn('command_list', outputs[0])
+            self.assertEqual(outputs[1], {"prompt": "(Pdb) ", "state": "pdb"})
+            self.assertEqual(outputs[2], {"prompt": "(com) ", "state": "commands"})
+            self.assertEqual(outputs[3], {"prompt": "(com) ", "state": "commands"})
+            self.assertEqual(outputs[4], {"prompt": "(com) ", "state": "commands"})
+            self.assertEqual(outputs[5], {"prompt": "(Pdb) ", "state": "pdb"})
+            self.assertEqual(outputs[6], {"message": "\n", "type": "info"})
+            self.assertEqual(len(outputs), 7)
+
+            self.assertEqual(
+                self.pdb.commands[1],
+                ["_pdbcmd_silence_frame_status", "print('hi')"],
+            )
+
+    def test_detach(self):
+        """Test the detach method."""
+        with unittest.mock.patch.object(self.sockfile, 'close') as mock_close:
+            self.pdb.detach()
+            mock_close.assert_called_once()
+            self.assertFalse(self.pdb.quitting)
+
+    def test_cmdloop(self):
+        """Test the command loop with various commands."""
+        # Mock onecmd to track command execution
+        with unittest.mock.patch.object(self.pdb, 'onecmd', return_value=False) as mock_onecmd:
+            # Add commands to the queue
+            self.pdb.cmdqueue = ['help', 'list']
+
+            # Add a command from the socket for when cmdqueue is empty
+            self.sockfile.add_input({"reply": "next"})
+
+            # Add a second command to break the loop
+            self.sockfile.add_input({"reply": "quit"})
+
+            # Configure onecmd to exit the loop on "quit"
+            def side_effect(line):
+                return line == 'quit'
+            mock_onecmd.side_effect = side_effect
+
+            # Run the command loop
+            self.pdb.quitting = False # Set this by hand because we don't want to really call set_trace()
+            self.pdb.cmdloop()
+
+            # Should have processed 4 commands: 2 from cmdqueue, 2 from socket
+            self.assertEqual(mock_onecmd.call_count, 4)
+            mock_onecmd.assert_any_call('help')
+            mock_onecmd.assert_any_call('list')
+            mock_onecmd.assert_any_call('next')
+            mock_onecmd.assert_any_call('quit')
+
+            # Check if prompt was sent to client
+            outputs = self.sockfile.get_output()
+            prompts = [o for o in outputs if 'prompt' in o]
+            self.assertEqual(len(prompts), 2)  # Should have sent 2 prompts
+
+
+@unittest.skipIf(is_wasi, "WASI does not support TCP sockets")
+class PdbConnectTestCase(unittest.TestCase):
+    """Tests for the _connect mechanism using direct socket communication."""
+
+    def setUp(self):
+        # Create a server socket that will wait for the debugger to connect
+        self.server_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        self.server_sock.bind(('127.0.0.1', 0))  # Let OS assign port
+        self.server_sock.listen(1)
+        self.port = self.server_sock.getsockname()[1]
+
+    def _create_script(self, script=None):
+        # Create a file for subprocess script
+        if script is None:
+            script = textwrap.dedent(
+                f"""
+                import pdb
+                import sys
+                import time
+
+                def foo():
+                    x = 42
+                    return bar()
+
+                def bar():
+                    return 42
+
+                def connect_to_debugger():
+                    # Create a frame to debug
+                    def dummy_function():
+                        x = 42
+                        # Call connect to establish connection
+                        # with the test server
+                        frame = sys._getframe()  # Get the current frame
+                        pdb._connect(
+                            host='127.0.0.1',
+                            port={self.port},
+                            frame=frame,
+                            commands="",
+                            version=pdb._PdbServer.protocol_version(),
+                        )
+                        return x  # This line won't be reached in debugging
+
+                    return dummy_function()
+
+                result = connect_to_debugger()
+                foo()
+                print(f"Function returned: {{result}}")
+                """)
+
+        self.script_path = TESTFN + "_connect_test.py"
+        with open(self.script_path, 'w') as f:
+            f.write(script)
+
+    def tearDown(self):
+        self.server_sock.close()
+        try:
+            unlink(self.script_path)
+        except OSError:
+            pass
+
+    def _connect_and_get_client_file(self):
+        """Helper to start subprocess and get connected client file."""
+        # Start the subprocess that will connect to our socket
+        process = subprocess.Popen(
+            [sys.executable, self.script_path],
+            stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE,
+            text=True
+        )
+
+        # Accept the connection from the subprocess
+        client_sock, _ = self.server_sock.accept()
+        client_file = client_sock.makefile('rwb')
+        self.addCleanup(client_file.close)
+        self.addCleanup(client_sock.close)
+
+        return process, client_file
+
+    def _read_until_prompt(self, client_file):
+        """Helper to read messages until a prompt is received."""
+        messages = []
+        while True:
+            data = client_file.readline()
+            if not data:
+                break
+            msg = json.loads(data.decode())
+            messages.append(msg)
+            if 'prompt' in msg:
+                break
+        return messages
+
+    def _send_command(self, client_file, command):
+        """Helper to send a command to the debugger."""
+        client_file.write(json.dumps({"reply": command}).encode() + b"\n")
+        client_file.flush()
+
+    def _send_interrupt(self, pid):
+        """Helper to send an interrupt signal to the debugger."""
+        # with tempfile.NamedTemporaryFile("w", delete_on_close=False) as interrupt_script:
+        interrupt_script = TESTFN + "_interrupt_script.py"
+        with open(interrupt_script, 'w') as f:
+            f.write(
+                'import pdb, sys\n'
+                'print("Hello, world!")\n'
+                'if inst := pdb.Pdb._last_pdb_instance:\n'
+                '    inst.set_trace(sys._getframe(1))\n'
+            )
+        self.addCleanup(unlink, interrupt_script)
+        try:
+            sys.remote_exec(pid, interrupt_script)
+        except PermissionError:
+            self.skipTest("Insufficient permissions to execute code in remote process")
+
+    def test_connect_and_basic_commands(self):
+        """Test connecting to a remote debugger and sending basic commands."""
+        self._create_script()
+        process, client_file = self._connect_and_get_client_file()
+
+        with process:
+            # We should receive initial data from the debugger
+            data = client_file.readline()
+            initial_data = json.loads(data.decode())
+            self.assertIn('message', initial_data)
+            self.assertIn('pdb._connect', initial_data['message'])
+
+            # First, look for command_list message
+            data = client_file.readline()
+            command_list = json.loads(data.decode())
+            self.assertIn('command_list', command_list)
+
+            # Then, look for the first prompt
+            data = client_file.readline()
+            prompt_data = json.loads(data.decode())
+            self.assertIn('prompt', prompt_data)
+            self.assertEqual(prompt_data['state'], 'pdb')
+
+            # Send 'bt' (backtrace) command
+            self._send_command(client_file, "bt")
+
+            # Check for response - we should get some stack frames
+            messages = self._read_until_prompt(client_file)
+
+            # Extract text messages containing stack info
+            text_msg = [msg['message'] for msg in messages
+                    if 'message' in msg and 'connect_to_debugger' in msg['message']]
+            got_stack_info = bool(text_msg)
+
+            expected_stacks = [
+                "<module>",
+                "connect_to_debugger",
+            ]
+
+            for stack, msg in zip(expected_stacks, text_msg, strict=True):
+                self.assertIn(stack, msg)
+
+            self.assertTrue(got_stack_info, "Should have received stack trace information")
+
+            # Send 'c' (continue) command to let the program finish
+            self._send_command(client_file, "c")
+
+            # Wait for process to finish
+            stdout, _ = process.communicate(timeout=5)
+
+            # Check if we got the expected output
+            self.assertIn("Function returned: 42", stdout)
+            self.assertEqual(process.returncode, 0)
+
+    def test_breakpoints(self):
+        """Test setting and hitting breakpoints."""
+        self._create_script()
+        process, client_file = self._connect_and_get_client_file()
+        with process:
+            # Skip initial messages until we get to the prompt
+            self._read_until_prompt(client_file)
+
+            # Set a breakpoint at the return statement
+            self._send_command(client_file, "break bar")
+            messages = self._read_until_prompt(client_file)
+            bp_msg = next(msg['message'] for msg in messages if 'message' in msg)
+            self.assertIn("Breakpoint", bp_msg)
+
+            # Continue execution until breakpoint
+            self._send_command(client_file, "c")
+            messages = self._read_until_prompt(client_file)
+
+            # Verify we hit the breakpoint
+            hit_msg = next(msg['message'] for msg in messages if 'message' in msg)
+            self.assertIn("bar()", hit_msg)
+
+            # Check breakpoint list
+            self._send_command(client_file, "b")
+            messages = self._read_until_prompt(client_file)
+            list_msg = next(msg['message'] for msg in reversed(messages) if 'message' in msg)
+            self.assertIn("1   breakpoint", list_msg)
+            self.assertIn("breakpoint already hit 1 time", list_msg)
+
+            # Clear breakpoint
+            self._send_command(client_file, "clear 1")
+            messages = self._read_until_prompt(client_file)
+            clear_msg = next(msg['message'] for msg in reversed(messages) if 'message' in msg)
+            self.assertIn("Deleted breakpoint", clear_msg)
+
+            # Continue to end
+            self._send_command(client_file, "c")
+            stdout, _ = process.communicate(timeout=5)
+
+            self.assertIn("Function returned: 42", stdout)
+            self.assertEqual(process.returncode, 0)
+
+    def test_keyboard_interrupt(self):
+        """Test that sending keyboard interrupt breaks into pdb."""
+        synchronizer_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        synchronizer_sock.bind(('127.0.0.1', 0))  # Let OS assign port
+        synchronizer_sock.settimeout(5)
+        synchronizer_sock.listen(1)
+        self.addCleanup(synchronizer_sock.close)
+        sync_port = synchronizer_sock.getsockname()[1]
+
+        script = textwrap.dedent(f"""
+            import time
+            import sys
+            import socket
+            import pdb
+            def bar():
+                frame = sys._getframe()  # Get the current frame
+                pdb._connect(
+                    host='127.0.0.1',
+                    port={self.port},
+                    frame=frame,
+                    commands="",
+                    version=pdb._PdbServer.protocol_version(),
+                )
+                print("Connected to debugger")
+                iterations = 10
+                socket.create_connection(('127.0.0.1', {sync_port})).close()
+                while iterations > 0:
+                    print("Iteration", iterations)
+                    time.sleep(1)
+                    iterations -= 1
+                return 42
+
+            if __name__ == "__main__":
+                print("Function returned:", bar())
+            """)
+        self._create_script(script=script)
+        process, client_file = self._connect_and_get_client_file()
+
+        with process:
+
+            # Skip initial messages until we get to the prompt
+            self._read_until_prompt(client_file)
+
+            # Continue execution
+            self._send_command(client_file, "c")
+
+            # Wait until execution has continued
+            synchronizer_sock.accept()[0].close()
+
+            # Inject a script to interrupt the running process
+            self._send_interrupt(process.pid)
+            messages = self._read_until_prompt(client_file)
+
+            # Verify we got the keyboard interrupt message
+            interrupt_msg = next(msg['message'] for msg in messages if 'message' in msg)
+            self.assertIn("bar()", interrupt_msg)
+
+            # Continue to end
+            self._send_command(client_file, "iterations = 0")
+            self._send_command(client_file, "c")
+            stdout, _ = process.communicate(timeout=5)
+            self.assertIn("Function returned: 42", stdout)
+            self.assertEqual(process.returncode, 0)
+
+    def test_handle_eof(self):
+        """Test that EOF signal properly exits the debugger."""
+        self._create_script()
+        process, client_file = self._connect_and_get_client_file()
+
+        with process:
+            # Skip initial messages until we get to the prompt
+            self._read_until_prompt(client_file)
+
+            # Send EOF signal to exit the debugger
+            client_file.write(json.dumps({"signal": "EOF"}).encode() + b"\n")
+            client_file.flush()
+
+            # The process should complete normally after receiving EOF
+            stdout, stderr = process.communicate(timeout=5)
+
+            # Verify process completed correctly
+            self.assertIn("Function returned: 42", stdout)
+            self.assertEqual(process.returncode, 0)
+            self.assertEqual(stderr, "")
+
+    def test_protocol_version(self):
+        """Test that incompatible protocol versions are properly detected."""
+        # Create a script using an incompatible protocol version
+        script = textwrap.dedent(f'''
+            import sys
+            import pdb
+
+            def run_test():
+                frame = sys._getframe()
+
+                # Use a fake version number that's definitely incompatible
+                fake_version = 0x01010101 # A fake version that doesn't match any real Python version
+
+                # Connect with the wrong version
+                pdb._connect(
+                    host='127.0.0.1',
+                    port={self.port},
+                    frame=frame,
+                    commands="",
+                    version=fake_version,
+                )
+
+                # This should print if the debugger detaches correctly
+                print("Debugger properly detected version mismatch")
+                return True
+
+            if __name__ == "__main__":
+                print("Test result:", run_test())
+            ''')
+        self._create_script(script=script)
+        process, client_file = self._connect_and_get_client_file()
+
+        with process:
+            # First message should be an error about protocol version mismatch
+            data = client_file.readline()
+            message = json.loads(data.decode())
+
+            self.assertIn('message', message)
+            self.assertEqual(message['type'], 'error')
+            self.assertIn('incompatible', message['message'])
+            self.assertIn('protocol version', message['message'])
+
+            # The process should complete normally
+            stdout, stderr = process.communicate(timeout=5)
+
+            # Verify the process completed successfully
+            self.assertIn("Test result: True", stdout)
+            self.assertIn("Debugger properly detected version mismatch", stdout)
+            self.assertEqual(process.returncode, 0)
+
+    def test_help_system(self):
+        """Test that the help system properly sends help text to the client."""
+        self._create_script()
+        process, client_file = self._connect_and_get_client_file()
+
+        with process:
+            # Skip initial messages until we get to the prompt
+            self._read_until_prompt(client_file)
+
+            # Request help for different commands
+            help_commands = ["help", "help break", "help continue", "help pdb"]
+
+            for cmd in help_commands:
+                self._send_command(client_file, cmd)
+
+                # Look for help message
+                data = client_file.readline()
+                message = json.loads(data.decode())
+
+                self.assertIn('help', message)
+
+                if cmd == "help":
+                    # Should just contain the command itself
+                    self.assertEqual(message['help'], "")
+                else:
+                    # Should contain the specific command we asked for help with
+                    command = cmd.split()[1]
+                    self.assertEqual(message['help'], command)
+
+                # Skip to the next prompt
+                self._read_until_prompt(client_file)
+
+            # Continue execution to finish the program
+            self._send_command(client_file, "c")
+
+            stdout, stderr = process.communicate(timeout=5)
+            self.assertIn("Function returned: 42", stdout)
+            self.assertEqual(process.returncode, 0)
+
+    def test_multi_line_commands(self):
+        """Test that multi-line commands work properly over remote connection."""
+        self._create_script()
+        process, client_file = self._connect_and_get_client_file()
+
+        with process:
+            # Skip initial messages until we get to the prompt
+            self._read_until_prompt(client_file)
+
+            # Send a multi-line command
+            multi_line_commands = [
+                # Define a function
+                "def test_func():\n    return 42",
+
+                # For loop
+                "for i in range(3):\n    print(i)",
+
+                # If statement
+                "if True:\n    x = 42\nelse:\n    x = 0",
+
+                # Try/except
+                "try:\n    result = 10/2\n    print(result)\nexcept ZeroDivisionError:\n    print('Error')",
+
+                # Class definition
+                "class TestClass:\n    def __init__(self):\n        self.value = 100\n    def get_value(self):\n        return self.value"
+            ]
+
+            for cmd in multi_line_commands:
+                self._send_command(client_file, cmd)
+                self._read_until_prompt(client_file)
+
+            # Test executing the defined function
+            self._send_command(client_file, "test_func()")
+            messages = self._read_until_prompt(client_file)
+
+            # Find the result message
+            result_msg = next(msg['message'] for msg in messages if 'message' in msg)
+            self.assertIn("42", result_msg)
+
+            # Test creating an instance of the defined class
+            self._send_command(client_file, "obj = TestClass()")
+            self._read_until_prompt(client_file)
+
+            # Test calling a method on the instance
+            self._send_command(client_file, "obj.get_value()")
+            messages = self._read_until_prompt(client_file)
+
+            # Find the result message
+            result_msg = next(msg['message'] for msg in messages if 'message' in msg)
+            self.assertIn("100", result_msg)
+
+            # Continue execution to finish
+            self._send_command(client_file, "c")
+
+            stdout, stderr = process.communicate(timeout=5)
+            self.assertIn("Function returned: 42", stdout)
+            self.assertEqual(process.returncode, 0)
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/Misc/NEWS.d/next/Library/2025-04-22-19-45-46.gh-issue-132451.eIzMvE.rst b/Misc/NEWS.d/next/Library/2025-04-22-19-45-46.gh-issue-132451.eIzMvE.rst
new file mode 100644
index 00000000000000..01ca64868531e6
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2025-04-22-19-45-46.gh-issue-132451.eIzMvE.rst
@@ -0,0 +1,3 @@
+The CLI for the PDB debugger now accepts a ``-p PID`` argument to allow
+attaching to a running process. The process must be running the same version
+of Python as the one running PDB.

_______________________________________________
Python-checkins mailing list -- python-checkins@python.org
To unsubscribe send an email to python-checkins-le...@python.org
https://mail.python.org/mailman3/lists/python-checkins.python.org/
Member address: arch...@mail-archive.com

Reply via email to