https://github.com/python/cpython/commit/3efd2f4db6de0990136a9503910cbb36343ec0af
commit: 3efd2f4db6de0990136a9503910cbb36343ec0af
branch: main
author: Pablo Galindo Salgado <[email protected]>
committer: pablogsal <[email protected]>
date: 2026-05-04T01:02:33+01:00
summary:

gh-149296: Add `dump` subcommand to sampling profiler for one-shot stack snapshots (#149297)

Adds `python -m profiling.sampling dump <pid>`, which prints a single
traceback-style snapshot of a running process's Python stack via the
existing `_remote_debugging` unwinder. Supports per-thread status,
source line highlighting, optional bytecode opcodes, and async-aware
task reconstruction (`--async-aware`, default `--async-mode=all`).

files:
A Lib/profiling/sampling/dump.py
A Lib/test/test_profiling/test_sampling_profiler/test_dump.py
A Misc/NEWS.d/next/Library/2026-05-02-19-09-04.gh-issue-149296.DuKF0j.rst
M Doc/library/profiling.sampling.rst
M Doc/whatsnew/3.15.rst
M Lib/_colorize.py
M Lib/profiling/sampling/cli.py
M Lib/profiling/sampling/collector.py
M Lib/profiling/sampling/sample.py
M Lib/test/test_profiling/test_sampling_profiler/test_cli.py
M Lib/test/test_profiling/test_sampling_profiler/test_profiler.py

diff --git a/Doc/library/profiling.sampling.rst b/Doc/library/profiling.sampling.rst
index 790d36001800d0..aeb1a429b58515 100644
--- a/Doc/library/profiling.sampling.rst
+++ b/Doc/library/profiling.sampling.rst
@@ -153,6 +153,10 @@ Attach to a running process by PID::
 
    python -m profiling.sampling attach 12345
 
+Print a single snapshot of a running process's stack::
+
+   python -m profiling.sampling dump 12345
+
 Use live mode for real-time monitoring (press ``q`` to quit)::
 
    python -m profiling.sampling run --live script.py
@@ -173,8 +177,9 @@ Enable opcode-level profiling to see which bytecode instructions are executing::
 Commands
 ========
 
-Tachyon operates through two subcommands that determine how to obtain the
-target process.
+Tachyon operates through several subcommands. ``run`` and ``attach`` collect
+samples over time; ``dump`` captures a single snapshot; ``replay`` converts
+binary profiles to other formats.
 
 
 The ``run`` command
@@ -217,6 +222,78 @@ On most systems, attaching to another process requires appropriate permissions.
 See :ref:`profiling-permissions` for platform-specific requirements.
 
 
+.. _dump-command:
+
+The ``dump`` command
+--------------------
+
+The ``dump`` command prints a single snapshot of a running process's Python
+stack and exits, similar to a traceback::
+
+   python -m profiling.sampling dump 12345
+
+Unlike ``attach``, ``dump`` does not run a sampling loop: it reads the
+stack once. This is useful for investigating hung or unresponsive
+processes, or for answering "what is this process doing right now?".
+
+The output mirrors a traceback (most recent call last) and annotates each
+thread with its current state (main thread, has GIL, on CPU, waiting for
+GIL, has exception, or idle):
+
+.. code-block:: text
+
+   Stack dump for PID 12345, thread 140735 (main thread, has GIL, on CPU; most recent call last):
+     File "server.py", line 28, in serve
+       await handle_request(req)
+     File "handler.py", line 91, in handle_request
+       result = expensive_call(req)
+
+When the target's source files are readable, ``dump`` prints the source
+line for each frame and highlights the executing expression.
+
+Like ``attach``, ``dump`` requires permission to read the target process's
+memory. See :ref:`profiling-permissions`.
+
+The ``dump`` command supports the following options:
+
+``-a``, ``--all-threads``
+   Dump every thread in the target process. Without this flag only the main
+   thread is shown.
+
+``--native``
+   Include synthetic ``<native>`` frames marking transitions into C
+   extensions or other non-Python code.
+
+``--no-gc``
+   Hide the synthetic ``<GC>`` frames that mark active garbage collection.
+
+``--opcodes``
+   Annotate each frame with the bytecode opcode the thread is currently
+   executing (for example, ``opcode=CALL_KW``). Useful for
+   instruction-level investigation, including identifying specializations
+   chosen by the adaptive interpreter.
+
+``--async-aware``
+   Reconstruct stacks across ``await`` boundaries. ``dump`` walks the task
+   graph and emits one section per task, with ``<task>`` markers separating
+   coroutines awaiting each other.
+
+``--async-mode {running,all}``
+   Controls which tasks are included when ``--async-aware`` is enabled.
+   ``running`` shows only the task currently executing on each thread;
+   ``all`` (the default for ``dump``) also includes tasks suspended on a
+   wait. ``attach``'s default for this flag is ``running``; ``dump``
+   defaults to ``all`` because a single snapshot is most useful when it
+   shows the full task graph.
+
+``--blocking``
+   Pause every thread in the target while reading its stack and resume
+   them after. Guarantees a fully consistent snapshot at the cost of
+   briefly stopping the target. Without it, ``dump`` reads memory while
+   the target keeps running, which is faster but can occasionally produce
+   a torn stack.
+
+
 .. _replay-command:
 
 The ``replay`` command
@@ -1441,11 +1518,52 @@ Global options
 
    Attach to and profile a running process by PID.
 
+.. option:: dump
+
+   Print a single one-shot snapshot of a running process's Python stack.
+
 .. option:: replay
 
    Convert a binary profile file to another output format.
 
 
+Dump options
+------------
+
+The following options apply to the ``dump`` subcommand:
+
+.. option:: -a, --all-threads
+
+   Dump all threads in the target process instead of just the main thread.
+
+.. option:: --native
+
+   Include ``<native>`` frames for non-Python code.
+
+.. option:: --no-gc
+
+   Exclude ``<GC>`` frames for active garbage collection.
+
+.. option:: --opcodes
+
+   Show bytecode opcode names when available.
+
+.. option:: --async-aware
+
+   Reconstruct the stack across ``await`` boundaries for asyncio
+   applications.
+
+.. option:: --async-mode <mode>
+
+   Async stack mode: ``running`` (only the running task) or ``all``
+   (all tasks including waiting). Defaults to ``all`` for ``dump``.
+   Requires :option:`--async-aware`.
+
+.. option:: --blocking
+
+   Pause all threads in the target process while reading the stack.
+
+
 Sampling options
 ----------------
 
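The traceback-style sections documented above (``File "...", line N, in func`` plus the source line) can be approximated in plain Python. This is a hedged sketch, not the module's implementation: ``render_frame`` and the temp file are hypothetical; only stdlib ``linecache`` is used.

```python
import linecache
import tempfile

def render_frame(filename, lineno, funcname):
    """Format one frame the way the dump output above does:
    a 'File ..., line ..., in ...' header plus the stripped source line."""
    header = f'  File "{filename}", line {lineno}, in {funcname}'
    source = linecache.getline(filename, lineno).strip()
    return [header, f"    {source}"] if source else [header]

# Write a tiny module to disk so linecache has something to read.
with tempfile.NamedTemporaryFile("w", suffix=".py", delete=False) as f:
    f.write("def serve(req):\n    return handle(req)\n")
    path = f.name

lines = render_frame(path, 2, "serve")
print(lines[1])  # prints: "    return handle(req)"
```

The real ``dump`` output additionally colorizes the header and highlights only the executing sub-expression using column offsets; this sketch stops at the whole line.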
diff --git a/Doc/whatsnew/3.15.rst b/Doc/whatsnew/3.15.rst
index 53628b2ff46cfc..586a1306d83c4c 100644
--- a/Doc/whatsnew/3.15.rst
+++ b/Doc/whatsnew/3.15.rst
@@ -322,6 +322,9 @@ Key features include:
  * Profile running processes by PID (``attach``) - attach to already-running applications
  * Run and profile scripts directly (``run``) - profile from the very start of execution
  * Execute and profile modules (``run -m``) - profile packages run as ``python -m module``
+  * Capture a one-shot snapshot of a running process (``dump``) - print a
+    traceback-style stack of every thread (or all asyncio tasks with
+    ``--async-aware``). Useful for investigating hung processes.
 
* **Multiple profiling modes**: Choose what to measure based on your performance investigation:
 
diff --git a/Lib/_colorize.py b/Lib/_colorize.py
index 62806b1d8d7bcf..7c500e557f0180 100644
--- a/Lib/_colorize.py
+++ b/Lib/_colorize.py
@@ -359,6 +359,23 @@ class LiveProfiler(ThemeSection):
 )
 
 
+@dataclass(frozen=True, kw_only=True)
+class ProfilerDump(ThemeSection):
+    header: str = ANSIColors.BOLD_BLUE
+    interpreter: str = ANSIColors.GREY
+    thread: str = ANSIColors.BOLD_CYAN
+    status: str = ANSIColors.YELLOW
+    frame_index: str = ANSIColors.GREY
+    frame: str = ANSIColors.BOLD_GREEN
+    filename: str = ANSIColors.CYAN
+    line_no: str = ANSIColors.YELLOW
+    source: str = ANSIColors.WHITE
+    source_highlight: str = ANSIColors.BOLD_YELLOW
+    opcode: str = ANSIColors.GREY
+    warning: str = ANSIColors.YELLOW
+    reset: str = ANSIColors.RESET
+
+
 @dataclass(frozen=True, kw_only=True)
 class Pickletools(ThemeSection):
     annotation: str = ANSIColors.GREY
@@ -447,6 +464,7 @@ class Theme:
     http_server: HttpServer = field(default_factory=HttpServer)
     live_profiler: LiveProfiler = field(default_factory=LiveProfiler)
     pickletools: Pickletools = field(default_factory=Pickletools)
+    profiler_dump: ProfilerDump = field(default_factory=ProfilerDump)
     syntax: Syntax = field(default_factory=Syntax)
     timeit: Timeit = field(default_factory=Timeit)
     tokenize: Tokenize = field(default_factory=Tokenize)
@@ -463,6 +481,7 @@ def copy_with(
         http_server: HttpServer | None = None,
         live_profiler: LiveProfiler | None = None,
         pickletools: Pickletools | None = None,
+        profiler_dump: ProfilerDump | None = None,
         syntax: Syntax | None = None,
         timeit: Timeit | None = None,
         tokenize: Tokenize | None = None,
@@ -482,6 +501,7 @@ def copy_with(
             http_server=http_server or self.http_server,
             live_profiler=live_profiler or self.live_profiler,
             pickletools=pickletools or self.pickletools,
+            profiler_dump=profiler_dump or self.profiler_dump,
             syntax=syntax or self.syntax,
             timeit=timeit or self.timeit,
             tokenize=tokenize or self.tokenize,
@@ -505,6 +525,7 @@ def no_colors(cls) -> Self:
             http_server=HttpServer.no_colors(),
             live_profiler=LiveProfiler.no_colors(),
             pickletools=Pickletools.no_colors(),
+            profiler_dump=ProfilerDump.no_colors(),
             syntax=Syntax.no_colors(),
             timeit=Timeit.no_colors(),
             tokenize=Tokenize.no_colors(),
diff --git a/Lib/profiling/sampling/cli.py b/Lib/profiling/sampling/cli.py
index c0aa3ae024a120..bc879c43e15965 100644
--- a/Lib/profiling/sampling/cli.py
+++ b/Lib/profiling/sampling/cli.py
@@ -14,7 +14,8 @@
 from contextlib import nullcontext
 
from .errors import SamplingUnknownProcessError, SamplingModuleNotFoundError, SamplingScriptNotFoundError
-from .sample import sample, sample_live, _is_process_running
+from .sample import sample, sample_live, dump_stack, _is_process_running
+from .dump import print_stack_dump
 from .pstats_collector import PstatsCollector
from .stack_collector import CollapsedStackCollector, FlamegraphCollector, DiffFlamegraphCollector
 from .heatmap_collector import HeatmapCollector
@@ -72,6 +73,9 @@ def __call__(self, parser, namespace, values, option_string=None):
   # Attach to a running process
   `python -m profiling.sampling attach 1234`
 
+  # Dump a running process's current stack
+  `python -m profiling.sampling dump 1234`
+
   # Live interactive mode for a script
   `python -m profiling.sampling run --live script.py`
 
@@ -548,6 +552,51 @@ def _add_pstats_options(parser):
     )
 
 
+def _add_dump_options(parser):
+    """Add one-shot stack dump options to a parser."""
+    dump_group = parser.add_argument_group("Dump options")
+    dump_group.add_argument(
+        "-a",
+        "--all-threads",
+        action="store_true",
+        help="Dump all threads in the process instead of just the main thread",
+    )
+    dump_group.add_argument(
+        "--native",
+        action="store_true",
+        help='Include artificial "<native>" frames to denote calls to non-Python code',
+    )
+    dump_group.add_argument(
+        "--no-gc",
+        action="store_false",
+        dest="gc",
+        help='Don\'t include artificial "<GC>" frames to denote active garbage collection',
+    )
+    dump_group.add_argument(
+        "--opcodes",
+        action="store_true",
+        help="Show bytecode opcode names when available.",
+    )
+    dump_group.add_argument(
+        "--async-aware",
+        action="store_true",
+        help="Enable async-aware stack reconstruction",
+    )
+    dump_group.add_argument(
+        "--async-mode",
+        choices=["running", "all"],
+        default=argparse.SUPPRESS,
+        help='Async stack mode: "running" (only running task) '
+        'or "all" (all tasks including waiting, default for dump). '
+        "Requires --async-aware",
+    )
+    dump_group.add_argument(
+        "--blocking",
+        action="store_true",
+        help="Stop all threads in target process before dumping the stack.",
+    )
+
+
 def _sort_to_mode(sort_choice):
     """Convert sort choice string to SORT_MODE constant."""
     sort_map = {
@@ -785,18 +834,10 @@ def _validate_args(args, parser):
         args: Parsed command-line arguments
         parser: ArgumentParser instance for error reporting
     """
-    # Replay command has no special validation needed
-    if getattr(args, 'command', None) == "replay":
-        return
+    command = getattr(args, 'command', None)
 
-    # Warn about blocking mode with aggressive sampling intervals
-    if args.blocking and args.sample_interval_usec < 100:
-        print(
-            f"Warning: --blocking with a {args.sample_interval_usec} µs interval will stop all threads "
-            f"{1_000_000 // args.sample_interval_usec} times per second. "
-            "Consider using --sampling-rate 1khz or lower to reduce overhead.",
-            file=sys.stderr
-        )
+    if command == "replay":
+        return
 
     # Check if live mode is available
     if hasattr(args, 'live') and args.live and LiveStatsCollector is None:
@@ -817,9 +858,9 @@ def _validate_args(args, parser):
    # Async-aware mode is incompatible with --native, --no-gc, --mode, and --all-threads
     if getattr(args, 'async_aware', False):
         issues = []
-        if args.native:
+        if getattr(args, 'native', False):
             issues.append("--native")
-        if not args.gc:
+        if not getattr(args, 'gc', True):
             issues.append("--no-gc")
         if hasattr(args, 'mode') and args.mode != "wall":
             issues.append(f"--mode={args.mode}")
@@ -831,9 +872,26 @@ def _validate_args(args, parser):
                 "Async-aware profiling uses task-based stack reconstruction."
             )
 
-    # --async-mode requires --async-aware
-    if hasattr(args, 'async_mode') and args.async_mode != "running" and not getattr(args, 'async_aware', False):
-        parser.error("--async-mode requires --async-aware to be enabled.")
+    # --async-mode requires --async-aware when explicitly set
+    if not getattr(args, 'async_aware', False):
+        if command == "dump":
+            # dump uses SUPPRESS default, so attr only exists if user passed it
+            if hasattr(args, 'async_mode'):
+                parser.error("--async-mode requires --async-aware to be enabled.")
+        elif hasattr(args, 'async_mode') and args.async_mode != "running":
+            parser.error("--async-mode requires --async-aware to be enabled.")
+
+    if command == "dump":
+        return
+
+    # Warn about blocking mode with aggressive sampling intervals
+    if args.blocking and args.sample_interval_usec < 100:
+        print(
+            f"Warning: --blocking with a {args.sample_interval_usec} µs interval will stop all threads "
+            f"{1_000_000 // args.sample_interval_usec} times per second. "
+            "Consider using --sampling-rate 1khz or lower to reduce overhead.",
+            file=sys.stderr
+        )
 
     # Live mode is incompatible with format options
     if hasattr(args, 'live') and args.live:
@@ -990,6 +1048,27 @@ def _main():
     _add_format_options(attach_parser)
     _add_pstats_options(attach_parser)
 
+    # === DUMP COMMAND ===
+    dump_parser = subparsers.add_parser(
+        "dump",
+        help="Dump a running process's current stack",
+        formatter_class=CustomFormatter,
+        description="""Dump a running process's current Python stack
+
+Examples:
+  # Dump the main thread stack
+  `python -m profiling.sampling dump 1234`
+
+  # Dump all thread stacks
+  `python -m profiling.sampling dump -a 1234`""",
+    )
+    dump_parser.add_argument(
+        "pid",
+        type=int,
+        help="Process ID to dump",
+    )
+    _add_dump_options(dump_parser)
+
     # === REPLAY COMMAND ===
     replay_parser = subparsers.add_parser(
         "replay",
@@ -1024,6 +1103,7 @@ def _main():
     command_handlers = {
         "run": _handle_run,
         "attach": _handle_attach,
+        "dump": _handle_dump,
         "replay": _handle_replay,
     }
 
@@ -1085,6 +1165,34 @@ def _handle_attach(args):
         _handle_output(collector, args, args.pid, mode)
 
 
+def _handle_dump(args):
+    if not _is_process_running(args.pid):
+        raise SamplingUnknownProcessError(args.pid)
+
+    # Async-aware reconstruction requires wall mode so every thread is sampled,
+    # not just those holding the GIL or on CPU.
+    mode = PROFILING_MODE_WALL if args.async_aware else PROFILING_MODE_ALL
+    async_mode = getattr(args, "async_mode", "all") if args.async_aware else None
+    try:
+        stack_frames = dump_stack(
+            args.pid,
+            all_threads=args.all_threads,
+            mode=mode,
+            async_aware=async_mode,
+            native=args.native,
+            gc=args.gc,
+            opcodes=args.opcodes,
+            blocking=args.blocking,
+        )
+    except ProcessLookupError:
+        sys.exit(
+            f"No stack dump collected - process {args.pid} exited before "
+            "its stack could be read."
+        )
+
+    print_stack_dump(stack_frames, pid=args.pid)
+
+
 def _handle_run(args):
     """Handle the 'run' command."""
     # Validate target exists before launching subprocess
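The ``--async-mode`` handling above leans on an argparse detail: with ``default=argparse.SUPPRESS``, the attribute is only set on the namespace when the user actually passes the flag, so ``hasattr()`` can distinguish an explicit ``--async-mode running`` from the command's implicit default. A minimal sketch of the pattern (the parser here is a toy, not the profiler's CLI):

```python
import argparse

parser = argparse.ArgumentParser(prog="dump")
parser.add_argument("--async-aware", action="store_true")
# SUPPRESS: no attribute at all unless the option appears on the command line.
parser.add_argument("--async-mode", choices=["running", "all"],
                    default=argparse.SUPPRESS)

implicit = parser.parse_args([])
explicit = parser.parse_args(["--async-mode", "running"])

print(hasattr(implicit, "async_mode"))   # False: the user did not pass it
print(hasattr(explicit, "async_mode"))   # True: even the "running" value is detectable
```

This is why the validation code can reject ``--async-mode`` without ``--async-aware`` for ``dump`` even when the chosen value equals a default: presence of the attribute, not its value, is the signal.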
diff --git a/Lib/profiling/sampling/collector.py b/Lib/profiling/sampling/collector.py
index 7dc095c6c279bd..08759b611696b7 100644
--- a/Lib/profiling/sampling/collector.py
+++ b/Lib/profiling/sampling/collector.py
@@ -62,6 +62,80 @@ def filter_internal_frames(frames):
     return [f for f in frames if not _is_internal_frame(f)]
 
 
+def _build_task_graph(awaited_info_list):
+    task_map = {}
+    child_to_parent = {}  # child_id -> (selected_parent_id, parent_count)
+    all_task_ids = set()
+    all_parent_ids = set()
+
+    for awaited_info in awaited_info_list:
+        thread_id = awaited_info.thread_id
+        for task_info in awaited_info.awaited_by:
+            task_id = task_info.task_id
+            task_map[task_id] = (task_info, thread_id)
+            all_task_ids.add(task_id)
+
+            if task_info.awaited_by:
+                parent_ids = [p.task_name for p in task_info.awaited_by]
+                parent_count = len(parent_ids)
+                all_parent_ids.update(parent_ids)
+                selected_parent = min(parent_ids) if parent_count > 1 else parent_ids[0]
+                child_to_parent[task_id] = (selected_parent, parent_count)
+
+    return task_map, child_to_parent, all_task_ids, all_parent_ids
+
+
+def _find_leaf_tasks(all_task_ids, all_parent_ids):
+    return all_task_ids - all_parent_ids
+
+
+def _build_linear_stacks(leaf_task_ids, task_map, child_to_parent):
+    for leaf_id in leaf_task_ids:
+        frames = []
+        visited = set()
+        current_id = leaf_id
+        thread_id = None
+
+        while current_id is not None:
+            if current_id in visited:
+                break
+            visited.add(current_id)
+
+            if current_id not in task_map:
+                break
+
+            task_info, tid = task_map[current_id]
+
+            if thread_id is None:
+                thread_id = tid
+
+            if task_info.coroutine_stack:
+                for coro_info in task_info.coroutine_stack:
+                    for frame in coro_info.call_stack:
+                        frames.append(frame)
+
+            parent_info = child_to_parent.get(current_id)
+            task_name = task_info.task_name or "Task-" + str(task_info.task_id)
+            if parent_info:
+                selected_parent, parent_count = parent_info
+                if parent_count > 1:
+                    task_name = f"{task_name} ({parent_count} parents)"
+                frames.append(FrameInfo(("<task>", None, task_name, None)))
+                current_id = selected_parent
+            else:
+                frames.append(FrameInfo(("<task>", None, task_name, None)))
+                current_id = None
+
+        if frames and thread_id is not None:
+            yield frames, thread_id, leaf_id
+
+
+def iter_async_frames(awaited_info_list):
+    task_map, child_to_parent, all_task_ids, all_parent_ids = _build_task_graph(awaited_info_list)
+    leaf_task_ids = _find_leaf_tasks(all_task_ids, all_parent_ids)
+    yield from _build_linear_stacks(leaf_task_ids, task_map, child_to_parent)
+
+
 class Collector(ABC):
     @abstractmethod
     def collect(self, stack_frames, timestamps_us=None):
@@ -106,19 +180,12 @@ def _iter_all_frames(self, stack_frames, skip_idle=False):
                         yield frames, thread_info.thread_id
 
     def _iter_async_frames(self, awaited_info_list):
-        # Phase 1: Index tasks and build parent relationships with pre-computed selection
-        task_map, child_to_parent, all_task_ids, all_parent_ids = self._build_task_graph(awaited_info_list)
-
-        # Phase 2: Find leaf tasks (tasks not awaited by anyone)
-        leaf_task_ids = self._find_leaf_tasks(all_task_ids, all_parent_ids)
-
-        # Phase 3: Build linear stacks from each leaf to root (optimized - no sorting!)
-        yield from self._build_linear_stacks(leaf_task_ids, task_map, child_to_parent)
+        yield from iter_async_frames(awaited_info_list)
 
     def _iter_stacks(self, stack_frames, skip_idle=False):
        """Yield (frames, thread_id) for all stacks, handling both sync and async modes."""
         if stack_frames and hasattr(stack_frames[0], "awaited_by"):
-            for frames, thread_id, _ in self._iter_async_frames(stack_frames):
+            for frames, thread_id, _ in iter_async_frames(stack_frames):
                 if frames:
                     yield frames, thread_id
         else:
@@ -126,85 +193,6 @@ def _iter_stacks(self, stack_frames, skip_idle=False):
                 if frames:
                     yield frames, thread_id
 
-    def _build_task_graph(self, awaited_info_list):
-        task_map = {}
-        child_to_parent = {}  # Maps child_id -> (selected_parent_id, parent_count)
-        all_task_ids = set()
-        all_parent_ids = set()  # Track ALL parent IDs for leaf detection
-
-        for awaited_info in awaited_info_list:
-            thread_id = awaited_info.thread_id
-            for task_info in awaited_info.awaited_by:
-                task_id = task_info.task_id
-                task_map[task_id] = (task_info, thread_id)
-                all_task_ids.add(task_id)
-
-                # Pre-compute selected parent and count for optimization
-                if task_info.awaited_by:
-                    parent_ids = [p.task_name for p in task_info.awaited_by]
-                    parent_count = len(parent_ids)
-                    # Track ALL parents for leaf detection
-                    all_parent_ids.update(parent_ids)
-                    # Use min() for O(n) instead of sorted()[0] which is O(n log n)
-                    selected_parent = min(parent_ids) if parent_count > 1 else parent_ids[0]
-                    child_to_parent[task_id] = (selected_parent, parent_count)
-
-        return task_map, child_to_parent, all_task_ids, all_parent_ids
-
-    def _find_leaf_tasks(self, all_task_ids, all_parent_ids):
-        # Leaves are tasks that are not parents of any other task
-        return all_task_ids - all_parent_ids
-
-    def _build_linear_stacks(self, leaf_task_ids, task_map, child_to_parent):
-        for leaf_id in leaf_task_ids:
-            frames = []
-            visited = set()
-            current_id = leaf_id
-            thread_id = None
-
-            # Follow the single parent chain from leaf to root
-            while current_id is not None:
-                # Cycle detection
-                if current_id in visited:
-                    break
-                visited.add(current_id)
-
-                # Check if task exists in task_map
-                if current_id not in task_map:
-                    break
-
-                task_info, tid = task_map[current_id]
-
-                # Set thread_id from first task
-                if thread_id is None:
-                    thread_id = tid
-
-                # Add all frames from all coroutines in this task
-                if task_info.coroutine_stack:
-                    for coro_info in task_info.coroutine_stack:
-                        for frame in coro_info.call_stack:
-                            frames.append(frame)
-
-                # Get pre-computed parent info (no sorting needed!)
-                parent_info = child_to_parent.get(current_id)
-
-                # Add task boundary marker with parent count annotation if multiple parents
-                task_name = task_info.task_name or "Task-" + str(task_info.task_id)
-                if parent_info:
-                    selected_parent, parent_count = parent_info
-                    if parent_count > 1:
-                        task_name = f"{task_name} ({parent_count} parents)"
-                    frames.append(FrameInfo(("<task>", None, task_name, None)))
-                    current_id = selected_parent
-                else:
-                    # Root task - no parent
-                    frames.append(FrameInfo(("<task>", None, task_name, None)))
-                    current_id = None
-
-            # Yield the complete stack if we collected any frames
-            if frames and thread_id is not None:
-                yield frames, thread_id, leaf_id
-
     def _is_gc_frame(self, frame):
         if isinstance(frame, tuple):
             funcname = frame[2] if len(frame) >= 3 else ""
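The module-level ``iter_async_frames`` above works in three phases: index tasks and pre-select one parent per child, find leaves as ``all_task_ids - all_parent_ids``, then walk each leaf's parent chain with cycle detection. A toy sketch of just the graph walk, using plain dicts and made-up task ids instead of the real ``TaskInfo`` objects:

```python
# parent[c] = the chosen parent task id (min() of all parents, as in _build_task_graph)
parent = {"t-leaf": "t-mid", "t-mid": "t-root"}
all_tasks = {"t-leaf", "t-mid", "t-root"}
all_parents = set(parent.values())

leaves = all_tasks - all_parents  # tasks nobody awaits

def chain(leaf):
    """Follow the single selected-parent chain, stopping on cycles."""
    seen, out, cur = set(), [], leaf
    while cur is not None and cur not in seen:
        seen.add(cur)
        out.append(cur)
        cur = parent.get(cur)
    return out

stacks = [chain(leaf) for leaf in sorted(leaves)]
print(stacks)  # [['t-leaf', 't-mid', 't-root']]
```

Picking ``min()`` when a task has several parents makes the chain deterministic while keeping the pass O(n); the count of dropped parents is surfaced in the ``(N parents)`` annotation rather than by forking the stack.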
diff --git a/Lib/profiling/sampling/dump.py b/Lib/profiling/sampling/dump.py
new file mode 100644
index 00000000000000..7cf43bd29ebffb
--- /dev/null
+++ b/Lib/profiling/sampling/dump.py
@@ -0,0 +1,295 @@
+"""Pretty printing for one-shot sampling stack dumps."""
+
+import contextlib
+import linecache
+import os
+import sys
+from traceback import _byte_offset_to_character_offset
+
+import _colorize
+
+from .collector import extract_lineno, filter_internal_frames, iter_async_frames
+from .constants import (
+    THREAD_STATUS_GIL_REQUESTED,
+    THREAD_STATUS_HAS_EXCEPTION,
+    THREAD_STATUS_HAS_GIL,
+    THREAD_STATUS_MAIN_THREAD,
+    THREAD_STATUS_ON_CPU,
+    THREAD_STATUS_UNKNOWN,
+)
+from .opcode_utils import format_opcode
+
+
+_STATUS_LABELS = (
+    (THREAD_STATUS_MAIN_THREAD, "main thread"),
+    (THREAD_STATUS_HAS_GIL, "has GIL"),
+    (THREAD_STATUS_ON_CPU, "on CPU"),
+    (THREAD_STATUS_GIL_REQUESTED, "waiting for GIL"),
+    (THREAD_STATUS_HAS_EXCEPTION, "has exception"),
+)
+
+
+def _theme_for(file, colorize):
+    if colorize is True:
+        return _colorize.get_theme(force_color=True).profiler_dump
+    if colorize is False:
+        return _colorize.get_theme(force_no_color=True).profiler_dump
+    return _colorize.get_theme(tty_file=file).profiler_dump
+
+
+def _color(text, color, theme):
+    if not color:
+        return text
+    return f"{color}{text}{theme.reset}"
+
+
+def _frame_fields(frame):
+    if isinstance(frame, tuple):
+        filename = frame[0] if len(frame) > 0 else ""
+        location = frame[1] if len(frame) > 1 else None
+        qualname = frame[2] if len(frame) > 2 else "<unknown>"
+        opcode = frame[3] if len(frame) > 3 else None
+    else:
+        filename = getattr(frame, "filename", "")
+        location = getattr(frame, "location", None)
+        qualname = getattr(frame, "qualname", None)
+        if qualname is None:
+            qualname = getattr(frame, "funcname", "<unknown>")
+        opcode = getattr(frame, "opcode", None)
+    return filename, location, qualname, opcode
+
+
+def _location_field(location, index, default=None):
+    if location is None:
+        return default
+    try:
+        value = location[index]
+    except (IndexError, TypeError):
+        return default
+    return default if value is None else value
+
+
+def _status_text(status):
+    labels = [label for flag, label in _STATUS_LABELS if status & flag]
+    has_state = status & (
+        THREAD_STATUS_HAS_GIL
+        | THREAD_STATUS_GIL_REQUESTED
+        | THREAD_STATUS_HAS_EXCEPTION
+    )
+    if not has_state and not status & (THREAD_STATUS_UNKNOWN | THREAD_STATUS_ON_CPU):
+        labels.append("idle")
+    return ", ".join(labels) if labels else None
+
+
+def _is_async_dump(stack_frames):
+    return bool(stack_frames) and hasattr(stack_frames[0], "awaited_by")
+
+
+def _iter_dump_sections(stack_frames):
+    if not stack_frames:
+        return
+
+    if _is_async_dump(stack_frames):
+        for frames, thread_id, leaf_id in iter_async_frames(stack_frames):
+            frames = filter_internal_frames(frames)
+            if frames:
+                yield None, thread_id, None, frames, f"task {leaf_id}"
+        return
+
+    for interpreter_info in stack_frames:
+        interpreter_id = getattr(interpreter_info, "interpreter_id", None)
+        for thread_info in getattr(interpreter_info, "threads", ()):
+            frames = getattr(thread_info, "frame_info", None) or []
+            frames = filter_internal_frames(frames)
+            yield (
+                interpreter_id,
+                getattr(thread_info, "thread_id", None),
+                getattr(thread_info, "status", None),
+                frames,
+                None,
+            )
+
+
+def _display_filename(filename):
+    if not filename or filename == "~":
+        return filename
+    with contextlib.suppress(ValueError):
+        relpath = os.path.relpath(filename)
+        if not relpath.startswith(".." + os.sep) and relpath != "..":
+            return relpath
+    return filename
+
+
+def _format_frame(frame, theme):
+    filename, location, qualname, opcode = _frame_fields(frame)
+    source_filename = filename
+    lineno = extract_lineno(location)
+    qualname_part = _color(qualname, theme.frame, theme)
+
+    if filename == "~" and lineno == 0:
+        line = f"  {qualname_part}"
+    else:
+        filename = _display_filename(filename)
+        if filename:
+            file_part = _color(f'"{filename}"', theme.filename, theme)
+            if lineno > 0:
+                line_part = _color(str(lineno), theme.line_no, theme)
+                line = f"  File {file_part}, line {line_part}, in {qualname_part}"
+            else:
+                line = f"  File {file_part}, in {qualname_part}"
+        else:
+            line = f"  {qualname_part}"
+
+    if opcode is not None:
+        line = f"{line}  {_color(f'opcode={format_opcode(opcode)}', theme.opcode, theme)}"
+
+    lines = [line]
+    source = _source_line(source_filename, location, lineno, theme)
+    if source:
+        lines.append(f"    {source}")
+    return lines
+
+
+def _source_offsets(line, location, lineno):
+    end_lineno = _location_field(location, 1, lineno)
+    col_offset = _location_field(location, 2, -1)
+    end_col_offset = _location_field(location, 3, -1)
+
+    if col_offset < 0 or end_col_offset < 0 or end_lineno < lineno:
+        return None
+
+    start = _byte_offset_to_character_offset(line, col_offset)
+    if end_lineno == lineno:
+        end = _byte_offset_to_character_offset(line, end_col_offset)
+    else:
+        end = len(line)
+    if start < 0 or end <= start:
+        return None
+    return start, end
+
+
+def _trim_source_line(line, offsets):
+    stripped = line.lstrip()
+    leading = len(line) - len(stripped)
+    if offsets is None:
+        return stripped, None
+
+    start, end = offsets
+    start = max(start - leading, 0)
+    end = max(end - leading, start + 1)
+    end = min(end, len(stripped))
+    return stripped, (start, end)
+
+
+def _highlight_source_line(line, offsets, theme):
+    if offsets is None or offsets[1] <= offsets[0]:
+        return _color(line, theme.source, theme)
+
+    start, end = offsets
+    parts = []
+    if line[:start]:
+        parts.append(_color(line[:start], theme.source, theme))
+    parts.append(_color(line[start:end], theme.source_highlight, theme))
+    if line[end:]:
+        parts.append(_color(line[end:], theme.source, theme))
+    return "".join(parts)
+
+
+def _source_line(filename, location, lineno, theme):
+    if not filename or filename == "~" or lineno <= 0:
+        return None
+    line = linecache.getline(filename, lineno).removesuffix("\n")
+    if not line:
+        return None
+
+    offsets = _source_offsets(line, location, lineno)
+    line, offsets = _trim_source_line(line, offsets)
+    if not line:
+        return None
+    return _highlight_source_line(line, offsets, theme)
+
+
+def _section_header(
+    *,
+    pid,
+    interpreter_id,
+    thread_id,
+    status,
+    label,
+    show_pid,
+    show_interpreter,
+    theme,
+):
+    subject = "Stack dump"
+    if show_pid and pid is not None:
+        subject = f"{subject} for PID {pid}"
+    if thread_id is not None:
+        subject = f"{subject}, thread {thread_id}"
+
+    details = []
+    if show_interpreter and interpreter_id is not None:
+        details.append(f"interpreter {interpreter_id}")
+    if label:
+        details.append(label)
+    if status is not None:
+        status_text = _status_text(status)
+        if status_text:
+            details.append(status_text)
+
+    suffix = "most recent call last"
+    if details:
+        suffix = f"{'; '.join(details)}; {suffix}"
+    return _color(f"{subject} ({suffix}):", theme.header, theme)
+
+
+def format_stack_dump(stack_frames, *, pid=None, file=None, colorize=None):
+    """Return a formatted one-shot stack dump."""
+    if file is None:
+        file = sys.stdout
+
+    theme = _theme_for(file, colorize)
+    lines = []
+    sections = list(_iter_dump_sections(stack_frames))
+    if not sections:
+        if pid is None:
+            return f"{_color('No Python stacks found', theme.warning, theme)}\n"
+        return f"{_color(f'No Python stacks found for PID {pid}', theme.warning, theme)}\n"
+
+    interpreter_ids = {
+        interpreter_id
+        for interpreter_id, _thread_id, _status, _frames, _label in sections
+        if interpreter_id is not None
+    }
+    show_interpreter = len(interpreter_ids) > 1
+
+    for section_index, (interpreter_id, thread_id, status, frames, label) in enumerate(sections):
+        if section_index:
+            lines.append("")
+        lines.append(
+            _section_header(
+                pid=pid,
+                interpreter_id=interpreter_id,
+                thread_id=thread_id,
+                status=status,
+                label=label,
+                show_pid=section_index == 0,
+                show_interpreter=show_interpreter,
+                theme=theme,
+            )
+        )
+
+        if not frames:
+            lines.append(_color("No Python frames", theme.warning, theme))
+            continue
+
+        for frame in reversed(frames):
+            lines.extend(_format_frame(frame, theme))
+
+    return "\n".join(lines) + "\n"
+
+
+def print_stack_dump(stack_frames, *, pid=None, file=None, colorize=None):
+    """Pretty-print a one-shot stack dump."""
+    if file is None:
+        file = sys.stdout
+    file.write(format_stack_dump(stack_frames, pid=pid, file=file, colorize=colorize))
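Inside `dump.py`, `_trim_source_line` strips leading indentation while keeping the highlight span computed by `_source_offsets` aligned with the trimmed text. A minimal standalone sketch of that adjustment (`trim_with_offsets` is a hypothetical helper for illustration, not the module's private API):

```python
def trim_with_offsets(line, offsets):
    # Strip leading indentation and shift the (start, end) highlight
    # span left by the same amount, clamping so the span stays
    # non-empty and within the stripped line.
    stripped = line.lstrip()
    leading = len(line) - len(stripped)
    if offsets is None:
        return stripped, None
    start, end = offsets
    start = max(start - leading, 0)
    end = min(max(end - leading, start + 1), len(stripped))
    return stripped, (start, end)


text, span = trim_with_offsets("    result = call(arg)", (13, 17))
print(text)                    # result = call(arg)
print(text[span[0]:span[1]])   # call
```

The span shifts left by exactly the amount of stripped indentation, which is what keeps the colored `call` range correct after trimming.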
diff --git a/Lib/profiling/sampling/sample.py b/Lib/profiling/sampling/sample.py
index 9195f5ee6dd390..9e315c080c353d 100644
--- a/Lib/profiling/sampling/sample.py
+++ b/Lib/profiling/sampling/sample.py
@@ -87,6 +87,18 @@ def _new_unwinder(self, native, gc, opcodes, skip_non_matching_threads):
             **kwargs
         )
 
+    def _get_stack_trace(self, async_aware=None):
+        with _pause_threads(self.unwinder, self.blocking):
+            if async_aware == "all":
+                return self.unwinder.get_all_awaited_by()
+            if async_aware == "running":
+                return self.unwinder.get_async_stack_trace()
+            return self.unwinder.get_stack_trace()
+
+    def dump_stack(self, *, async_aware=None):
+        """Return a single stack snapshot from the target process."""
+        return self._get_stack_trace(async_aware=async_aware)
+
     def sample(self, collector, duration_sec=None, *, async_aware=False):
         sample_interval_sec = self.sample_interval_usec / 1_000_000
         num_samples = 0
@@ -110,14 +122,10 @@ def sample(self, collector, duration_sec=None, *, async_aware=False):
                         time.sleep(sleep_time)
                 elif next_time < current_time:
                     try:
-                        with _pause_threads(self.unwinder, self.blocking):
-                            if async_aware == "all":
-                                stack_frames = self.unwinder.get_all_awaited_by()
-                            elif async_aware == "running":
-                                stack_frames = self.unwinder.get_async_stack_trace()
-                            else:
-                                stack_frames = self.unwinder.get_stack_trace()
-                            collector.collect(stack_frames)
+                        stack_frames = self._get_stack_trace(
+                            async_aware=async_aware
+                        )
+                        collector.collect(stack_frames)
                     except ProcessLookupError as e:
                         running_time_sec = current_time - start_time
                         break
@@ -440,6 +448,37 @@ def sample(
     return collector
 
 
+def dump_stack(
+    pid,
+    *,
+    all_threads=False,
+    mode=PROFILING_MODE_ALL,
+    async_aware=None,
+    native=False,
+    gc=True,
+    opcodes=False,
+    blocking=False,
+):
+    """Return a single stack snapshot from a process."""
+    if mode == PROFILING_MODE_ALL:
+        skip_non_matching_threads = False
+    else:
+        skip_non_matching_threads = True
+
+    profiler = SampleProfiler(
+        pid,
+        sample_interval_usec=1,
+        all_threads=all_threads,
+        mode=mode,
+        native=native,
+        gc=gc,
+        opcodes=opcodes,
+        skip_non_matching_threads=skip_non_matching_threads,
+        blocking=blocking,
+    )
+    return profiler.dump_stack(async_aware=async_aware)
+
+
 def sample_live(
     pid,
     collector,
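To make the refactor easier to follow, the `async_aware` dispatch that the new `_get_stack_trace` centralizes can be sketched in isolation. `FakeUnwinder` below is a stand-in for illustration only; in the real code the object is a `_remote_debugging.RemoteUnwinder`:

```python
class FakeUnwinder:
    # Stand-in exposing the three snapshot entry points the profiler
    # chooses between.
    def get_all_awaited_by(self):
        return "all awaited tasks"

    def get_async_stack_trace(self):
        return "running task only"

    def get_stack_trace(self):
        return "plain thread stacks"


def get_stack_trace(unwinder, async_aware=None):
    # Mirrors the dispatch: "all" and "running" select the async-aware
    # snapshots, anything else falls back to plain thread stacks.
    if async_aware == "all":
        return unwinder.get_all_awaited_by()
    if async_aware == "running":
        return unwinder.get_async_stack_trace()
    return unwinder.get_stack_trace()


print(get_stack_trace(FakeUnwinder(), "all"))  # all awaited tasks
print(get_stack_trace(FakeUnwinder()))         # plain thread stacks
```

Both `sample()` and the new `dump_stack()` path go through this single dispatch, so the async modes behave identically for profiling and one-shot dumps.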
diff --git a/Lib/test/test_profiling/test_sampling_profiler/test_cli.py b/Lib/test/test_profiling/test_sampling_profiler/test_cli.py
index 6667953f29abac..c522c50d1fd5fa 100644
--- a/Lib/test/test_profiling/test_sampling_profiler/test_cli.py
+++ b/Lib/test/test_profiling/test_sampling_profiler/test_cli.py
@@ -15,9 +15,14 @@
         "Test only runs when _remote_debugging is available"
     )
 
-from test.support import is_emscripten, requires_remote_subprocess_debugging
+from test.support import (
+    force_not_colorized,
+    is_emscripten,
+    requires_remote_subprocess_debugging,
+)
 
 from profiling.sampling.cli import main
+from profiling.sampling.constants import PROFILING_MODE_ALL, PROFILING_MODE_WALL
from profiling.sampling.errors import SamplingScriptNotFoundError, SamplingModuleNotFoundError, SamplingUnknownProcessError
 
 class TestSampleProfilerCLI(unittest.TestCase):
@@ -524,6 +529,154 @@ def test_argument_parsing_basic(self):
 
             mock_sample.assert_called_once()
 
+    def _run_dump_cli(
+        self,
+        *cli_args,
+        dump_return=None,
+        dump_side_effect=None,
+        process_running=True,
+    ):
+        """Run main() for a `dump` invocation, returning (mock_dump, 
mock_print)."""
+        argv = ["profiling.sampling.cli", "dump", *cli_args]
+        dump_kwargs = {}
+        if dump_side_effect is not None:
+            dump_kwargs["side_effect"] = dump_side_effect
+        else:
+            dump_kwargs["return_value"] = [] if dump_return is None else 
dump_return
+        with (
+            mock.patch("sys.argv", argv),
+            mock.patch(
+                "profiling.sampling.cli._is_process_running",
+                return_value=process_running,
+            ),
+            mock.patch("profiling.sampling.cli.dump_stack", **dump_kwargs) as 
mock_dump_stack,
+            mock.patch("profiling.sampling.cli.print_stack_dump") as 
mock_print_stack_dump,
+        ):
+            main()
+        return mock_dump_stack, mock_print_stack_dump
+
+    def _run_dump_cli_expecting_exit(self, *cli_args, capture="stderr"):
+        """Run main() for a `dump` invocation expected to SystemExit, return 
(cm, captured_text)."""
+        argv = ["profiling.sampling.cli", "dump", *cli_args]
+        buf = io.StringIO()
+        stream = "sys.stderr" if capture == "stderr" else "sys.stdout"
+        with (
+            mock.patch("sys.argv", argv),
+            mock.patch(stream, buf),
+            self.assertRaises(SystemExit) as cm,
+        ):
+            main()
+        return cm, buf.getvalue()
+
+    def test_cli_dump_subcommand(self):
+        stack_frames = [mock.sentinel.stack_frames]
+        mock_dump_stack, mock_print_stack_dump = self._run_dump_cli(
+            "12345", dump_return=stack_frames
+        )
+
+        mock_dump_stack.assert_called_once()
+        self.assertEqual(mock_dump_stack.call_args.args, (12345,))
+        call_kwargs = mock_dump_stack.call_args.kwargs
+        self.assertFalse(call_kwargs["all_threads"])
+        self.assertIsNone(call_kwargs["async_aware"])
+        self.assertFalse(call_kwargs["native"])
+        self.assertTrue(call_kwargs["gc"])
+        self.assertFalse(call_kwargs["opcodes"])
+        self.assertFalse(call_kwargs["blocking"])
+        self.assertEqual(call_kwargs["mode"], PROFILING_MODE_ALL)
+        mock_print_stack_dump.assert_called_once_with(stack_frames, pid=12345)
+
+    def test_cli_dump_subcommand_options(self):
+        mock_dump_stack, _ = self._run_dump_cli(
+            "-a", "--native", "--no-gc", "--opcodes", "--blocking", "12345"
+        )
+
+        call_kwargs = mock_dump_stack.call_args.kwargs
+        self.assertTrue(call_kwargs["all_threads"])
+        self.assertTrue(call_kwargs["native"])
+        self.assertFalse(call_kwargs["gc"])
+        self.assertTrue(call_kwargs["opcodes"])
+        self.assertTrue(call_kwargs["blocking"])
+        self.assertEqual(call_kwargs["mode"], PROFILING_MODE_ALL)
+
+    def test_cli_dump_rejects_mode_option(self):
+        cm, stderr = self._run_dump_cli_expecting_exit("12345", "--mode", 
"cpu")
+        self.assertEqual(cm.exception.code, 2)
+        self.assertIn("unrecognized arguments: --mode", stderr)
+
+    def test_cli_dump_async_aware_defaults_to_all(self):
+        mock_dump_stack, _ = self._run_dump_cli("--async-aware", "12345")
+        call_kwargs = mock_dump_stack.call_args.kwargs
+        self.assertEqual(call_kwargs["async_aware"], "all")
+        self.assertEqual(call_kwargs["mode"], PROFILING_MODE_WALL)
+
+    def test_cli_dump_async_mode_all_is_forwarded(self):
+        mock_dump_stack, _ = self._run_dump_cli(
+            "--async-aware", "--async-mode", "all", "12345"
+        )
+        call_kwargs = mock_dump_stack.call_args.kwargs
+        self.assertEqual(call_kwargs["async_aware"], "all")
+        self.assertEqual(call_kwargs["mode"], PROFILING_MODE_WALL)
+
+    def test_cli_dump_async_mode_running_is_forwarded(self):
+        mock_dump_stack, _ = self._run_dump_cli(
+            "--async-aware", "--async-mode", "running", "12345"
+        )
+        call_kwargs = mock_dump_stack.call_args.kwargs
+        self.assertEqual(call_kwargs["async_aware"], "running")
+        self.assertEqual(call_kwargs["mode"], PROFILING_MODE_WALL)
+
+    def test_cli_dump_async_mode_requires_async_aware(self):
+        cm, stderr = self._run_dump_cli_expecting_exit(
+            "--async-mode", "running", "12345"
+        )
+        self.assertEqual(cm.exception.code, 2)
+        self.assertIn("--async-mode requires --async-aware", stderr)
+
+    def test_cli_dump_rejects_async_aware_with_all_threads(self):
+        cm, stderr = self._run_dump_cli_expecting_exit(
+            "--async-aware", "-a", "12345"
+        )
+        self.assertEqual(cm.exception.code, 2)
+        self.assertIn("--all-threads", stderr)
+        self.assertIn("incompatible with --async-aware", stderr)
+
+    def test_cli_dump_rejects_async_aware_with_native(self):
+        cm, stderr = self._run_dump_cli_expecting_exit(
+            "--async-aware", "--native", "12345"
+        )
+        self.assertEqual(cm.exception.code, 2)
+        self.assertIn("--native", stderr)
+        self.assertIn("incompatible with --async-aware", stderr)
+
+    def test_cli_dump_rejects_async_aware_with_no_gc(self):
+        cm, stderr = self._run_dump_cli_expecting_exit(
+            "--async-aware", "--no-gc", "12345"
+        )
+        self.assertEqual(cm.exception.code, 2)
+        self.assertIn("--no-gc", stderr)
+        self.assertIn("incompatible with --async-aware", stderr)
+
+    def test_cli_dump_unknown_process(self):
+        with self.assertRaises(SamplingUnknownProcessError):
+            self._run_dump_cli("12345", process_running=False)
+
+    def test_cli_dump_process_exits_before_snapshot(self):
+        with self.assertRaises(SystemExit) as cm:
+            self._run_dump_cli("12345", dump_side_effect=ProcessLookupError)
+        self.assertIn(
+            "No stack dump collected - process 12345 exited",
+            str(cm.exception.code),
+        )
+
+    @force_not_colorized
+    def test_cli_dump_help_lists_dump_options_without_mode(self):
+        cm, stdout = self._run_dump_cli_expecting_exit("--help", 
capture="stdout")
+        self.assertEqual(cm.exception.code, 0)
+        self.assertIn("--async-mode {running,all}", stdout)
+        self.assertIn("--opcodes", stdout)
+        self.assertNotIn("--mode {wall,cpu,gil,exception}", stdout)
+
     def test_sort_options(self):
         sort_options = [
             ("nsamples", 0),
diff --git a/Lib/test/test_profiling/test_sampling_profiler/test_dump.py b/Lib/test/test_profiling/test_sampling_profiler/test_dump.py
new file mode 100644
index 00000000000000..7a0f8df38a8533
--- /dev/null
+++ b/Lib/test/test_profiling/test_sampling_profiler/test_dump.py
@@ -0,0 +1,454 @@
+"""Tests for one-shot sampling profiler stack dumps."""
+
+from collections import namedtuple
+import os
+import opcode
+import tempfile
+import unittest
+
+import _colorize
+
+from profiling.sampling.constants import (
+    THREAD_STATUS_GIL_REQUESTED,
+    THREAD_STATUS_HAS_EXCEPTION,
+    THREAD_STATUS_HAS_GIL,
+    THREAD_STATUS_MAIN_THREAD,
+    THREAD_STATUS_ON_CPU,
+    THREAD_STATUS_UNKNOWN,
+)
+from profiling.sampling.dump import format_stack_dump
+
+from .mocks import (
+    LocationInfo as StructseqLocationInfo,
+    MockAwaitedInfo,
+    MockCoroInfo,
+    MockFrameInfo,
+    MockInterpreterInfo,
+    MockTaskInfo,
+    MockThreadInfo,
+)
+
+try:
+    import _remote_debugging  # noqa: F401
+except ImportError:
+    _remote_debugging = None
+
+
+StructseqInterpreterInfo = namedtuple(
+    "StructseqInterpreterInfo",
+    ["interpreter_id", "threads"],
+)
+StructseqThreadInfo = namedtuple(
+    "StructseqThreadInfo",
+    ["thread_id", "status", "frame_info"],
+)
+StructseqFrameInfo = namedtuple(
+    "StructseqFrameInfo",
+    ["filename", "location", "funcname", "opcode"],
+)
+
+
+class TestStackDumpFormatting(unittest.TestCase):
+    def test_format_stack_dump_single_thread(self):
+        frames = [
+            MockFrameInfo("leaf.py", 10, "leaf"),
+            MockFrameInfo("root.py", 1, "root"),
+        ]
+        stack_frames = [
+            MockInterpreterInfo(
+                0,
+                [
+                    MockThreadInfo(
+                        123,
+                        frames,
+                        status=(
+                            THREAD_STATUS_MAIN_THREAD
+                            | THREAD_STATUS_HAS_GIL
+                            | THREAD_STATUS_ON_CPU
+                        ),
+                    )
+                ],
+            )
+        ]
+
+        output = format_stack_dump(stack_frames, pid=42, colorize=False)
+
+        self.assertIn(
+            "Stack dump for PID 42, thread 123 "
+            "(main thread, has GIL, on CPU; most recent call last):",
+            output,
+        )
+        self.assertNotIn("Interpreter 0", output)
+        self.assertLess(
+            output.index('File "root.py", line 1, in root'),
+            output.index('File "leaf.py", line 10, in leaf'),
+        )
+        self.assertIn('  File "root.py", line 1, in root', output)
+        self.assertIn('  File "leaf.py", line 10, in leaf', output)
+        self.assertNotIn("\x1b[", output)
+
+    def test_format_stack_dump_with_structseq_tuples(self):
+        stack_frames = [
+            StructseqInterpreterInfo(
+                0,
+                [
+                    StructseqThreadInfo(
+                        123,
+                        THREAD_STATUS_HAS_GIL,
+                        [
+                            StructseqFrameInfo(
+                                "file.py",
+                                StructseqLocationInfo(5, 5, -1, -1),
+                                "func",
+                                None,
+                            )
+                        ],
+                    )
+                ],
+            )
+        ]
+
+        output = format_stack_dump(stack_frames, pid=42, colorize=False)
+
+        self.assertIn(
+            "Stack dump for PID 42, thread 123 "
+            "(has GIL; most recent call last):",
+            output,
+        )
+        self.assertIn('  File "file.py", line 5, in func', output)
+
+    def test_format_stack_dump_shows_interpreter_ids_when_multiple(self):
+        stack_frames = [
+            MockInterpreterInfo(
+                0,
+                [MockThreadInfo(100, [MockFrameInfo("main.py", 1, "main")])],
+            ),
+            MockInterpreterInfo(
+                1,
+                [MockThreadInfo(200, [MockFrameInfo("sub.py", 2, "sub")])],
+            ),
+        ]
+
+        output = format_stack_dump(stack_frames, pid=42, colorize=False)
+
+        self.assertIn(
+            "Stack dump for PID 42, thread 100 "
+            "(interpreter 0; idle; most recent call last):",
+            output,
+        )
+        self.assertIn(
+            "Stack dump, thread 200 "
+            "(interpreter 1; idle; most recent call last):",
+            output,
+        )
+
+    def test_format_stack_dump_omits_unknown_status(self):
+        stack_frames = [
+            MockInterpreterInfo(
+                0,
+                [
+                    MockThreadInfo(
+                        123,
+                        [MockFrameInfo("file.py", 5, "func")],
+                        status=THREAD_STATUS_UNKNOWN,
+                    )
+                ],
+            )
+        ]
+
+        output = format_stack_dump(stack_frames, pid=42, colorize=False)
+
+        self.assertIn(
+            "Stack dump for PID 42, thread 123 (most recent call last):",
+            output,
+        )
+        self.assertNotIn("unknown", output)
+
+    def test_format_stack_dump_omits_unknown_when_other_status_exists(self):
+        stack_frames = [
+            MockInterpreterInfo(
+                0,
+                [
+                    MockThreadInfo(
+                        123,
+                        [MockFrameInfo("file.py", 5, "func")],
+                        status=THREAD_STATUS_UNKNOWN | THREAD_STATUS_ON_CPU,
+                    )
+                ],
+            )
+        ]
+
+        output = format_stack_dump(stack_frames, pid=42, colorize=False)
+
+        self.assertIn(
+            "Stack dump for PID 42, thread 123 "
+            "(on CPU; most recent call last):",
+            output,
+        )
+        self.assertNotIn("unknown", output)
+
+    def test_format_stack_dump_labels_known_idle_status(self):
+        stack_frames = [
+            MockInterpreterInfo(
+                0,
+                [
+                    MockThreadInfo(
+                        123,
+                        [MockFrameInfo("file.py", 5, "func")],
+                        status=0,
+                    )
+                ],
+            )
+        ]
+
+        output = format_stack_dump(stack_frames, pid=42, colorize=False)
+
+        self.assertIn(
+            "Stack dump for PID 42, thread 123 "
+            "(idle; most recent call last):",
+            output,
+        )
+
+    def test_format_stack_dump_status_does_not_add_idle_to_waiting_thread(self):
+        status = (
+            THREAD_STATUS_MAIN_THREAD
+            | THREAD_STATUS_GIL_REQUESTED
+            | THREAD_STATUS_HAS_EXCEPTION
+        )
+        stack_frames = [
+            MockInterpreterInfo(
+                0,
+                [MockThreadInfo(123, [MockFrameInfo("file.py", 5, "func")], status=status)],
+            )
+        ]
+
+        output = format_stack_dump(stack_frames, pid=42, colorize=False)
+
+        self.assertIn(
+            "Stack dump for PID 42, thread 123 "
+            "(main thread, waiting for GIL, has exception; most recent call 
last):",
+            output,
+        )
+        self.assertNotIn("idle", output)
+
+    def test_format_stack_dump_formats_opcode_name(self):
+        load_const = opcode.opmap["LOAD_CONST"]
+        stack_frames = [
+            MockInterpreterInfo(
+                0,
+                [
+                    MockThreadInfo(
+                        123,
+                        [
+                            StructseqFrameInfo(
+                                "file.py",
+                                StructseqLocationInfo(5, 5, -1, -1),
+                                "func",
+                                load_const,
+                            )
+                        ],
+                    )
+                ],
+            )
+        ]
+
+        output = format_stack_dump(stack_frames, pid=42, colorize=False)
+
+        self.assertIn("opcode=LOAD_CONST", output)
+        self.assertNotIn(f"opcode={load_const}", output)
+
+    def test_format_stack_dump_formats_unknown_opcode(self):
+        stack_frames = [
+            MockInterpreterInfo(
+                0,
+                [
+                    MockThreadInfo(
+                        123,
+                        [
+                            StructseqFrameInfo(
+                                "file.py",
+                                StructseqLocationInfo(5, 5, -1, -1),
+                                "func",
+                                999,
+                            )
+                        ],
+                    )
+                ],
+            )
+        ]
+
+        output = format_stack_dump(stack_frames, pid=42, colorize=False)
+
+        self.assertIn("opcode=<999>", output)
+
+    def test_format_stack_dump_prefers_qualname_attribute(self):
+        class Frame:
+            filename = "file.py"
+            location = StructseqLocationInfo(5, 5, -1, -1)
+            funcname = "inner"
+            qualname = "Outer.inner"
+            opcode = None
+
+        stack_frames = [
+            MockInterpreterInfo(0, [MockThreadInfo(123, [Frame()])])
+        ]
+
+        output = format_stack_dump(stack_frames, pid=42, colorize=False)
+
+        self.assertIn('  File "file.py", line 5, in Outer.inner', output)
+        self.assertNotIn("in inner", output)
+
+    def test_format_stack_dump_filters_internal_frames(self):
+        stack_frames = [
+            MockInterpreterInfo(
+                0,
+                [
+                    MockThreadInfo(
+                        123,
+                        [
+                            MockFrameInfo("user.py", 10, "user"),
+                            MockFrameInfo("_sync_coordinator.py", 20, 
"internal"),
+                        ],
+                    )
+                ],
+            )
+        ]
+
+        output = format_stack_dump(stack_frames, pid=42, colorize=False)
+
+        self.assertIn('  File "user.py", line 10, in user', output)
+        self.assertNotIn("_sync_coordinator.py", output)
+
+    @unittest.skipIf(_remote_debugging is None, "requires _remote_debugging")
+    def test_format_stack_dump_async_task(self):
+        task = MockTaskInfo(
+            task_id=1,
+            task_name="Task-1",
+            coroutine_stack=[
+                MockCoroInfo(
+                    "Task-1",
+                    [MockFrameInfo("task.py", 5, "waiter")],
+                )
+            ],
+        )
+        stack_frames = [MockAwaitedInfo(thread_id=123, awaited_by=[task])]
+
+        output = format_stack_dump(stack_frames, pid=42, colorize=False)
+
+        self.assertIn(
+            "Stack dump for PID 42, thread 123 "
+            "(task 1; most recent call last):",
+            output,
+        )
+        self.assertIn('  File "task.py", line 5, in waiter', output)
+
+    def test_format_stack_dump_strips_source_like_traceback(self):
+        stack_frames = [
+            MockInterpreterInfo(
+                0,
+                [
+                    MockThreadInfo(
+                        123,
+                        [
+                            StructseqFrameInfo(
+                                __file__,
+                                StructseqLocationInfo(1, 1, -1, -1),
+                                "<module>",
+                                None,
+                            )
+                        ],
+                    )
+                ],
+            )
+        ]
+
+        output = format_stack_dump(stack_frames, pid=42, colorize=False)
+
+        self.assertIn('  File "', output)
+        self.assertIn('", line 1, in <module>', output)
+        self.assertIn('"""Tests for one-shot sampling profiler stack dumps."""', output)
+        self.assertNotIn("^^^^^^^^", output)
+
+    def test_format_stack_dump_highlights_source_range(self):
+        stack_frames = [
+            MockInterpreterInfo(
+                0,
+                [
+                    MockThreadInfo(
+                        123,
+                        [
+                            StructseqFrameInfo(
+                                __file__,
+                                StructseqLocationInfo(1, 1, 0, 3),
+                                "<module>",
+                                None,
+                            )
+                        ],
+                    )
+                ],
+            )
+        ]
+        theme = _colorize.get_theme(force_color=True).profiler_dump
+
+        output = format_stack_dump(stack_frames, pid=42, colorize=True)
+
+        self.assertIn(
+            f"{theme.source_highlight}\"\"\"{theme.reset}",
+            output,
+        )
+        self.assertNotIn("^^^^^^^^", _colorize.decolor(output))
+
+    def test_format_stack_dump_highlights_source_range_after_trimming(self):
+        with tempfile.TemporaryDirectory() as tmp_dir:
+            filename = os.path.join(tmp_dir, "target.py")
+            with open(filename, "w", encoding="utf-8") as file:
+                file.write("    result = call(arg)\n")
+            stack_frames = [
+                MockInterpreterInfo(
+                    0,
+                    [
+                        MockThreadInfo(
+                            123,
+                            [
+                                StructseqFrameInfo(
+                                    filename,
+                                    StructseqLocationInfo(1, 1, 13, 17),
+                                    "<module>",
+                                    None,
+                                )
+                            ],
+                        )
+                    ],
+                )
+            ]
+            theme = _colorize.get_theme(force_color=True).profiler_dump
+
+            output = format_stack_dump(stack_frames, pid=42, colorize=True)
+
+        self.assertIn(f"{theme.source_highlight}call{theme.reset}", output)
+        self.assertIn("\n    result = call(arg)\n", _colorize.decolor(output))
+        self.assertNotIn("\n        result = call(arg)\n", 
_colorize.decolor(output))
+
+    def test_format_stack_dump_empty(self):
+        output = format_stack_dump([], pid=42, colorize=False)
+
+        self.assertEqual("No Python stacks found for PID 42\n", output)
+
+    def test_format_stack_dump_colorized(self):
+        stack_frames = [
+            MockInterpreterInfo(
+                0,
+                [MockThreadInfo(123, [MockFrameInfo("file.py", 5, "func")])],
+            )
+        ]
+        theme = _colorize.get_theme(force_color=True).profiler_dump
+
+        output = format_stack_dump(stack_frames, pid=42, colorize=True)
+
+        self.assertIn(theme.header, output)
+        self.assertIn(theme.filename, output)
+        self.assertIn(theme.reset, output)
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/Lib/test/test_profiling/test_sampling_profiler/test_profiler.py b/Lib/test/test_profiling/test_sampling_profiler/test_profiler.py
index 8d70a1d2ef8cfc..68bc59a5414a05 100644
--- a/Lib/test/test_profiling/test_sampling_profiler/test_profiler.py
+++ b/Lib/test/test_profiling/test_sampling_profiler/test_profiler.py
@@ -1,13 +1,16 @@
 """Tests for sampling profiler core functionality."""
 
+import contextlib
 import io
 import re
+import types
 from unittest import mock
 import unittest
 
 try:
     import _remote_debugging  # noqa: F401
-    from profiling.sampling.sample import SampleProfiler
+    from profiling.sampling.constants import PROFILING_MODE_ALL, PROFILING_MODE_WALL
+    from profiling.sampling.sample import SampleProfiler, dump_stack
     from profiling.sampling.pstats_collector import PstatsCollector
 except ImportError:
     raise unittest.SkipTest(
@@ -63,6 +66,92 @@ def test_sample_profiler_initialization(self):
             self.assertEqual(profiler.sample_interval_usec, 5000)
             self.assertEqual(profiler.all_threads, True)
 
+    @staticmethod
+    @contextlib.contextmanager
+    def _patched_unwinder():
+        """Yield a namespace exposing the mock unwinder ``instance`` and 
``cls``."""
+        instance = mock.MagicMock()
+        with mock.patch(
+            "_remote_debugging.RemoteUnwinder", return_value=instance
+        ) as cls:
+            yield types.SimpleNamespace(instance=instance, cls=cls)
+
+    def test_dump_stack_uses_single_unwinder_snapshot(self):
+        stack_frames = [mock.sentinel.stack_frames]
+        with self._patched_unwinder() as u:
+            u.instance.get_stack_trace.return_value = stack_frames
+            result = dump_stack(12345)
+
+        self.assertIs(result, stack_frames)
+        self.assertEqual(u.cls.call_args.kwargs["mode"], PROFILING_MODE_ALL)
+        u.instance.get_stack_trace.assert_called_once_with()
+
+    def test_dump_stack_returns_empty_async_snapshot(self):
+        with self._patched_unwinder() as u:
+            u.instance.get_async_stack_trace.return_value = []
+            result = dump_stack(12345, async_aware="running")
+
+        self.assertEqual(result, [])
+        u.instance.get_async_stack_trace.assert_called_once_with()
+        u.instance.get_stack_trace.assert_not_called()
+
+    def test_dump_stack_async_all_uses_all_awaited_by(self):
+        stack_frames = [mock.sentinel.awaited_info]
+        with self._patched_unwinder() as u:
+            u.instance.get_all_awaited_by.return_value = stack_frames
+            result = dump_stack(12345, async_aware="all")
+
+        self.assertIs(result, stack_frames)
+        u.instance.get_all_awaited_by.assert_called_once_with()
+        u.instance.get_stack_trace.assert_not_called()
+        u.instance.get_async_stack_trace.assert_not_called()
+
+    def test_dump_stack_passes_unwinder_options(self):
+        with self._patched_unwinder() as u:
+            u.instance.get_stack_trace.return_value = []
+            dump_stack(
+                12345,
+                all_threads=True,
+                native=True,
+                gc=False,
+                opcodes=True,
+            )
+
+        call_kwargs = u.cls.call_args.kwargs
+        self.assertTrue(call_kwargs["all_threads"])
+        self.assertEqual(call_kwargs["mode"], PROFILING_MODE_ALL)
+        self.assertTrue(call_kwargs["native"])
+        self.assertFalse(call_kwargs["gc"])
+        self.assertTrue(call_kwargs["opcodes"])
+        self.assertFalse(call_kwargs["skip_non_matching_threads"])
+        self.assertTrue(call_kwargs["cache_frames"])
+
+    def test_dump_stack_wall_mode_skips_non_matching_threads(self):
+        with self._patched_unwinder() as u:
+            u.instance.get_stack_trace.return_value = []
+            dump_stack(12345, mode=PROFILING_MODE_WALL)
+
+        self.assertTrue(u.cls.call_args.kwargs["skip_non_matching_threads"])
+
+    def test_dump_stack_blocking_pauses_and_resumes_threads(self):
+        stack_frames = [mock.sentinel.stack_frames]
+        with self._patched_unwinder() as u:
+            u.instance.get_stack_trace.return_value = stack_frames
+            result = dump_stack(12345, blocking=True)
+
+        self.assertIs(result, stack_frames)
+        u.instance.pause_threads.assert_called_once_with()
+        u.instance.resume_threads.assert_called_once_with()
+
+    def test_dump_stack_blocking_resumes_threads_after_failure(self):
+        with self._patched_unwinder() as u:
+            u.instance.get_stack_trace.side_effect = RuntimeError("boom")
+            with self.assertRaises(RuntimeError):
+                dump_stack(12345, blocking=True)
+
+        u.instance.pause_threads.assert_called_once_with()
+        u.instance.resume_threads.assert_called_once_with()
+
     def test_sample_profiler_sample_method_timing(self):
        """Test that the sample method respects duration and handles timing correctly."""
 
diff --git a/Misc/NEWS.d/next/Library/2026-05-02-19-09-04.gh-issue-149296.DuKF0j.rst b/Misc/NEWS.d/next/Library/2026-05-02-19-09-04.gh-issue-149296.DuKF0j.rst
new file mode 100644
index 00000000000000..f238693720696b
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2026-05-02-19-09-04.gh-issue-149296.DuKF0j.rst
@@ -0,0 +1,4 @@
+Add a ``dump`` subcommand to :mod:`profiling.sampling` that prints a single
+traceback-style snapshot of a running process's Python stack, including
+per-thread status, source line highlighting, optional bytecode opcode names,
+and async-aware task reconstruction. Patch by Pablo Galindo.
