https://github.com/python/cpython/commit/6b9a6c6ec3bbc9795df67b87340e2ea58f42b3d4
commit: 6b9a6c6ec3bbc9795df67b87340e2ea58f42b3d4
branch: main
author: Pablo Galindo Salgado <[email protected]>
committer: pablogsal <[email protected]>
date: 2026-01-02T02:31:39Z
summary:
gh-138122: Move local imports to module level in sampling profiler (#143257)
files:
M Lib/profiling/sampling/binary_reader.py
M Lib/profiling/sampling/cli.py
M Lib/profiling/sampling/heatmap_collector.py
M Lib/profiling/sampling/live_collector/collector.py
M Lib/profiling/sampling/live_collector/widgets.py
M Lib/profiling/sampling/pstats_collector.py
M Lib/profiling/sampling/stack_collector.py
M Lib/test/test_profiling/test_sampling_profiler/test_advanced.py
M Lib/test/test_profiling/test_sampling_profiler/test_async.py
M Lib/test/test_profiling/test_sampling_profiler/test_children.py
M Lib/test/test_profiling/test_sampling_profiler/test_collectors.py
M Lib/test/test_profiling/test_sampling_profiler/test_integration.py
M
Lib/test/test_profiling/test_sampling_profiler/test_live_collector_interaction.py
M Lib/test/test_profiling/test_sampling_profiler/test_modes.py
M Lib/test/test_profiling/test_sampling_profiler/test_profiler.py
diff --git a/Lib/profiling/sampling/binary_reader.py
b/Lib/profiling/sampling/binary_reader.py
index 50c96668cc585b..a11be3652597a6 100644
--- a/Lib/profiling/sampling/binary_reader.py
+++ b/Lib/profiling/sampling/binary_reader.py
@@ -1,5 +1,11 @@
"""Thin Python wrapper around C binary reader for profiling data."""
+import _remote_debugging
+
+from .gecko_collector import GeckoCollector
+from .stack_collector import FlamegraphCollector, CollapsedStackCollector
+from .pstats_collector import PstatsCollector
+
class BinaryReader:
"""High-performance binary reader using C implementation.
@@ -23,7 +29,6 @@ def __init__(self, filename):
self._reader = None
def __enter__(self):
- import _remote_debugging
self._reader = _remote_debugging.BinaryReader(self.filename)
return self
@@ -99,10 +104,6 @@ def convert_binary_to_format(input_file, output_file,
output_format,
Returns:
int: Number of samples converted
"""
- from .gecko_collector import GeckoCollector
- from .stack_collector import FlamegraphCollector, CollapsedStackCollector
- from .pstats_collector import PStatsCollector
-
with BinaryReader(input_file) as reader:
info = reader.get_info()
interval = sample_interval_usec or info['sample_interval_us']
@@ -113,7 +114,7 @@ def convert_binary_to_format(input_file, output_file,
output_format,
elif output_format == 'collapsed':
collector = CollapsedStackCollector(interval)
elif output_format == 'pstats':
- collector = PStatsCollector(interval)
+ collector = PstatsCollector(interval)
elif output_format == 'gecko':
collector = GeckoCollector(interval)
else:
diff --git a/Lib/profiling/sampling/cli.py b/Lib/profiling/sampling/cli.py
index ea3926c9565809..c0dcda46fc29d3 100644
--- a/Lib/profiling/sampling/cli.py
+++ b/Lib/profiling/sampling/cli.py
@@ -36,6 +36,12 @@
SORT_MODE_NSAMPLES_CUMUL,
)
+try:
+ from ._child_monitor import ChildProcessMonitor
+except ImportError:
+ # _remote_debugging module not available on this platform (e.g., WASI)
+ ChildProcessMonitor = None
+
try:
from .live_collector import LiveStatsCollector
except ImportError:
@@ -94,8 +100,6 @@ class CustomFormatter(
}
def _setup_child_monitor(args, parent_pid):
- from ._child_monitor import ChildProcessMonitor
-
# Build CLI args for child profilers (excluding --subprocesses to avoid
recursion)
child_cli_args = _build_child_profiler_args(args)
@@ -691,6 +695,11 @@ def _validate_args(args, parser):
# --subprocesses is incompatible with --live
if hasattr(args, 'subprocesses') and args.subprocesses:
+ if ChildProcessMonitor is None:
+ parser.error(
+ "--subprocesses is not available on this platform "
+ "(requires _remote_debugging module)."
+ )
if hasattr(args, 'live') and args.live:
parser.error("--subprocesses is incompatible with --live mode.")
@@ -1160,8 +1169,6 @@ def _handle_live_run(args):
def _handle_replay(args):
"""Handle the 'replay' command - convert binary profile to another
format."""
- import os
-
if not os.path.exists(args.input_file):
sys.exit(f"Error: Input file not found: {args.input_file}")
diff --git a/Lib/profiling/sampling/heatmap_collector.py
b/Lib/profiling/sampling/heatmap_collector.py
index 022e94d014f9b7..b6d9ff79e8ceec 100644
--- a/Lib/profiling/sampling/heatmap_collector.py
+++ b/Lib/profiling/sampling/heatmap_collector.py
@@ -18,6 +18,7 @@
from ._css_utils import get_combined_css
from ._format_utils import fmt
from .collector import normalize_location, extract_lineno
+from .opcode_utils import get_opcode_info, format_opcode
from .stack_collector import StackTraceCollector
@@ -642,8 +643,6 @@ def _get_bytecode_data_for_line(self, filename, lineno):
Returns:
List of dicts with instruction info, sorted by samples descending
"""
- from .opcode_utils import get_opcode_info, format_opcode
-
key = (filename, lineno)
opcode_data = self.line_opcodes.get(key, {})
@@ -1046,8 +1045,6 @@ def _render_source_with_highlights(self, line_content:
str, line_num: int,
Simple: collect ranges with sample counts, assign each byte position to
smallest covering range, then emit spans for contiguous runs with
sample data.
"""
- import html as html_module
-
content = line_content.rstrip('\n')
if not content:
return ''
@@ -1070,7 +1067,7 @@ def _render_source_with_highlights(self, line_content:
str, line_num: int,
range_data[key]['opcodes'].append(opname)
if not range_data:
- return html_module.escape(content)
+ return html.escape(content)
# For each byte position, find the smallest covering range
byte_to_range = {}
@@ -1098,7 +1095,7 @@ def _render_source_with_highlights(self, line_content:
str, line_num: int,
def flush_span():
nonlocal span_chars, current_range
if span_chars:
- text = html_module.escape(''.join(span_chars))
+ text = html.escape(''.join(span_chars))
if current_range:
data = range_data.get(current_range, {'samples': 0,
'opcodes': []})
samples = data['samples']
@@ -1112,7 +1109,7 @@ def flush_span():
f'data-samples="{samples}" '
f'data-max-samples="{max_range_samples}" '
f'data-pct="{pct}" '
-
f'data-opcodes="{html_module.escape(opcodes)}">{text}</span>')
+
f'data-opcodes="{html.escape(opcodes)}">{text}</span>')
else:
result.append(text)
span_chars = []
diff --git a/Lib/profiling/sampling/live_collector/collector.py
b/Lib/profiling/sampling/live_collector/collector.py
index c91ed9e0ea9367..c03df4075277cd 100644
--- a/Lib/profiling/sampling/live_collector/collector.py
+++ b/Lib/profiling/sampling/live_collector/collector.py
@@ -916,8 +916,6 @@ def _show_terminal_size_warning_and_wait(self, height,
width):
def _handle_input(self):
"""Handle keyboard input (non-blocking)."""
- from . import constants
-
self.display.set_nodelay(True)
ch = self.display.get_input()
diff --git a/Lib/profiling/sampling/live_collector/widgets.py
b/Lib/profiling/sampling/live_collector/widgets.py
index ac215dbfeb896e..86d2649f875e62 100644
--- a/Lib/profiling/sampling/live_collector/widgets.py
+++ b/Lib/profiling/sampling/live_collector/widgets.py
@@ -31,6 +31,7 @@
PROFILING_MODE_GIL,
PROFILING_MODE_WALL,
)
+from ..opcode_utils import get_opcode_info, format_opcode
class Widget(ABC):
@@ -1013,8 +1014,6 @@ def render(self, line, width, **kwargs):
Returns:
Next available line number
"""
- from ..opcode_utils import get_opcode_info, format_opcode
-
stats_list = kwargs.get("stats_list", [])
height = kwargs.get("height", 24)
selected_row = self.collector.selected_row
diff --git a/Lib/profiling/sampling/pstats_collector.py
b/Lib/profiling/sampling/pstats_collector.py
index e0dc9ab6bb7edb..6be1d698ffaa9a 100644
--- a/Lib/profiling/sampling/pstats_collector.py
+++ b/Lib/profiling/sampling/pstats_collector.py
@@ -1,9 +1,10 @@
import collections
import marshal
+import pstats
from _colorize import ANSIColors
from .collector import Collector, extract_lineno
-from .constants import MICROSECONDS_PER_SECOND
+from .constants import MICROSECONDS_PER_SECOND, PROFILING_MODE_CPU
class PstatsCollector(Collector):
@@ -86,9 +87,6 @@ def create_stats(self):
def print_stats(self, sort=-1, limit=None, show_summary=True, mode=None):
"""Print formatted statistics to stdout."""
- import pstats
- from .constants import PROFILING_MODE_CPU
-
# Create stats object
stats = pstats.SampledStats(self).strip_dirs()
if not stats.stats:
diff --git a/Lib/profiling/sampling/stack_collector.py
b/Lib/profiling/sampling/stack_collector.py
index 55e643d0e9c8cb..4e213cfe41ca24 100644
--- a/Lib/profiling/sampling/stack_collector.py
+++ b/Lib/profiling/sampling/stack_collector.py
@@ -6,6 +6,7 @@
import linecache
import os
import sys
+import sysconfig
from ._css_utils import get_combined_css
from .collector import Collector, extract_lineno
@@ -244,7 +245,6 @@ def convert_children(children, min_samples):
}
# Calculate thread status percentages for display
- import sysconfig
is_free_threaded = bool(sysconfig.get_config_var("Py_GIL_DISABLED"))
total_threads = max(1, self.thread_status_counts["total"])
thread_stats = {
diff --git a/Lib/test/test_profiling/test_sampling_profiler/test_advanced.py
b/Lib/test/test_profiling/test_sampling_profiler/test_advanced.py
index bcd4de7f5d7ebe..11b1ad84242fd4 100644
--- a/Lib/test/test_profiling/test_sampling_profiler/test_advanced.py
+++ b/Lib/test/test_profiling/test_sampling_profiler/test_advanced.py
@@ -11,6 +11,8 @@
import _remote_debugging # noqa: F401
import profiling.sampling
import profiling.sampling.sample
+ from profiling.sampling.pstats_collector import PstatsCollector
+ from profiling.sampling.stack_collector import CollapsedStackCollector
except ImportError:
raise unittest.SkipTest(
"Test only runs when _remote_debugging is available"
@@ -61,7 +63,6 @@ def test_gc_frames_enabled(self):
io.StringIO() as captured_output,
mock.patch("sys.stdout", captured_output),
):
- from profiling.sampling.pstats_collector import PstatsCollector
collector = PstatsCollector(sample_interval_usec=5000,
skip_idle=False)
profiling.sampling.sample.sample(
subproc.process.pid,
@@ -88,7 +89,6 @@ def test_gc_frames_disabled(self):
io.StringIO() as captured_output,
mock.patch("sys.stdout", captured_output),
):
- from profiling.sampling.pstats_collector import PstatsCollector
collector = PstatsCollector(sample_interval_usec=5000,
skip_idle=False)
profiling.sampling.sample.sample(
subproc.process.pid,
@@ -140,7 +140,6 @@ def test_native_frames_enabled(self):
io.StringIO() as captured_output,
mock.patch("sys.stdout", captured_output),
):
- from profiling.sampling.stack_collector import
CollapsedStackCollector
collector = CollapsedStackCollector(1000, skip_idle=False)
profiling.sampling.sample.sample(
subproc.process.pid,
@@ -176,7 +175,6 @@ def test_native_frames_disabled(self):
io.StringIO() as captured_output,
mock.patch("sys.stdout", captured_output),
):
- from profiling.sampling.pstats_collector import PstatsCollector
collector = PstatsCollector(sample_interval_usec=5000,
skip_idle=False)
profiling.sampling.sample.sample(
subproc.process.pid,
diff --git a/Lib/test/test_profiling/test_sampling_profiler/test_async.py
b/Lib/test/test_profiling/test_sampling_profiler/test_async.py
index d8ca86c996bffa..1f5685717b6273 100644
--- a/Lib/test/test_profiling/test_sampling_profiler/test_async.py
+++ b/Lib/test/test_profiling/test_sampling_profiler/test_async.py
@@ -6,11 +6,14 @@
3. Stack traversal: _build_linear_stacks() with BFS
"""
+import inspect
import unittest
try:
import _remote_debugging # noqa: F401
from profiling.sampling.pstats_collector import PstatsCollector
+ from profiling.sampling.stack_collector import FlamegraphCollector
+ from profiling.sampling.sample import sample, sample_live, SampleProfiler
except ImportError:
raise unittest.SkipTest(
"Test only runs when _remote_debugging is available"
@@ -561,8 +564,6 @@ class TestFlamegraphCollectorAsync(unittest.TestCase):
def test_flamegraph_with_async_frames(self):
"""Test FlamegraphCollector correctly processes async task frames."""
- from profiling.sampling.stack_collector import FlamegraphCollector
-
collector = FlamegraphCollector(sample_interval_usec=1000)
# Build async task tree: Root -> Child
@@ -607,8 +608,6 @@ def test_flamegraph_with_async_frames(self):
def test_flamegraph_with_task_markers(self):
"""Test FlamegraphCollector includes <task> boundary markers."""
- from profiling.sampling.stack_collector import FlamegraphCollector
-
collector = FlamegraphCollector(sample_interval_usec=1000)
task = MockTaskInfo(
@@ -643,8 +642,6 @@ def find_task_marker(node, depth=0):
def test_flamegraph_multiple_async_samples(self):
"""Test FlamegraphCollector aggregates multiple async samples
correctly."""
- from profiling.sampling.stack_collector import FlamegraphCollector
-
collector = FlamegraphCollector(sample_interval_usec=1000)
task = MockTaskInfo(
@@ -675,25 +672,16 @@ class TestAsyncAwareParameterFlow(unittest.TestCase):
def test_sample_function_accepts_async_aware(self):
"""Test that sample() function accepts async_aware parameter."""
- from profiling.sampling.sample import sample
- import inspect
-
sig = inspect.signature(sample)
self.assertIn("async_aware", sig.parameters)
def test_sample_live_function_accepts_async_aware(self):
"""Test that sample_live() function accepts async_aware parameter."""
- from profiling.sampling.sample import sample_live
- import inspect
-
sig = inspect.signature(sample_live)
self.assertIn("async_aware", sig.parameters)
def test_sample_profiler_sample_accepts_async_aware(self):
"""Test that SampleProfiler.sample() accepts async_aware parameter."""
- from profiling.sampling.sample import SampleProfiler
- import inspect
-
sig = inspect.signature(SampleProfiler.sample)
self.assertIn("async_aware", sig.parameters)
diff --git a/Lib/test/test_profiling/test_sampling_profiler/test_children.py
b/Lib/test/test_profiling/test_sampling_profiler/test_children.py
index 84d50cd2088a9e..bb49faa890f348 100644
--- a/Lib/test/test_profiling/test_sampling_profiler/test_children.py
+++ b/Lib/test/test_profiling/test_sampling_profiler/test_children.py
@@ -10,6 +10,7 @@
import threading
import time
import unittest
+from unittest.mock import MagicMock, patch
from test.support import (
SHORT_TIMEOUT,
@@ -17,6 +18,40 @@
requires_remote_subprocess_debugging,
)
+# Guard imports that require _remote_debugging module.
+# This module is not available on all platforms (e.g., WASI).
+try:
+ from profiling.sampling._child_monitor import (
+ get_child_pids,
+ ChildProcessMonitor,
+ is_python_process,
+ _MAX_CHILD_PROFILERS,
+ _CLEANUP_INTERVAL_CYCLES,
+ )
+except ImportError:
+ # Module will be skipped via @requires_remote_subprocess_debugging
decorators
+ get_child_pids = None
+ ChildProcessMonitor = None
+ is_python_process = None
+ _MAX_CHILD_PROFILERS = None
+ _CLEANUP_INTERVAL_CYCLES = None
+
+try:
+ from profiling.sampling.cli import (
+ _add_sampling_options,
+ _validate_args,
+ _build_child_profiler_args,
+ _build_output_pattern,
+ _setup_child_monitor,
+ )
+except ImportError:
+ # cli module imports sample module which requires _remote_debugging
+ _add_sampling_options = None
+ _validate_args = None
+ _build_child_profiler_args = None
+ _build_output_pattern = None
+ _setup_child_monitor = None
+
from .helpers import _cleanup_process
# String to check for in stderr when profiler lacks permissions (e.g., macOS)
@@ -100,8 +135,6 @@ def test_get_child_pids_from_remote_debugging(self):
def test_get_child_pids_fallback(self):
"""Test the fallback implementation for get_child_pids."""
- from profiling.sampling._child_monitor import get_child_pids
-
# Test with current process
result = get_child_pids(os.getpid())
self.assertIsInstance(result, list)
@@ -109,8 +142,6 @@ def test_get_child_pids_fallback(self):
@unittest.skipUnless(sys.platform == "linux", "Linux only")
def test_discover_child_process_linux(self):
"""Test that we can discover child processes on Linux."""
- from profiling.sampling._child_monitor import get_child_pids
-
# Create a child process
proc = subprocess.Popen(
[sys.executable, "-c", "import time; time.sleep(10)"],
@@ -139,8 +170,6 @@ def test_discover_child_process_linux(self):
def test_recursive_child_discovery(self):
"""Test that recursive=True finds grandchildren."""
- from profiling.sampling._child_monitor import get_child_pids
-
# Create a child that spawns a grandchild and keeps a reference to it
# so we can clean it up via the child process
code = """
@@ -256,8 +285,6 @@ def wait_for_signal():
def test_nonexistent_pid_returns_empty(self):
"""Test that nonexistent PID returns empty list."""
- from profiling.sampling._child_monitor import get_child_pids
-
# Use a very high PID that's unlikely to exist
result = get_child_pids(999999999)
self.assertEqual(result, [])
@@ -275,8 +302,6 @@ def tearDown(self):
def test_monitor_creation(self):
"""Test that ChildProcessMonitor can be created."""
- from profiling.sampling._child_monitor import ChildProcessMonitor
-
monitor = ChildProcessMonitor(
pid=os.getpid(),
cli_args=["-r", "10khz", "-d", "5"],
@@ -288,8 +313,6 @@ def test_monitor_creation(self):
def test_monitor_lifecycle(self):
"""Test monitor lifecycle via context manager."""
- from profiling.sampling._child_monitor import ChildProcessMonitor
-
monitor = ChildProcessMonitor(
pid=os.getpid(), cli_args=[], output_pattern=None
)
@@ -307,8 +330,6 @@ def test_monitor_lifecycle(self):
def test_spawned_profilers_property(self):
"""Test that spawned_profilers returns a copy of the list."""
- from profiling.sampling._child_monitor import ChildProcessMonitor
-
monitor = ChildProcessMonitor(
pid=os.getpid(), cli_args=[], output_pattern=None
)
@@ -320,8 +341,6 @@ def test_spawned_profilers_property(self):
def test_context_manager(self):
"""Test that ChildProcessMonitor works as a context manager."""
- from profiling.sampling._child_monitor import ChildProcessMonitor
-
with ChildProcessMonitor(
pid=os.getpid(), cli_args=[], output_pattern=None
) as monitor:
@@ -344,8 +363,6 @@ def tearDown(self):
def test_subprocesses_flag_parsed(self):
"""Test that --subprocesses flag is recognized."""
- from profiling.sampling.cli import _add_sampling_options
-
parser = argparse.ArgumentParser()
_add_sampling_options(parser)
@@ -359,8 +376,6 @@ def test_subprocesses_flag_parsed(self):
def test_subprocesses_incompatible_with_live(self):
"""Test that --subprocesses is incompatible with --live."""
- from profiling.sampling.cli import _validate_args
-
# Create mock args with both subprocesses and live
args = argparse.Namespace(
subprocesses=True,
@@ -383,8 +398,6 @@ def test_subprocesses_incompatible_with_live(self):
def test_build_child_profiler_args(self):
"""Test building CLI args for child profilers."""
- from profiling.sampling.cli import _build_child_profiler_args
-
args = argparse.Namespace(
sample_interval_usec=200,
duration=15,
@@ -446,8 +459,6 @@ def assert_flag_value_pair(flag, value):
def test_build_child_profiler_args_no_gc(self):
"""Test building CLI args with --no-gc."""
- from profiling.sampling.cli import _build_child_profiler_args
-
args = argparse.Namespace(
sample_interval_usec=100,
duration=5,
@@ -471,8 +482,6 @@ def test_build_child_profiler_args_no_gc(self):
def test_build_output_pattern_with_outfile(self):
"""Test output pattern generation with user-specified output."""
- from profiling.sampling.cli import _build_output_pattern
-
# With extension
args = argparse.Namespace(outfile="output.html", format="flamegraph")
pattern = _build_output_pattern(args)
@@ -485,8 +494,6 @@ def test_build_output_pattern_with_outfile(self):
def test_build_output_pattern_default(self):
"""Test output pattern generation with default output."""
- from profiling.sampling.cli import _build_output_pattern
-
# Flamegraph format
args = argparse.Namespace(outfile=None, format="flamegraph")
pattern = _build_output_pattern(args)
@@ -512,8 +519,6 @@ def tearDown(self):
def test_setup_child_monitor(self):
"""Test setting up a child monitor from args."""
- from profiling.sampling.cli import _setup_child_monitor
-
args = argparse.Namespace(
sample_interval_usec=100,
duration=5,
@@ -553,8 +558,6 @@ def tearDown(self):
def test_is_python_process_current_process(self):
"""Test that current process is detected as Python."""
- from profiling.sampling._child_monitor import is_python_process
-
# Current process should be Python
result = is_python_process(os.getpid())
self.assertTrue(
@@ -564,8 +567,6 @@ def test_is_python_process_current_process(self):
def test_is_python_process_python_subprocess(self):
"""Test that a Python subprocess is detected as Python."""
- from profiling.sampling._child_monitor import is_python_process
-
# Start a Python subprocess
proc = subprocess.Popen(
[sys.executable, "-c", "import time; time.sleep(10)"],
@@ -598,8 +599,6 @@ def test_is_python_process_python_subprocess(self):
@unittest.skipUnless(sys.platform == "linux", "Linux only test")
def test_is_python_process_non_python_subprocess(self):
"""Test that a non-Python subprocess is not detected as Python."""
- from profiling.sampling._child_monitor import is_python_process
-
# Start a non-Python subprocess (sleep command)
proc = subprocess.Popen(
["sleep", "10"],
@@ -624,8 +623,6 @@ def test_is_python_process_non_python_subprocess(self):
def test_is_python_process_nonexistent_pid(self):
"""Test that nonexistent PID returns False."""
- from profiling.sampling._child_monitor import is_python_process
-
# Use a very high PID that's unlikely to exist
result = is_python_process(999999999)
self.assertFalse(
@@ -635,8 +632,6 @@ def test_is_python_process_nonexistent_pid(self):
def test_is_python_process_exited_process(self):
"""Test handling of a process that exits quickly."""
- from profiling.sampling._child_monitor import is_python_process
-
# Start a process that exits immediately
proc = subprocess.Popen(
[sys.executable, "-c", "pass"],
@@ -666,8 +661,6 @@ def tearDown(self):
def test_max_profilers_constant_exists(self):
"""Test that _MAX_CHILD_PROFILERS constant is defined."""
- from profiling.sampling._child_monitor import _MAX_CHILD_PROFILERS
-
self.assertEqual(
_MAX_CHILD_PROFILERS,
100,
@@ -676,8 +669,6 @@ def test_max_profilers_constant_exists(self):
def test_cleanup_interval_constant_exists(self):
"""Test that _CLEANUP_INTERVAL_CYCLES constant is defined."""
- from profiling.sampling._child_monitor import _CLEANUP_INTERVAL_CYCLES
-
self.assertEqual(
_CLEANUP_INTERVAL_CYCLES,
10,
@@ -686,12 +677,6 @@ def test_cleanup_interval_constant_exists(self):
def test_monitor_respects_max_limit(self):
"""Test that monitor refuses to spawn more than
_MAX_CHILD_PROFILERS."""
- from profiling.sampling._child_monitor import (
- ChildProcessMonitor,
- _MAX_CHILD_PROFILERS,
- )
- from unittest.mock import MagicMock, patch
-
# Create a monitor
monitor = ChildProcessMonitor(
pid=os.getpid(),
@@ -744,8 +729,6 @@ def tearDown(self):
def test_wait_for_profilers_empty_list(self):
"""Test that wait_for_profilers returns immediately with no
profilers."""
- from profiling.sampling._child_monitor import ChildProcessMonitor
-
monitor = ChildProcessMonitor(
pid=os.getpid(), cli_args=[], output_pattern=None
)
@@ -772,8 +755,6 @@ def test_wait_for_profilers_empty_list(self):
def test_wait_for_profilers_with_completed_process(self):
"""Test waiting for profilers that complete quickly."""
- from profiling.sampling._child_monitor import ChildProcessMonitor
-
monitor = ChildProcessMonitor(
pid=os.getpid(), cli_args=[], output_pattern=None
)
@@ -812,8 +793,6 @@ def test_wait_for_profilers_with_completed_process(self):
def test_wait_for_profilers_timeout(self):
"""Test that wait_for_profilers respects timeout."""
- from profiling.sampling._child_monitor import ChildProcessMonitor
-
monitor = ChildProcessMonitor(
pid=os.getpid(), cli_args=[], output_pattern=None
)
@@ -852,8 +831,6 @@ def test_wait_for_profilers_timeout(self):
def test_wait_for_profilers_multiple(self):
"""Test waiting for multiple profilers."""
- from profiling.sampling._child_monitor import ChildProcessMonitor
-
monitor = ChildProcessMonitor(
pid=os.getpid(), cli_args=[], output_pattern=None
)
diff --git a/Lib/test/test_profiling/test_sampling_profiler/test_collectors.py
b/Lib/test/test_profiling/test_sampling_profiler/test_collectors.py
index 30615a7d31d86c..13bdb4e111364c 100644
--- a/Lib/test/test_profiling/test_sampling_profiler/test_collectors.py
+++ b/Lib/test/test_profiling/test_sampling_profiler/test_collectors.py
@@ -2,6 +2,7 @@
import json
import marshal
+import opcode
import os
import tempfile
import unittest
@@ -1437,7 +1438,6 @@ class TestOpcodeFormatting(unittest.TestCase):
def test_get_opcode_info_standard_opcode(self):
"""Test get_opcode_info for a standard opcode."""
- import opcode
# LOAD_CONST is a standard opcode
load_const = opcode.opmap.get('LOAD_CONST')
if load_const is not None:
@@ -1455,7 +1455,6 @@ def test_get_opcode_info_unknown_opcode(self):
def test_format_opcode_standard(self):
"""Test format_opcode for a standard opcode."""
- import opcode
load_const = opcode.opmap.get('LOAD_CONST')
if load_const is not None:
formatted = format_opcode(load_const)
@@ -1463,7 +1462,6 @@ def test_format_opcode_standard(self):
def test_format_opcode_specialized(self):
"""Test format_opcode for a specialized opcode shows base in parens."""
- import opcode
if not hasattr(opcode, '_specialized_opmap'):
self.skipTest("No specialized opcodes in this Python version")
if not hasattr(opcode, '_specializations'):
diff --git a/Lib/test/test_profiling/test_sampling_profiler/test_integration.py
b/Lib/test/test_profiling/test_sampling_profiler/test_integration.py
index b82474858ddd4a..c6731e956391a9 100644
--- a/Lib/test/test_profiling/test_sampling_profiler/test_integration.py
+++ b/Lib/test/test_profiling/test_sampling_profiler/test_integration.py
@@ -18,6 +18,7 @@
from profiling.sampling.pstats_collector import PstatsCollector
from profiling.sampling.stack_collector import CollapsedStackCollector
from profiling.sampling.sample import SampleProfiler, _is_process_running
+ from profiling.sampling.cli import main
except ImportError:
raise unittest.SkipTest(
"Test only runs when _remote_debugging is available"
@@ -547,7 +548,6 @@ def test_sample_target_script(self):
io.StringIO() as captured_output,
mock.patch("sys.stdout", captured_output),
):
- from profiling.sampling.cli import main
main()
output = captured_output.getvalue()
@@ -585,7 +585,6 @@ def test_sample_target_module(self):
# Change to temp directory so subprocess can find the module
contextlib.chdir(tempdir.name),
):
- from profiling.sampling.cli import main
main()
output = captured_output.getvalue()
@@ -714,8 +713,7 @@ def test_live_incompatible_with_pstats_options(self):
test_args = ["profiling.sampling.cli", "run", "--live"] + args
+ ["test.py"]
with mock.patch("sys.argv", test_args):
with self.assertRaises(SystemExit) as cm:
- from profiling.sampling.cli import main
- main()
+ main()
self.assertNotEqual(cm.exception.code, 0)
def test_live_incompatible_with_multiple_pstats_options(self):
@@ -727,8 +725,7 @@ def
test_live_incompatible_with_multiple_pstats_options(self):
with mock.patch("sys.argv", test_args):
with self.assertRaises(SystemExit) as cm:
- from profiling.sampling.cli import main
- main()
+ main()
self.assertNotEqual(cm.exception.code, 0)
def test_live_incompatible_with_pstats_default_values(self):
@@ -738,8 +735,7 @@ def test_live_incompatible_with_pstats_default_values(self):
with mock.patch("sys.argv", test_args):
with self.assertRaises(SystemExit) as cm:
- from profiling.sampling.cli import main
- main()
+ main()
self.assertNotEqual(cm.exception.code, 0)
# Test with --limit=15 (the default value)
@@ -747,8 +743,7 @@ def test_live_incompatible_with_pstats_default_values(self):
with mock.patch("sys.argv", test_args):
with self.assertRaises(SystemExit) as cm:
- from profiling.sampling.cli import main
- main()
+ main()
self.assertNotEqual(cm.exception.code, 0)
diff --git
a/Lib/test/test_profiling/test_sampling_profiler/test_live_collector_interaction.py
b/Lib/test/test_profiling/test_sampling_profiler/test_live_collector_interaction.py
index 38f1d03e4939f1..8342faffb94762 100644
---
a/Lib/test/test_profiling/test_sampling_profiler/test_live_collector_interaction.py
+++
b/Lib/test/test_profiling/test_sampling_profiler/test_live_collector_interaction.py
@@ -383,10 +383,9 @@ def test_finished_state_footer_message(self):
def test_finished_state_freezes_time(self):
"""Test that time displays are frozen when finished."""
- import time as time_module
# Set up collector with known start time
- self.collector.start_time = time_module.perf_counter() - 10.0 # 10
seconds ago
+ self.collector.start_time = time.perf_counter() - 10.0 # 10 seconds
ago
# Mark as finished - this should freeze the time
self.collector.mark_finished()
@@ -396,7 +395,7 @@ def test_finished_state_freezes_time(self):
frozen_time_display = self.collector.current_time_display
# Wait a bit to ensure time would advance
- time_module.sleep(0.1)
+ time.sleep(0.1)
# Time should remain frozen
self.assertEqual(self.collector.elapsed_time, frozen_elapsed)
@@ -1215,7 +1214,6 @@ def test_reset_blocked_when_finished(self):
def test_time_display_fix_when_finished(self):
"""Test that time display shows correct frozen time when finished."""
- import time as time_module
# Mark as finished to freeze time
self.collector.mark_finished()
@@ -1228,7 +1226,7 @@ def test_time_display_fix_when_finished(self):
frozen_time = self.collector.current_time_display
# Wait a bit
- time_module.sleep(0.1)
+ time.sleep(0.1)
# Should still show the same frozen time (not jump to wrong time)
self.assertEqual(self.collector.current_time_display, frozen_time)
diff --git a/Lib/test/test_profiling/test_sampling_profiler/test_modes.py
b/Lib/test/test_profiling/test_sampling_profiler/test_modes.py
index 877237866b1e65..0b38fb4ad4bcf6 100644
--- a/Lib/test/test_profiling/test_sampling_profiler/test_modes.py
+++ b/Lib/test/test_profiling/test_sampling_profiler/test_modes.py
@@ -9,6 +9,12 @@
import profiling.sampling
import profiling.sampling.sample
from profiling.sampling.pstats_collector import PstatsCollector
+ from profiling.sampling.cli import main, _parse_mode
+ from profiling.sampling.constants import PROFILING_MODE_EXCEPTION
+ from _remote_debugging import (
+ THREAD_STATUS_HAS_GIL,
+ THREAD_STATUS_ON_CPU,
+ )
except ImportError:
raise unittest.SkipTest(
"Test only runs when _remote_debugging is available"
@@ -40,7 +46,6 @@ def test_mode_validation(self):
mock.patch("sys.stderr", io.StringIO()) as mock_stderr,
self.assertRaises(SystemExit) as cm,
):
- from profiling.sampling.cli import main
main()
self.assertEqual(cm.exception.code, 2) # argparse error
@@ -49,16 +54,6 @@ def test_mode_validation(self):
def test_frames_filtered_with_skip_idle(self):
"""Test that frames are actually filtered when skip_idle=True."""
- # Import thread status flags
- try:
- from _remote_debugging import (
- THREAD_STATUS_HAS_GIL,
- THREAD_STATUS_ON_CPU,
- )
- except ImportError:
- THREAD_STATUS_HAS_GIL = 1 << 0
- THREAD_STATUS_ON_CPU = 1 << 1
-
# Create mock frames with different thread statuses
class MockThreadInfoWithStatus:
def __init__(self, thread_id, frame_info, status):
@@ -240,7 +235,6 @@ class TestGilModeFiltering(unittest.TestCase):
def test_gil_mode_validation(self):
"""Test that CLI accepts gil mode choice correctly."""
- from profiling.sampling.cli import main
test_args = [
"profiling.sampling.cli",
@@ -298,7 +292,6 @@ def test_gil_mode_sample_function_call(self):
def test_gil_mode_cli_argument_parsing(self):
"""Test CLI argument parsing for GIL mode with various options."""
- from profiling.sampling.cli import main
test_args = [
"profiling.sampling.cli",
@@ -405,7 +398,6 @@ def test_mode_constants_are_defined(self):
def test_parse_mode_function(self):
"""Test the _parse_mode function with all valid modes."""
- from profiling.sampling.cli import _parse_mode
self.assertEqual(_parse_mode("wall"), 0)
self.assertEqual(_parse_mode("cpu"), 1)
self.assertEqual(_parse_mode("gil"), 2)
@@ -422,7 +414,6 @@ class TestExceptionModeFiltering(unittest.TestCase):
def test_exception_mode_validation(self):
"""Test that CLI accepts exception mode choice correctly."""
- from profiling.sampling.cli import main
test_args = [
"profiling.sampling.cli",
@@ -480,7 +471,6 @@ def test_exception_mode_sample_function_call(self):
def test_exception_mode_cli_argument_parsing(self):
"""Test CLI argument parsing for exception mode with various
options."""
- from profiling.sampling.cli import main
test_args = [
"profiling.sampling.cli",
@@ -512,7 +502,6 @@ def test_exception_mode_cli_argument_parsing(self):
def test_exception_mode_constants_are_defined(self):
"""Test that exception mode constant is properly defined."""
- from profiling.sampling.constants import PROFILING_MODE_EXCEPTION
self.assertEqual(PROFILING_MODE_EXCEPTION, 4)
def test_exception_mode_integration_filtering(self):
diff --git a/Lib/test/test_profiling/test_sampling_profiler/test_profiler.py
b/Lib/test/test_profiling/test_sampling_profiler/test_profiler.py
index 822f559561eb0a..8d70a1d2ef8cfc 100644
--- a/Lib/test/test_profiling/test_sampling_profiler/test_profiler.py
+++ b/Lib/test/test_profiling/test_sampling_profiler/test_profiler.py
@@ -1,6 +1,7 @@
"""Tests for sampling profiler core functionality."""
import io
+import re
from unittest import mock
import unittest
@@ -591,7 +592,6 @@ def test_print_sampled_stats_sort_by_name(self):
# Extract just the function names for comparison
func_names = []
- import re
for line in data_lines:
# Function name is between the last ( and ), accounting for ANSI
color codes
_______________________________________________
Python-checkins mailing list -- [email protected]
To unsubscribe send an email to [email protected]
https://mail.python.org/mailman3//lists/python-checkins.python.org
Member address: [email protected]