https://github.com/ashgti updated 
https://github.com/llvm/llvm-project/pull/167754

>From c148dad94f05ff4ea81fe088df2d06c72fd8daf6 Mon Sep 17 00:00:00 2001
From: John Harrison <[email protected]>
Date: Wed, 12 Nov 2025 12:10:36 -0800
Subject: [PATCH 1/4] [lldb-dap] Fix running dap_server.py directly for
 debugging tests.

This adjusts the behavior of running dap_server.py directly to better support 
the current state of development.

* Instead of the custom tracefile parsing logic, I adjusted the replay helper 
to handle parsing lldb-dap log files created with the `LLDBDAP_LOG` env 
variable.
* Migrated argument parsing to `argparse`, that is in all verisons of py3+ and 
has a few improvements over `optparse`.
* Corrected the existing arguments and updated `run_vscode` > `run_adapter`. 
You can use this for simple debugging like: `xcrun python3 
lldb/packages/Python/lldbsuite/test/tools/lldb-dap/dap_server.py 
--adapter=lldb-dap --adapter-arg='--pre-init-command' --adapter-arg 'help' 
--program a.out --init-command 'help'`
---
 .../test/tools/lldb-dap/dap_server.py         | 414 ++++++++----------
 1 file changed, 183 insertions(+), 231 deletions(-)

diff --git a/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/dap_server.py 
b/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/dap_server.py
index ac550962cfb85..f0aef30b3cd1d 100644
--- a/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/dap_server.py
+++ b/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/dap_server.py
@@ -1,8 +1,8 @@
-#!/usr/bin/env python
+#!/usr/bin/env python3
 
 import binascii
 import json
-import optparse
+import argparse
 import os
 import pprint
 import socket
@@ -10,6 +10,7 @@
 import subprocess
 import signal
 import sys
+import pathlib
 import threading
 import warnings
 import time
@@ -20,10 +21,8 @@
     cast,
     List,
     Callable,
-    IO,
     Union,
     BinaryIO,
-    TextIO,
     TypedDict,
     Literal,
 )
@@ -143,35 +142,6 @@ def dump_memory(base_addr, data, num_per_line, outfile):
         outfile.write("\n")
 
 
-def read_packet(
-    f: IO[bytes], trace_file: Optional[IO[str]] = None
-) -> Optional[ProtocolMessage]:
-    """Decode a JSON packet that starts with the content length and is
-    followed by the JSON bytes from a file 'f'. Returns None on EOF.
-    """
-    line = f.readline().decode("utf-8")
-    if len(line) == 0:
-        return None  # EOF.
-
-    # Watch for line that starts with the prefix
-    prefix = "Content-Length: "
-    if line.startswith(prefix):
-        # Decode length of JSON bytes
-        length = int(line[len(prefix) :])
-        # Skip empty line
-        separator = f.readline().decode()
-        if separator != "":
-            Exception("malformed DAP content header, unexpected line: " + 
separator)
-        # Read JSON bytes
-        json_str = f.read(length).decode()
-        if trace_file:
-            trace_file.write("from adapter:\n%s\n" % (json_str))
-        # Decode the JSON bytes into a python dictionary
-        return json.loads(json_str)
-
-    raise Exception("unexpected malformed message from lldb-dap: " + line)
-
-
 def packet_type_is(packet, packet_type):
     return "type" in packet and packet["type"] == packet_type
 
@@ -199,8 +169,6 @@ def __init__(
         log_file: Optional[str] = None,
         spawn_helper: Optional[SpawnHelperCallback] = None,
     ):
-        # For debugging test failures, try setting `trace_file = sys.stderr`.
-        self.trace_file: Optional[TextIO] = None
         self.log_file = log_file
         self.send = send
         self.recv = recv
@@ -258,10 +226,34 @@ def validate_response(cls, command, response):
                 f"seq mismatch in response {command['seq']} != 
{response['request_seq']}"
             )
 
+    def _read_packet(self) -> Optional[ProtocolMessage]:
+        """Decode a JSON packet that starts with the content length and is
+        followed by the JSON bytes. Returns None on EOF.
+        """
+        line = self.recv.readline().decode("utf-8")
+        if len(line) == 0:
+            return None  # EOF.
+
+        # Watch for line that starts with the prefix
+        prefix = "Content-Length: "
+        if line.startswith(prefix):
+            # Decode length of JSON bytes
+            length = int(line[len(prefix) :])
+            # Skip empty line
+            separator = self.recv.readline().decode()
+            if separator != "":
+                Exception("malformed DAP content header, unexpected line: " + 
separator)
+            # Read JSON bytes
+            json_str = self.recv.read(length).decode()
+            # Decode the JSON bytes into a python dictionary
+            return json.loads(json_str)
+
+        raise Exception("unexpected malformed message from lldb-dap: " + line)
+
     def _read_packet_thread(self):
         try:
             while True:
-                packet = read_packet(self.recv, trace_file=self.trace_file)
+                packet = self._read_packet()
                 # `packet` will be `None` on EOF. We want to pass it down to
                 # handle_recv_packet anyway so the main thread can handle 
unexpected
                 # termination of lldb-dap and stop waiting for new packets.
@@ -521,9 +513,6 @@ def send_packet(self, packet: ProtocolMessage) -> int:
         # Encode our command dictionary as a JSON string
         json_str = json.dumps(packet, separators=(",", ":"))
 
-        if self.trace_file:
-            self.trace_file.write("to adapter:\n%s\n" % (json_str))
-
         length = len(json_str)
         if length > 0:
             # Send the encoded JSON packet and flush the 'send' file
@@ -735,45 +724,30 @@ def get_local_variable_child(
                 return child
         return None
 
-    def replay_packets(self, replay_file_path):
-        f = open(replay_file_path, "r")
-        mode = "invalid"
-        set_sequence = False
-        command_dict = None
-        while mode != "eof":
-            if mode == "invalid":
-                line = f.readline()
-                if line.startswith("to adapter:"):
-                    mode = "send"
-                elif line.startswith("from adapter:"):
-                    mode = "recv"
-            elif mode == "send":
-                command_dict = read_packet(f)
-                # Skip the end of line that follows the JSON
-                f.readline()
-                if command_dict is None:
-                    raise ValueError("decode packet failed from replay file")
-                print("Sending:")
-                pprint.PrettyPrinter(indent=2).pprint(command_dict)
-                # raw_input('Press ENTER to send:')
-                self.send_packet(command_dict, set_sequence)
-                mode = "invalid"
-            elif mode == "recv":
-                print("Replay response:")
-                replay_response = read_packet(f)
-                # Skip the end of line that follows the JSON
-                f.readline()
-                pprint.PrettyPrinter(indent=2).pprint(replay_response)
-                actual_response = self.recv_packet()
-                if actual_response:
-                    type = actual_response["type"]
+    def replay_packets(self, file: pathlib.Path, verbosity: int) -> None:
+        inflight: Dict[int, dict] = {}  # requests, keyed by seq
+        with open(file, "r") as f:
+            for line in f:
+                if "-->" in line:
+                    command_dict = json.loads(line.split("--> ")[1])
+                    if verbosity > 0:
+                        print("Sending:")
+                        pprint.PrettyPrinter(indent=2).pprint(command_dict)
+                    seq = self.send_packet(command_dict)
+                    if command_dict["type"] == "request":
+                        inflight[seq] = command_dict
+                elif "<--" in line:
+                    replay_response = json.loads(line.split("<-- ")[1])
+                    print("Replay response:")
+                    pprint.PrettyPrinter(indent=2).pprint(replay_response)
+                    actual_response = self._recv_packet(
+                        predicate=lambda packet: replay_response == packet
+                    )
                     print("Actual response:")
-                    if type == "response":
-                        self.validate_response(command_dict, actual_response)
                     pprint.PrettyPrinter(indent=2).pprint(actual_response)
-                else:
-                    print("error: didn't get a valid response")
-                mode = "invalid"
+                    if actual_response and actual_response["type"] == 
"response":
+                        command_dict = inflight[actual_response["request_seq"]]
+                        self.validate_response(command_dict, actual_response)
 
     def request_attach(
         self,
@@ -1646,65 +1620,63 @@ def __str__(self):
             return f"lldb-dap returned non-zero exit status {self.returncode}."
 
 
-def attach_options_specified(options):
-    if options.pid is not None:
+def attach_options_specified(opts):
+    if opts.pid is not None:
         return True
-    if options.waitFor:
+    if opts.wait_for:
         return True
-    if options.attach:
+    if opts.attach:
         return True
-    if options.attachCmds:
+    if opts.attach_command:
         return True
     return False
 
 
-def run_vscode(dbg, args, options):
-    dbg.request_initialize(options.sourceInitFile)
+def run_adapter(dbg: DebugCommunication, opts: argparse.Namespace) -> None:
+    dbg.request_initialize(opts.source_init_file)
 
-    if options.sourceBreakpoints:
-        source_to_lines = {}
-        for file_line in options.sourceBreakpoints:
-            (path, line) = file_line.split(":")
-            if len(path) == 0 or len(line) == 0:
-                print('error: invalid source with line "%s"' % (file_line))
-
-            else:
-                if path in source_to_lines:
-                    source_to_lines[path].append(int(line))
-                else:
-                    source_to_lines[path] = [int(line)]
-        for source in source_to_lines:
-            dbg.request_setBreakpoints(Source(source), source_to_lines[source])
-    if options.funcBreakpoints:
-        dbg.request_setFunctionBreakpoints(options.funcBreakpoints)
+    source_to_lines: Dict[str, List[int]] = {}
+    for sbp in cast(List[str], opts.source_bp):
+        if ":" not in sbp:
+            print('error: invalid source with line "%s"' % (sbp))
+            continue
+        path, line = sbp.split(":")
+        if path in source_to_lines:
+            source_to_lines[path].append(int(line))
+        else:
+            source_to_lines[path] = [int(line)]
+    for source in source_to_lines:
+        dbg.request_setBreakpoints(Source.build(path=source), 
source_to_lines[source])
+    if opts.function_bp:
+        dbg.request_setFunctionBreakpoints(opts.function_bp)
 
     dbg.request_configurationDone()
 
-    if attach_options_specified(options):
+    if attach_options_specified(opts):
         response = dbg.request_attach(
-            program=options.program,
-            pid=options.pid,
-            waitFor=options.waitFor,
-            attachCommands=options.attachCmds,
-            initCommands=options.initCmds,
-            preRunCommands=options.preRunCmds,
-            stopCommands=options.stopCmds,
-            exitCommands=options.exitCmds,
-            terminateCommands=options.terminateCmds,
+            program=opts.program,
+            pid=opts.pid,
+            waitFor=opts.wait_for,
+            attachCommands=opts.attach_command,
+            initCommands=opts.init_command,
+            preRunCommands=opts.pre_run_command,
+            stopCommands=opts.stop_command,
+            terminateCommands=opts.terminate_command,
+            exitCommands=opts.exit_command,
         )
     else:
         response = dbg.request_launch(
-            options.program,
-            args=args,
-            env=options.envs,
-            cwd=options.workingDir,
-            debuggerRoot=options.debuggerRoot,
-            sourcePath=options.sourcePath,
-            initCommands=options.initCmds,
-            preRunCommands=options.preRunCmds,
-            stopCommands=options.stopCmds,
-            exitCommands=options.exitCmds,
-            terminateCommands=options.terminateCmds,
+            opts.program,
+            args=opts.args,
+            env=opts.env,
+            cwd=opts.working_dir,
+            debuggerRoot=opts.debugger_root,
+            sourceMap=opts.source_map,
+            initCommands=opts.init_command,
+            preRunCommands=opts.pre_run_command,
+            stopCommands=opts.stop_command,
+            exitCommands=opts.exit_command,
+            terminateCommands=opts.terminate_command,
         )
 
     if response["success"]:
@@ -1716,110 +1688,98 @@ def run_vscode(dbg, args, options):
 
 
 def main():
-    parser = optparse.OptionParser(
+    parser = argparse.ArgumentParser(
+        prog="dap_server.py",
         description=(
             "A testing framework for the Visual Studio Code Debug Adapter 
protocol"
-        )
+        ),
     )
 
-    parser.add_option(
-        "--vscode",
-        type="string",
-        dest="vscode_path",
+    parser.add_argument(
+        "--adapter",
         help=(
-            "The path to the command line program that implements the "
-            "Visual Studio Code Debug Adapter protocol."
+            "The path to the command line program that implements the Debug 
Adapter protocol."
         ),
-        default=None,
     )
 
-    parser.add_option(
+    parser.add_argument(
+        "--adapter-arg",
+        action="append",
+        default=[],
+        help="Additional args to pass to the debug adapter.",
+    )
+
+    parser.add_argument(
         "--program",
-        type="string",
-        dest="program",
         help="The path to the program to debug.",
-        default=None,
     )
 
-    parser.add_option(
-        "--workingDir",
-        type="string",
-        dest="workingDir",
-        default=None,
+    parser.add_argument(
+        "--working-dir",
         help="Set the working directory for the process we launch.",
     )
 
-    parser.add_option(
-        "--sourcePath",
-        type="string",
-        dest="sourcePath",
-        default=None,
+    parser.add_argument(
+        "--source-map",
+        nargs=2,
+        action="extend",
+        metavar=("PREFIX", "REPLACEMENT"),
         help=(
-            "Set the relative source root for any debug info that has "
-            "relative paths in it."
+            "Source path remappings apply substitutions to the paths of source 
"
+            "files, typically needed to debug from a different host than the "
+            "one that built the target."
         ),
     )
 
-    parser.add_option(
-        "--debuggerRoot",
-        type="string",
-        dest="debuggerRoot",
-        default=None,
+    parser.add_argument(
+        "--debugger-root",
         help=(
             "Set the working directory for lldb-dap for any object files "
             "with relative paths in the Mach-o debug map."
         ),
     )
 
-    parser.add_option(
+    parser.add_argument(
         "-r",
         "--replay",
-        type="string",
-        dest="replay",
         help=(
             "Specify a file containing a packet log to replay with the "
-            "current Visual Studio Code Debug Adapter executable."
+            "current debug adapter."
         ),
-        default=None,
     )
 
-    parser.add_option(
+    parser.add_argument(
         "-g",
         "--debug",
         action="store_true",
-        dest="debug",
-        default=False,
-        help="Pause waiting for a debugger to attach to the debug adapter",
+        help="Pause waiting for a debugger to attach to the debug adapter.",
     )
 
-    parser.add_option(
-        "--sourceInitFile",
+    parser.add_argument(
+        "--source-init-file",
         action="store_true",
-        dest="sourceInitFile",
-        default=False,
-        help="Whether lldb-dap should source .lldbinit file or not",
+        help="Whether lldb-dap should source .lldbinit file or not.",
     )
 
-    parser.add_option(
+    parser.add_argument(
         "--connection",
         dest="connection",
-        help="Attach a socket connection of using STDIN for VSCode",
-        default=None,
+        help=(
+            "Communicate with the debug adapter over specified connection "
+            "instead of launching the debug adapter directly."
+        ),
     )
 
-    parser.add_option(
+    parser.add_argument(
         "--pid",
-        type="int",
+        type=int,
         dest="pid",
-        help="The process ID to attach to",
-        default=None,
+        help="The process ID to attach to.",
     )
 
-    parser.add_option(
+    parser.add_argument(
         "--attach",
         action="store_true",
-        dest="attach",
-        default=False,
         help=(
             "Specify this option to attach to a process by name. The "
             "process name is the basename of the executable specified with "
@@ -1827,38 +1787,30 @@ def main():
         ),
     )
 
-    parser.add_option(
+    parser.add_argument(
         "-f",
         "--function-bp",
-        type="string",
         action="append",
-        dest="funcBreakpoints",
+        default=[],
+        metavar="FUNCTION",
         help=(
-            "Specify the name of a function to break at. "
-            "Can be specified more than once."
+            "Specify the name of a function to break at. Can be specified more 
"
+            "than once."
         ),
-        default=[],
     )
 
-    parser.add_option(
+    parser.add_argument(
         "-s",
         "--source-bp",
-        type="string",
         action="append",
-        dest="sourceBreakpoints",
         default=[],
-        help=(
-            "Specify source breakpoints to set in the format of "
-            "<source>:<line>. "
-            "Can be specified more than once."
-        ),
+        metavar="SOURCE:LINE",
+        help="Specify source breakpoints to set. Can be specified more than 
once.",
     )
 
-    parser.add_option(
-        "--attachCommand",
-        type="string",
+    parser.add_argument(
+        "--attach-command",
         action="append",
-        dest="attachCmds",
         default=[],
         help=(
             "Specify a LLDB command that will attach to a process. "
@@ -1866,11 +1818,9 @@ def main():
         ),
     )
 
-    parser.add_option(
-        "--initCommand",
-        type="string",
+    parser.add_argument(
+        "--init-command",
         action="append",
-        dest="initCmds",
         default=[],
         help=(
             "Specify a LLDB command that will be executed before the target "
@@ -1878,11 +1828,9 @@ def main():
         ),
     )
 
-    parser.add_option(
-        "--preRunCommand",
-        type="string",
+    parser.add_argument(
+        "--pre-run-command",
         action="append",
-        dest="preRunCmds",
         default=[],
         help=(
             "Specify a LLDB command that will be executed after the target "
@@ -1890,11 +1838,9 @@ def main():
         ),
     )
 
-    parser.add_option(
-        "--stopCommand",
-        type="string",
+    parser.add_argument(
+        "--stop-command",
         action="append",
-        dest="stopCmds",
         default=[],
         help=(
             "Specify a LLDB command that will be executed each time the"
@@ -1902,11 +1848,9 @@ def main():
         ),
     )
 
-    parser.add_option(
-        "--exitCommand",
-        type="string",
+    parser.add_argument(
+        "--exit-command",
         action="append",
-        dest="exitCmds",
         default=[],
         help=(
             "Specify a LLDB command that will be executed when the process "
@@ -1914,11 +1858,9 @@ def main():
         ),
     )
 
-    parser.add_option(
-        "--terminateCommand",
-        type="string",
+    parser.add_argument(
+        "--terminate-command",
         action="append",
-        dest="terminateCmds",
         default=[],
         help=(
             "Specify a LLDB command that will be executed when the debugging "
@@ -1926,20 +1868,18 @@ def main():
         ),
     )
 
-    parser.add_option(
+    parser.add_argument(
         "--env",
-        type="string",
         action="append",
-        dest="envs",
         default=[],
-        help=("Specify environment variables to pass to the launched " 
"process."),
+        metavar="NAME=VALUE",
+        help="Specify environment variables to pass to the launched process. 
Can be specified more than once.",
     )
 
-    parser.add_option(
-        "--waitFor",
+    parser.add_argument(
+        "-w",
+        "--wait-for",
         action="store_true",
-        dest="waitFor",
-        default=False,
         help=(
             "Wait for the next process to be launched whose name matches "
             "the basename of the program specified with the --program "
@@ -1947,24 +1887,36 @@ def main():
         ),
     )
 
-    (options, args) = parser.parse_args(sys.argv[1:])
+    parser.add_argument(
+        "-v", "--verbose", help="Verbosity level.", action="count", default=0
+    )
 
-    if options.vscode_path is None and options.connection is None:
+    parser.add_argument(
+        "args",
+        nargs="*",
+        help="A list containing all the arguments to be passed to the 
executable when it is run.",
+    )
+
+    opts = parser.parse_args()
+
+    if opts.adapter is None and opts.connection is None:
         print(
-            "error: must either specify a path to a Visual Studio Code "
-            "Debug Adapter vscode executable path using the --vscode "
-            "option, or using the --connection option"
+            "error: must either specify a path to a Debug Protocol Adapter "
+            "executable using the --adapter option, or using the --connection "
+            "option"
         )
         return
     dbg = DebugAdapterServer(
-        executable=options.vscode_path, connection=options.connection
+        executable=opts.adapter,
+        connection=opts.connection,
+        additional_args=opts.adapter_arg,
     )
-    if options.debug:
-        raw_input('Waiting for debugger to attach pid "%i"' % (dbg.get_pid()))
-    if options.replay:
-        dbg.replay_packets(options.replay)
+    if opts.debug:
+        input('Waiting for debugger to attach pid "%i"' % (dbg.get_pid()))
+    if opts.replay:
+        dbg.replay_packets(opts.replay)
     else:
-        run_vscode(dbg, args, options)
+        run_adapter(dbg, opts)
     dbg.terminate()
 
 

>From 9d896a2d855d091cc12218877beccf57e7405d76 Mon Sep 17 00:00:00 2001
From: John Harrison <[email protected]>
Date: Thu, 13 Nov 2025 09:20:15 -0800
Subject: [PATCH 2/4] Make sure we only split 1 time on '-->' or '<--' when
 replaying a log file.

---
 .../Python/lldbsuite/test/tools/lldb-dap/dap_server.py      | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/dap_server.py 
b/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/dap_server.py
index f0aef30b3cd1d..583b3204df457 100644
--- a/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/dap_server.py
+++ b/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/dap_server.py
@@ -729,7 +729,8 @@ def replay_packets(self, file: pathlib.Path, verbosity: 
int) -> None:
         with open(file, "r") as f:
             for line in f:
                 if "-->" in line:
-                    command_dict = json.loads(line.split("--> ")[1])
+                    packet = line.split("--> ", maxsplit=1)[1]
+                    command_dict = json.loads(packet)
                     if verbosity > 0:
                         print("Sending:")
                         pprint.PrettyPrinter(indent=2).pprint(command_dict)
@@ -737,7 +738,8 @@ def replay_packets(self, file: pathlib.Path, verbosity: 
int) -> None:
                     if command_dict["type"] == "request":
                         inflight[seq] = command_dict
                 elif "<--" in line:
-                    replay_response = json.loads(line.split("<-- ")[1])
+                    packet = line.split("<-- ", maxsplit=1)[1]
+                    replay_response = json.loads(packet)
                     print("Replay response:")
                     pprint.PrettyPrinter(indent=2).pprint(replay_response)
                     actual_response = self._recv_packet(

>From 8af0bd2a6d0f4cc887406f386501159a839f58de Mon Sep 17 00:00:00 2001
From: John Harrison <[email protected]>
Date: Thu, 13 Nov 2025 14:01:16 -0800
Subject: [PATCH 3/4] Re-write the replay method to only replay requests.

Additionally, I updated the replay logic to try to re-write some stateful 
values from requests/responses, otherwise we end up making requests with bad 
values like 'threadId' or 'frameId'.
---
 .../test/tools/lldb-dap/dap_server.py         | 233 +++++++++++++-----
 1 file changed, 177 insertions(+), 56 deletions(-)

diff --git a/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/dap_server.py 
b/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/dap_server.py
index 583b3204df457..90914005c2ab6 100644
--- a/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/dap_server.py
+++ b/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/dap_server.py
@@ -1,32 +1,39 @@
 #!/usr/bin/env python3
 
+import argparse
 import binascii
+import copy
+import dataclasses
+import enum
 import json
-import argparse
+import logging
 import os
+import pathlib
 import pprint
+import re
+import signal
 import socket
 import string
 import subprocess
-import signal
 import sys
-import pathlib
 import threading
-import warnings
 import time
+import warnings
 from typing import (
     Any,
-    Optional,
-    Dict,
+    BinaryIO,
+    Callable,
     cast,
+    Dict,
     List,
-    Callable,
-    Union,
-    BinaryIO,
-    TypedDict,
     Literal,
+    Optional,
+    Tuple,
+    TypedDict,
+    Union,
 )
 
+
 # set timeout based on whether ASAN was enabled or not. Increase
 # timeout by a factor of 10 if ASAN is enabled.
 DEFAULT_TIMEOUT = 10 * (10 if ("ASAN_OPTIONS" in os.environ) else 1)
@@ -160,7 +167,62 @@ class NotSupportedError(KeyError):
     """Raised if a feature is not supported due to its capabilities."""
 
 
+class ReplayMods(TypedDict, total=False):
+    """Fields that can be overwritten in requests during a replay."""
+
+    frameId: Optional[int]
+    threadId: Optional[int]
+
+
[email protected]
+class Log:
+    class Dir(enum.Enum):
+        SENT = 1
+        RECV = 2
+
+    @property
+    def requests(self) -> List[Tuple[Dir, Request]]:
+        """All requests in the log, in order."""
+        return [m for m in self.messages if m[1]["type"] == "request"]
+
+    @property
+    def events(self) -> List[Tuple[Dir, Event]]:
+        """All events in the log, in order."""
+        return [m for m in self.messages if m[1]["type"] == "event"]
+
+    @property
+    def responses(self) -> List[Tuple[Dir, Response]]:
+        """All responses in the log, in order."""
+        return [m for m in self.messages if m[1]["type"] == "response"]
+
+    messages: List[Tuple[Dir, ProtocolMessage]] = dataclasses.field(
+        default_factory=list
+    )
+
+    @classmethod
+    def load(cls, file: pathlib.Path) -> "Log":
+        """Load the file and parse any log messages. Returns (sent, recv)."""
+        sent_pattern = re.compile(r"\d+\.\d+ \(.+\) --> ")
+        recv_pattern = re.compile(r"\d+\.\d+ \(.+\) <-- ")
+
+        log = Log()
+        with open(file, "r") as f:
+            for line in f:
+                if sent_pattern.match(line):
+                    packet = line.split("--> ", maxsplit=1)[1]
+                    log.messages.append((Log.Dir.SENT, json.loads(packet)))
+                elif recv_pattern.match(line):
+                    packet = line.split("<-- ", maxsplit=1)[1]
+                    log.messages.append((Log.Dir.RECV, json.loads(packet)))
+        return log
+
+
 class DebugCommunication(object):
+    @property
+    def is_stopped(self):
+        """Returns True if the debuggee is in a stopped state, otherwise 
False."""
+        return len(self.thread_stop_reasons) > 0 or self.exit_status is not 
None
+
     def __init__(
         self,
         recv: BinaryIO,
@@ -169,6 +231,7 @@ def __init__(
         log_file: Optional[str] = None,
         spawn_helper: Optional[SpawnHelperCallback] = None,
     ):
+        self._log = Log()
         self.log_file = log_file
         self.send = send
         self.recv = recv
@@ -203,11 +266,16 @@ def __init__(
 
         # debuggee state
         self.threads: Optional[dict] = None
+        self.stopped_thread: Optional[dict] = None
+        self.thread_stacks: Optional[Dict[int, List[dict]]]
         self.thread_stop_reasons: Dict[str, Any] = {}
         self.frame_scopes: Dict[str, Any] = {}
         # keyed by breakpoint id
         self.resolved_breakpoints: dict[str, Breakpoint] = {}
 
+        # Modifiers used when replaying a log file.
+        self._mod = ReplayMods()
+
         # trigger enqueue thread
         self._recv_thread.start()
 
@@ -251,16 +319,13 @@ def _read_packet(self) -> Optional[ProtocolMessage]:
         raise Exception("unexpected malformed message from lldb-dap: " + line)
 
     def _read_packet_thread(self):
-        try:
-            while True:
-                packet = self._read_packet()
-                # `packet` will be `None` on EOF. We want to pass it down to
-                # handle_recv_packet anyway so the main thread can handle 
unexpected
-                # termination of lldb-dap and stop waiting for new packets.
-                if not self._handle_recv_packet(packet):
-                    break
-        finally:
-            dump_dap_log(self.log_file)
+        while True:
+            packet = self._read_packet()
+            # `packet` will be `None` on EOF. We want to pass it down to
+            # handle_recv_packet anyway so the main thread can handle 
unexpected
+            # termination of lldb-dap and stop waiting for new packets.
+            if not self._handle_recv_packet(packet):
+                break
 
     def get_modules(
         self, start_module: Optional[int] = None, module_count: Optional[int] 
= None
@@ -381,6 +446,8 @@ def _process_recv_packets(self) -> None:
                     warnings.warn(
                         f"received a malformed packet, expected 'seq != 0' for 
{packet!r}"
                     )
+                if packet:
+                    self._log.messages.append((Log.Dir.RECV, packet))
                 # Handle events that may modify any stateful properties of
                 # the DAP session.
                 if packet and packet["type"] == "event":
@@ -519,6 +586,8 @@ def send_packet(self, packet: ProtocolMessage) -> int:
             self.send.write(self.encode_content(json_str))
             self.send.flush()
 
+        self._log.messages.append((Log.Dir.SENT, packet))
+
         return packet["seq"]
 
     def _send_recv(self, request: Request) -> Optional[Response]:
@@ -724,32 +793,69 @@ def get_local_variable_child(
                 return child
         return None
 
-    def replay_packets(self, file: pathlib.Path, verbosity: int) -> None:
-        inflight: Dict[int, dict] = {}  # requests, keyed by seq
-        with open(file, "r") as f:
-            for line in f:
-                if "-->" in line:
-                    packet = line.split("--> ", maxsplit=1)[1]
-                    command_dict = json.loads(packet)
-                    if verbosity > 0:
-                        print("Sending:")
-                        pprint.PrettyPrinter(indent=2).pprint(command_dict)
-                    seq = self.send_packet(command_dict)
-                    if command_dict["type"] == "request":
-                        inflight[seq] = command_dict
-                elif "<--" in line:
-                    packet = line.split("<-- ", maxsplit=1)[1]
-                    replay_response = json.loads(packet)
-                    print("Replay response:")
-                    pprint.PrettyPrinter(indent=2).pprint(replay_response)
-                    actual_response = self._recv_packet(
-                        predicate=lambda packet: replay_response == packet
-                    )
-                    print("Actual response:")
-                    pprint.PrettyPrinter(indent=2).pprint(actual_response)
-                    if actual_response and actual_response["type"] == 
"response":
-                        command_dict = inflight[actual_response["request_seq"]]
-                        self.validate_response(command_dict, actual_response)
+    def _preconditions(self, req: Request) -> None:
+        """Validate any preconditions for the given command, potentially 
waiting
+        for the debuggee to be in a specific state.
+        """
+        if req["command"] == "threads":
+            logging.debug("Waiting on precondition: stopped")
+            self._recv_packet(predicate=lambda _: self.is_stopped)
+
+        # Apply any modifications to arguments.
+        args = req["arguments"]
+        if "threadId" in args and "threadId" in self._mod:
+            args["threadId"] = self._mod["threadId"]
+        if "frameId" in args and "frameId" in self._mod:
+            args["frameId"] = self._mod["frameId"]
+
+    def _postconditions(self, resp: Response) -> None:
+        """Validate any postconditions for the given response, potentially
+        waiting for the debuggee to be in a specific state.
+        """
+        if resp["command"] == "launch":
+            logging.debug("Waiting on postcondition: initialized")
+            self._recv_packet(predicate=lambda _: self.initialized)
+        elif resp["command"] == "configurationDone":
+            logging.debug("Waiting on postcondition: process")
+            self._recv_packet(predicate=lambda _: self.process_event_body is 
not None)
+
+        # Store some modifications related to replayed requests.
+        if resp["command"] == "threads":
+            self._mod["threadId"] = resp["body"]["threads"][0]["id"]
+        if resp["command"] in ["continue", "next", "stepIn", "stepOut", 
"pause"]:
+            self._mod.clear()
+            self._recv_packet(predicate=lambda _: self.is_stopped)
+        if resp["command"] == "stackTrace" and not self._mod.get("frameId", 
None):
+            self._mod["frameId"] = next(
+                (frame["id"] for frame in resp["body"]["stackFrames"]), None
+            )
+
+    def replay(self, file: pathlib.Path) -> None:
+        """Replay a log file."""
+        log = Log.load(file)
+        responses = {
+            r["request_seq"]: r for (dir, r) in log.responses if dir == 
Log.Dir.RECV
+        }
+        for dir, packet in log.messages:
+            if dir != Log.Dir.SENT or packet["type"] != "request":
+                continue
+            req = packet
+            want = responses[req["seq"]]
+
+            self._preconditions(req)
+
+            logging.info("Sending req %r", req)
+            got = self._send_recv(req)
+            logging.info("Received resp %r", got)
+
+            assert (
+                got["command"] == want["command"] == req["command"]
+            ), f"got {got} want {want} for req {req}"
+            assert (
+                got["success"] == want["success"]
+            ), f"got {got} want {want} for req {req}"
+
+            self._postconditions(got)
 
     def request_attach(
         self,
@@ -1447,6 +1553,8 @@ def terminate(self):
         self.send.close()
         if self._recv_thread.is_alive():
             self._recv_thread.join()
+        if self.log_file:
+            dump_dap_log(self.log_file)
 
     def request_setInstructionBreakpoints(self, memory_reference=[]):
         breakpoints = []
@@ -1640,7 +1748,7 @@ def run_adapter(dbg: DebugCommunication, opts: 
argparse.Namespace) -> None:
     source_to_lines: Dict[str, List[int]] = {}
     for sbp in cast(List[str], opts.source_bp):
         if ":" not in sbp:
-            print('error: invalid source with line "%s"' % (sbp))
+            print(f"error: invalid source with line {sbp!r}", file=sys.stderr)
             continue
         path, line = sbp.split(":")
         if path in source_to_lines:
@@ -1684,8 +1792,7 @@ def run_adapter(dbg: DebugCommunication, opts: 
argparse.Namespace) -> None:
     if response["success"]:
         dbg.wait_for_stopped()
     else:
-        if "message" in response:
-            print(response["message"])
+        print("failed to launch/attach: ", response)
     dbg.request_disconnect(terminateDebuggee=True)
 
 
@@ -1901,11 +2008,23 @@ def main():
 
     opts = parser.parse_args()
 
+    logging.basicConfig(
+        format="%(asctime)s - %(levelname)s - %(message)s",
+        level=(
+            logging.DEBUG
+            if opts.verbose > 1
+            else logging.INFO
+            if opts.verbose > 0
+            else logging.WARNING
+        ),
+    )
+
     if opts.adapter is None and opts.connection is None:
         print(
             "error: must either specify a path to a Debug Protocol Adapter "
             "executable using the --adapter option, or using the --connection "
-            "option"
+            "option",
+            file=sys.stderr,
         )
         return
     dbg = DebugAdapterServer(
@@ -1914,12 +2033,14 @@ def main():
         additional_args=opts.adapter_arg,
     )
     if opts.debug:
-        input('Waiting for debugger to attach pid "%i"' % (dbg.get_pid()))
-    if opts.replay:
-        dbg.replay_packets(opts.replay)
-    else:
-        run_adapter(dbg, opts)
-    dbg.terminate()
+        input(f"Waiting for debugger to attach pid '{dbg.get_pid()}'")
+    try:
+        if opts.replay:
+            dbg.replay(opts.replay)
+        else:
+            run_adapter(dbg, opts)
+    finally:
+        dbg.terminate()
 
 
 if __name__ == "__main__":

>From 995a25038ef42cc38eef49c25e07af8382f02a53 Mon Sep 17 00:00:00 2001
From: John Harrison <[email protected]>
Date: Mon, 8 Dec 2025 11:38:37 -0800
Subject: [PATCH 4/4] Forwarding additional output to stderr.

---
 .../Python/lldbsuite/test/tools/lldb-dap/dap_server.py        | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/dap_server.py 
b/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/dap_server.py
index 90914005c2ab6..d1082dbac09dc 100644
--- a/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/dap_server.py
+++ b/lldb/packages/Python/lldbsuite/test/tools/lldb-dap/dap_server.py
@@ -2,14 +2,12 @@
 
 import argparse
 import binascii
-import copy
 import dataclasses
 import enum
 import json
 import logging
 import os
 import pathlib
-import pprint
 import re
 import signal
 import socket
@@ -1792,7 +1790,7 @@ def run_adapter(dbg: DebugCommunication, opts: 
argparse.Namespace) -> None:
     if response["success"]:
         dbg.wait_for_stopped()
     else:
-        print("failed to launch/attach: ", response)
+        print("failed to launch/attach: ", response, file=sys.stderr)
     dbg.request_disconnect(terminateDebuggee=True)
 
 

_______________________________________________
lldb-commits mailing list
[email protected]
https://lists.llvm.org/cgi-bin/mailman/listinfo/lldb-commits

Reply via email to