https://github.com/python/cpython/commit/6b9a6c6ec3bbc9795df67b87340e2ea58f42b3d4
commit: 6b9a6c6ec3bbc9795df67b87340e2ea58f42b3d4
branch: main
author: Pablo Galindo Salgado <[email protected]>
committer: pablogsal <[email protected]>
date: 2026-01-02T02:31:39Z
summary:

gh-138122: Move local imports to module level in sampling profiler (#143257)

files:
M Lib/profiling/sampling/binary_reader.py
M Lib/profiling/sampling/cli.py
M Lib/profiling/sampling/heatmap_collector.py
M Lib/profiling/sampling/live_collector/collector.py
M Lib/profiling/sampling/live_collector/widgets.py
M Lib/profiling/sampling/pstats_collector.py
M Lib/profiling/sampling/stack_collector.py
M Lib/test/test_profiling/test_sampling_profiler/test_advanced.py
M Lib/test/test_profiling/test_sampling_profiler/test_async.py
M Lib/test/test_profiling/test_sampling_profiler/test_children.py
M Lib/test/test_profiling/test_sampling_profiler/test_collectors.py
M Lib/test/test_profiling/test_sampling_profiler/test_integration.py
M 
Lib/test/test_profiling/test_sampling_profiler/test_live_collector_interaction.py
M Lib/test/test_profiling/test_sampling_profiler/test_modes.py
M Lib/test/test_profiling/test_sampling_profiler/test_profiler.py

diff --git a/Lib/profiling/sampling/binary_reader.py 
b/Lib/profiling/sampling/binary_reader.py
index 50c96668cc585b..a11be3652597a6 100644
--- a/Lib/profiling/sampling/binary_reader.py
+++ b/Lib/profiling/sampling/binary_reader.py
@@ -1,5 +1,11 @@
 """Thin Python wrapper around C binary reader for profiling data."""
 
+import _remote_debugging
+
+from .gecko_collector import GeckoCollector
+from .stack_collector import FlamegraphCollector, CollapsedStackCollector
+from .pstats_collector import PstatsCollector
+
 
 class BinaryReader:
     """High-performance binary reader using C implementation.
@@ -23,7 +29,6 @@ def __init__(self, filename):
         self._reader = None
 
     def __enter__(self):
-        import _remote_debugging
         self._reader = _remote_debugging.BinaryReader(self.filename)
         return self
 
@@ -99,10 +104,6 @@ def convert_binary_to_format(input_file, output_file, 
output_format,
     Returns:
         int: Number of samples converted
     """
-    from .gecko_collector import GeckoCollector
-    from .stack_collector import FlamegraphCollector, CollapsedStackCollector
-    from .pstats_collector import PStatsCollector
-
     with BinaryReader(input_file) as reader:
         info = reader.get_info()
         interval = sample_interval_usec or info['sample_interval_us']
@@ -113,7 +114,7 @@ def convert_binary_to_format(input_file, output_file, 
output_format,
         elif output_format == 'collapsed':
             collector = CollapsedStackCollector(interval)
         elif output_format == 'pstats':
-            collector = PStatsCollector(interval)
+            collector = PstatsCollector(interval)
         elif output_format == 'gecko':
             collector = GeckoCollector(interval)
         else:
diff --git a/Lib/profiling/sampling/cli.py b/Lib/profiling/sampling/cli.py
index ea3926c9565809..c0dcda46fc29d3 100644
--- a/Lib/profiling/sampling/cli.py
+++ b/Lib/profiling/sampling/cli.py
@@ -36,6 +36,12 @@
     SORT_MODE_NSAMPLES_CUMUL,
 )
 
+try:
+    from ._child_monitor import ChildProcessMonitor
+except ImportError:
+    # _remote_debugging module not available on this platform (e.g., WASI)
+    ChildProcessMonitor = None
+
 try:
     from .live_collector import LiveStatsCollector
 except ImportError:
@@ -94,8 +100,6 @@ class CustomFormatter(
 }
 
 def _setup_child_monitor(args, parent_pid):
-    from ._child_monitor import ChildProcessMonitor
-
     # Build CLI args for child profilers (excluding --subprocesses to avoid 
recursion)
     child_cli_args = _build_child_profiler_args(args)
 
@@ -691,6 +695,11 @@ def _validate_args(args, parser):
 
     # --subprocesses is incompatible with --live
     if hasattr(args, 'subprocesses') and args.subprocesses:
+        if ChildProcessMonitor is None:
+            parser.error(
+                "--subprocesses is not available on this platform "
+                "(requires _remote_debugging module)."
+            )
         if hasattr(args, 'live') and args.live:
             parser.error("--subprocesses is incompatible with --live mode.")
 
@@ -1160,8 +1169,6 @@ def _handle_live_run(args):
 
 def _handle_replay(args):
     """Handle the 'replay' command - convert binary profile to another 
format."""
-    import os
-
     if not os.path.exists(args.input_file):
         sys.exit(f"Error: Input file not found: {args.input_file}")
 
diff --git a/Lib/profiling/sampling/heatmap_collector.py 
b/Lib/profiling/sampling/heatmap_collector.py
index 022e94d014f9b7..b6d9ff79e8ceec 100644
--- a/Lib/profiling/sampling/heatmap_collector.py
+++ b/Lib/profiling/sampling/heatmap_collector.py
@@ -18,6 +18,7 @@
 from ._css_utils import get_combined_css
 from ._format_utils import fmt
 from .collector import normalize_location, extract_lineno
+from .opcode_utils import get_opcode_info, format_opcode
 from .stack_collector import StackTraceCollector
 
 
@@ -642,8 +643,6 @@ def _get_bytecode_data_for_line(self, filename, lineno):
         Returns:
             List of dicts with instruction info, sorted by samples descending
         """
-        from .opcode_utils import get_opcode_info, format_opcode
-
         key = (filename, lineno)
         opcode_data = self.line_opcodes.get(key, {})
 
@@ -1046,8 +1045,6 @@ def _render_source_with_highlights(self, line_content: 
str, line_num: int,
         Simple: collect ranges with sample counts, assign each byte position to
         smallest covering range, then emit spans for contiguous runs with 
sample data.
         """
-        import html as html_module
-
         content = line_content.rstrip('\n')
         if not content:
             return ''
@@ -1070,7 +1067,7 @@ def _render_source_with_highlights(self, line_content: 
str, line_num: int,
                             range_data[key]['opcodes'].append(opname)
 
         if not range_data:
-            return html_module.escape(content)
+            return html.escape(content)
 
         # For each byte position, find the smallest covering range
         byte_to_range = {}
@@ -1098,7 +1095,7 @@ def _render_source_with_highlights(self, line_content: 
str, line_num: int,
         def flush_span():
             nonlocal span_chars, current_range
             if span_chars:
-                text = html_module.escape(''.join(span_chars))
+                text = html.escape(''.join(span_chars))
                 if current_range:
                     data = range_data.get(current_range, {'samples': 0, 
'opcodes': []})
                     samples = data['samples']
@@ -1112,7 +1109,7 @@ def flush_span():
                                   f'data-samples="{samples}" '
                                   f'data-max-samples="{max_range_samples}" '
                                   f'data-pct="{pct}" '
-                                  
f'data-opcodes="{html_module.escape(opcodes)}">{text}</span>')
+                                  
f'data-opcodes="{html.escape(opcodes)}">{text}</span>')
                 else:
                     result.append(text)
                 span_chars = []
diff --git a/Lib/profiling/sampling/live_collector/collector.py 
b/Lib/profiling/sampling/live_collector/collector.py
index c91ed9e0ea9367..c03df4075277cd 100644
--- a/Lib/profiling/sampling/live_collector/collector.py
+++ b/Lib/profiling/sampling/live_collector/collector.py
@@ -916,8 +916,6 @@ def _show_terminal_size_warning_and_wait(self, height, 
width):
 
     def _handle_input(self):
         """Handle keyboard input (non-blocking)."""
-        from . import constants
-
         self.display.set_nodelay(True)
         ch = self.display.get_input()
 
diff --git a/Lib/profiling/sampling/live_collector/widgets.py 
b/Lib/profiling/sampling/live_collector/widgets.py
index ac215dbfeb896e..86d2649f875e62 100644
--- a/Lib/profiling/sampling/live_collector/widgets.py
+++ b/Lib/profiling/sampling/live_collector/widgets.py
@@ -31,6 +31,7 @@
     PROFILING_MODE_GIL,
     PROFILING_MODE_WALL,
 )
+from ..opcode_utils import get_opcode_info, format_opcode
 
 
 class Widget(ABC):
@@ -1013,8 +1014,6 @@ def render(self, line, width, **kwargs):
         Returns:
             Next available line number
         """
-        from ..opcode_utils import get_opcode_info, format_opcode
-
         stats_list = kwargs.get("stats_list", [])
         height = kwargs.get("height", 24)
         selected_row = self.collector.selected_row
diff --git a/Lib/profiling/sampling/pstats_collector.py 
b/Lib/profiling/sampling/pstats_collector.py
index e0dc9ab6bb7edb..6be1d698ffaa9a 100644
--- a/Lib/profiling/sampling/pstats_collector.py
+++ b/Lib/profiling/sampling/pstats_collector.py
@@ -1,9 +1,10 @@
 import collections
 import marshal
+import pstats
 
 from _colorize import ANSIColors
 from .collector import Collector, extract_lineno
-from .constants import MICROSECONDS_PER_SECOND
+from .constants import MICROSECONDS_PER_SECOND, PROFILING_MODE_CPU
 
 
 class PstatsCollector(Collector):
@@ -86,9 +87,6 @@ def create_stats(self):
 
     def print_stats(self, sort=-1, limit=None, show_summary=True, mode=None):
         """Print formatted statistics to stdout."""
-        import pstats
-        from .constants import PROFILING_MODE_CPU
-
         # Create stats object
         stats = pstats.SampledStats(self).strip_dirs()
         if not stats.stats:
diff --git a/Lib/profiling/sampling/stack_collector.py 
b/Lib/profiling/sampling/stack_collector.py
index 55e643d0e9c8cb..4e213cfe41ca24 100644
--- a/Lib/profiling/sampling/stack_collector.py
+++ b/Lib/profiling/sampling/stack_collector.py
@@ -6,6 +6,7 @@
 import linecache
 import os
 import sys
+import sysconfig
 
 from ._css_utils import get_combined_css
 from .collector import Collector, extract_lineno
@@ -244,7 +245,6 @@ def convert_children(children, min_samples):
             }
 
         # Calculate thread status percentages for display
-        import sysconfig
         is_free_threaded = bool(sysconfig.get_config_var("Py_GIL_DISABLED"))
         total_threads = max(1, self.thread_status_counts["total"])
         thread_stats = {
diff --git a/Lib/test/test_profiling/test_sampling_profiler/test_advanced.py 
b/Lib/test/test_profiling/test_sampling_profiler/test_advanced.py
index bcd4de7f5d7ebe..11b1ad84242fd4 100644
--- a/Lib/test/test_profiling/test_sampling_profiler/test_advanced.py
+++ b/Lib/test/test_profiling/test_sampling_profiler/test_advanced.py
@@ -11,6 +11,8 @@
     import _remote_debugging  # noqa: F401
     import profiling.sampling
     import profiling.sampling.sample
+    from profiling.sampling.pstats_collector import PstatsCollector
+    from profiling.sampling.stack_collector import CollapsedStackCollector
 except ImportError:
     raise unittest.SkipTest(
         "Test only runs when _remote_debugging is available"
@@ -61,7 +63,6 @@ def test_gc_frames_enabled(self):
             io.StringIO() as captured_output,
             mock.patch("sys.stdout", captured_output),
         ):
-            from profiling.sampling.pstats_collector import PstatsCollector
             collector = PstatsCollector(sample_interval_usec=5000, 
skip_idle=False)
             profiling.sampling.sample.sample(
                 subproc.process.pid,
@@ -88,7 +89,6 @@ def test_gc_frames_disabled(self):
             io.StringIO() as captured_output,
             mock.patch("sys.stdout", captured_output),
         ):
-            from profiling.sampling.pstats_collector import PstatsCollector
             collector = PstatsCollector(sample_interval_usec=5000, 
skip_idle=False)
             profiling.sampling.sample.sample(
                 subproc.process.pid,
@@ -140,7 +140,6 @@ def test_native_frames_enabled(self):
                 io.StringIO() as captured_output,
                 mock.patch("sys.stdout", captured_output),
             ):
-                from profiling.sampling.stack_collector import 
CollapsedStackCollector
                 collector = CollapsedStackCollector(1000, skip_idle=False)
                 profiling.sampling.sample.sample(
                     subproc.process.pid,
@@ -176,7 +175,6 @@ def test_native_frames_disabled(self):
             io.StringIO() as captured_output,
             mock.patch("sys.stdout", captured_output),
         ):
-            from profiling.sampling.pstats_collector import PstatsCollector
             collector = PstatsCollector(sample_interval_usec=5000, 
skip_idle=False)
             profiling.sampling.sample.sample(
                 subproc.process.pid,
diff --git a/Lib/test/test_profiling/test_sampling_profiler/test_async.py 
b/Lib/test/test_profiling/test_sampling_profiler/test_async.py
index d8ca86c996bffa..1f5685717b6273 100644
--- a/Lib/test/test_profiling/test_sampling_profiler/test_async.py
+++ b/Lib/test/test_profiling/test_sampling_profiler/test_async.py
@@ -6,11 +6,14 @@
 3. Stack traversal: _build_linear_stacks() with BFS
 """
 
+import inspect
 import unittest
 
 try:
     import _remote_debugging  # noqa: F401
     from profiling.sampling.pstats_collector import PstatsCollector
+    from profiling.sampling.stack_collector import FlamegraphCollector
+    from profiling.sampling.sample import sample, sample_live, SampleProfiler
 except ImportError:
     raise unittest.SkipTest(
         "Test only runs when _remote_debugging is available"
@@ -561,8 +564,6 @@ class TestFlamegraphCollectorAsync(unittest.TestCase):
 
     def test_flamegraph_with_async_frames(self):
         """Test FlamegraphCollector correctly processes async task frames."""
-        from profiling.sampling.stack_collector import FlamegraphCollector
-
         collector = FlamegraphCollector(sample_interval_usec=1000)
 
         # Build async task tree: Root -> Child
@@ -607,8 +608,6 @@ def test_flamegraph_with_async_frames(self):
 
     def test_flamegraph_with_task_markers(self):
         """Test FlamegraphCollector includes <task> boundary markers."""
-        from profiling.sampling.stack_collector import FlamegraphCollector
-
         collector = FlamegraphCollector(sample_interval_usec=1000)
 
         task = MockTaskInfo(
@@ -643,8 +642,6 @@ def find_task_marker(node, depth=0):
 
     def test_flamegraph_multiple_async_samples(self):
         """Test FlamegraphCollector aggregates multiple async samples 
correctly."""
-        from profiling.sampling.stack_collector import FlamegraphCollector
-
         collector = FlamegraphCollector(sample_interval_usec=1000)
 
         task = MockTaskInfo(
@@ -675,25 +672,16 @@ class TestAsyncAwareParameterFlow(unittest.TestCase):
 
     def test_sample_function_accepts_async_aware(self):
         """Test that sample() function accepts async_aware parameter."""
-        from profiling.sampling.sample import sample
-        import inspect
-
         sig = inspect.signature(sample)
         self.assertIn("async_aware", sig.parameters)
 
     def test_sample_live_function_accepts_async_aware(self):
         """Test that sample_live() function accepts async_aware parameter."""
-        from profiling.sampling.sample import sample_live
-        import inspect
-
         sig = inspect.signature(sample_live)
         self.assertIn("async_aware", sig.parameters)
 
     def test_sample_profiler_sample_accepts_async_aware(self):
         """Test that SampleProfiler.sample() accepts async_aware parameter."""
-        from profiling.sampling.sample import SampleProfiler
-        import inspect
-
         sig = inspect.signature(SampleProfiler.sample)
         self.assertIn("async_aware", sig.parameters)
 
diff --git a/Lib/test/test_profiling/test_sampling_profiler/test_children.py 
b/Lib/test/test_profiling/test_sampling_profiler/test_children.py
index 84d50cd2088a9e..bb49faa890f348 100644
--- a/Lib/test/test_profiling/test_sampling_profiler/test_children.py
+++ b/Lib/test/test_profiling/test_sampling_profiler/test_children.py
@@ -10,6 +10,7 @@
 import threading
 import time
 import unittest
+from unittest.mock import MagicMock, patch
 
 from test.support import (
     SHORT_TIMEOUT,
@@ -17,6 +18,40 @@
     requires_remote_subprocess_debugging,
 )
 
+# Guard imports that require _remote_debugging module.
+# This module is not available on all platforms (e.g., WASI).
+try:
+    from profiling.sampling._child_monitor import (
+        get_child_pids,
+        ChildProcessMonitor,
+        is_python_process,
+        _MAX_CHILD_PROFILERS,
+        _CLEANUP_INTERVAL_CYCLES,
+    )
+except ImportError:
+    # Module will be skipped via @requires_remote_subprocess_debugging 
decorators
+    get_child_pids = None
+    ChildProcessMonitor = None
+    is_python_process = None
+    _MAX_CHILD_PROFILERS = None
+    _CLEANUP_INTERVAL_CYCLES = None
+
+try:
+    from profiling.sampling.cli import (
+        _add_sampling_options,
+        _validate_args,
+        _build_child_profiler_args,
+        _build_output_pattern,
+        _setup_child_monitor,
+    )
+except ImportError:
+    # cli module imports sample module which requires _remote_debugging
+    _add_sampling_options = None
+    _validate_args = None
+    _build_child_profiler_args = None
+    _build_output_pattern = None
+    _setup_child_monitor = None
+
 from .helpers import _cleanup_process
 
 # String to check for in stderr when profiler lacks permissions (e.g., macOS)
@@ -100,8 +135,6 @@ def test_get_child_pids_from_remote_debugging(self):
 
     def test_get_child_pids_fallback(self):
         """Test the fallback implementation for get_child_pids."""
-        from profiling.sampling._child_monitor import get_child_pids
-
         # Test with current process
         result = get_child_pids(os.getpid())
         self.assertIsInstance(result, list)
@@ -109,8 +142,6 @@ def test_get_child_pids_fallback(self):
     @unittest.skipUnless(sys.platform == "linux", "Linux only")
     def test_discover_child_process_linux(self):
         """Test that we can discover child processes on Linux."""
-        from profiling.sampling._child_monitor import get_child_pids
-
         # Create a child process
         proc = subprocess.Popen(
             [sys.executable, "-c", "import time; time.sleep(10)"],
@@ -139,8 +170,6 @@ def test_discover_child_process_linux(self):
 
     def test_recursive_child_discovery(self):
         """Test that recursive=True finds grandchildren."""
-        from profiling.sampling._child_monitor import get_child_pids
-
         # Create a child that spawns a grandchild and keeps a reference to it
         # so we can clean it up via the child process
         code = """
@@ -256,8 +285,6 @@ def wait_for_signal():
 
     def test_nonexistent_pid_returns_empty(self):
         """Test that nonexistent PID returns empty list."""
-        from profiling.sampling._child_monitor import get_child_pids
-
         # Use a very high PID that's unlikely to exist
         result = get_child_pids(999999999)
         self.assertEqual(result, [])
@@ -275,8 +302,6 @@ def tearDown(self):
 
     def test_monitor_creation(self):
         """Test that ChildProcessMonitor can be created."""
-        from profiling.sampling._child_monitor import ChildProcessMonitor
-
         monitor = ChildProcessMonitor(
             pid=os.getpid(),
             cli_args=["-r", "10khz", "-d", "5"],
@@ -288,8 +313,6 @@ def test_monitor_creation(self):
 
     def test_monitor_lifecycle(self):
         """Test monitor lifecycle via context manager."""
-        from profiling.sampling._child_monitor import ChildProcessMonitor
-
         monitor = ChildProcessMonitor(
             pid=os.getpid(), cli_args=[], output_pattern=None
         )
@@ -307,8 +330,6 @@ def test_monitor_lifecycle(self):
 
     def test_spawned_profilers_property(self):
         """Test that spawned_profilers returns a copy of the list."""
-        from profiling.sampling._child_monitor import ChildProcessMonitor
-
         monitor = ChildProcessMonitor(
             pid=os.getpid(), cli_args=[], output_pattern=None
         )
@@ -320,8 +341,6 @@ def test_spawned_profilers_property(self):
 
     def test_context_manager(self):
         """Test that ChildProcessMonitor works as a context manager."""
-        from profiling.sampling._child_monitor import ChildProcessMonitor
-
         with ChildProcessMonitor(
             pid=os.getpid(), cli_args=[], output_pattern=None
         ) as monitor:
@@ -344,8 +363,6 @@ def tearDown(self):
 
     def test_subprocesses_flag_parsed(self):
         """Test that --subprocesses flag is recognized."""
-        from profiling.sampling.cli import _add_sampling_options
-
         parser = argparse.ArgumentParser()
         _add_sampling_options(parser)
 
@@ -359,8 +376,6 @@ def test_subprocesses_flag_parsed(self):
 
     def test_subprocesses_incompatible_with_live(self):
         """Test that --subprocesses is incompatible with --live."""
-        from profiling.sampling.cli import _validate_args
-
         # Create mock args with both subprocesses and live
         args = argparse.Namespace(
             subprocesses=True,
@@ -383,8 +398,6 @@ def test_subprocesses_incompatible_with_live(self):
 
     def test_build_child_profiler_args(self):
         """Test building CLI args for child profilers."""
-        from profiling.sampling.cli import _build_child_profiler_args
-
         args = argparse.Namespace(
             sample_interval_usec=200,
             duration=15,
@@ -446,8 +459,6 @@ def assert_flag_value_pair(flag, value):
 
     def test_build_child_profiler_args_no_gc(self):
         """Test building CLI args with --no-gc."""
-        from profiling.sampling.cli import _build_child_profiler_args
-
         args = argparse.Namespace(
             sample_interval_usec=100,
             duration=5,
@@ -471,8 +482,6 @@ def test_build_child_profiler_args_no_gc(self):
 
     def test_build_output_pattern_with_outfile(self):
         """Test output pattern generation with user-specified output."""
-        from profiling.sampling.cli import _build_output_pattern
-
         # With extension
         args = argparse.Namespace(outfile="output.html", format="flamegraph")
         pattern = _build_output_pattern(args)
@@ -485,8 +494,6 @@ def test_build_output_pattern_with_outfile(self):
 
     def test_build_output_pattern_default(self):
         """Test output pattern generation with default output."""
-        from profiling.sampling.cli import _build_output_pattern
-
         # Flamegraph format
         args = argparse.Namespace(outfile=None, format="flamegraph")
         pattern = _build_output_pattern(args)
@@ -512,8 +519,6 @@ def tearDown(self):
 
     def test_setup_child_monitor(self):
         """Test setting up a child monitor from args."""
-        from profiling.sampling.cli import _setup_child_monitor
-
         args = argparse.Namespace(
             sample_interval_usec=100,
             duration=5,
@@ -553,8 +558,6 @@ def tearDown(self):
 
     def test_is_python_process_current_process(self):
         """Test that current process is detected as Python."""
-        from profiling.sampling._child_monitor import is_python_process
-
         # Current process should be Python
         result = is_python_process(os.getpid())
         self.assertTrue(
@@ -564,8 +567,6 @@ def test_is_python_process_current_process(self):
 
     def test_is_python_process_python_subprocess(self):
         """Test that a Python subprocess is detected as Python."""
-        from profiling.sampling._child_monitor import is_python_process
-
         # Start a Python subprocess
         proc = subprocess.Popen(
             [sys.executable, "-c", "import time; time.sleep(10)"],
@@ -598,8 +599,6 @@ def test_is_python_process_python_subprocess(self):
     @unittest.skipUnless(sys.platform == "linux", "Linux only test")
     def test_is_python_process_non_python_subprocess(self):
         """Test that a non-Python subprocess is not detected as Python."""
-        from profiling.sampling._child_monitor import is_python_process
-
         # Start a non-Python subprocess (sleep command)
         proc = subprocess.Popen(
             ["sleep", "10"],
@@ -624,8 +623,6 @@ def test_is_python_process_non_python_subprocess(self):
 
     def test_is_python_process_nonexistent_pid(self):
         """Test that nonexistent PID returns False."""
-        from profiling.sampling._child_monitor import is_python_process
-
         # Use a very high PID that's unlikely to exist
         result = is_python_process(999999999)
         self.assertFalse(
@@ -635,8 +632,6 @@ def test_is_python_process_nonexistent_pid(self):
 
     def test_is_python_process_exited_process(self):
         """Test handling of a process that exits quickly."""
-        from profiling.sampling._child_monitor import is_python_process
-
         # Start a process that exits immediately
         proc = subprocess.Popen(
             [sys.executable, "-c", "pass"],
@@ -666,8 +661,6 @@ def tearDown(self):
 
     def test_max_profilers_constant_exists(self):
         """Test that _MAX_CHILD_PROFILERS constant is defined."""
-        from profiling.sampling._child_monitor import _MAX_CHILD_PROFILERS
-
         self.assertEqual(
             _MAX_CHILD_PROFILERS,
             100,
@@ -676,8 +669,6 @@ def test_max_profilers_constant_exists(self):
 
     def test_cleanup_interval_constant_exists(self):
         """Test that _CLEANUP_INTERVAL_CYCLES constant is defined."""
-        from profiling.sampling._child_monitor import _CLEANUP_INTERVAL_CYCLES
-
         self.assertEqual(
             _CLEANUP_INTERVAL_CYCLES,
             10,
@@ -686,12 +677,6 @@ def test_cleanup_interval_constant_exists(self):
 
     def test_monitor_respects_max_limit(self):
         """Test that monitor refuses to spawn more than 
_MAX_CHILD_PROFILERS."""
-        from profiling.sampling._child_monitor import (
-            ChildProcessMonitor,
-            _MAX_CHILD_PROFILERS,
-        )
-        from unittest.mock import MagicMock, patch
-
         # Create a monitor
         monitor = ChildProcessMonitor(
             pid=os.getpid(),
@@ -744,8 +729,6 @@ def tearDown(self):
 
     def test_wait_for_profilers_empty_list(self):
         """Test that wait_for_profilers returns immediately with no 
profilers."""
-        from profiling.sampling._child_monitor import ChildProcessMonitor
-
         monitor = ChildProcessMonitor(
             pid=os.getpid(), cli_args=[], output_pattern=None
         )
@@ -772,8 +755,6 @@ def test_wait_for_profilers_empty_list(self):
 
     def test_wait_for_profilers_with_completed_process(self):
         """Test waiting for profilers that complete quickly."""
-        from profiling.sampling._child_monitor import ChildProcessMonitor
-
         monitor = ChildProcessMonitor(
             pid=os.getpid(), cli_args=[], output_pattern=None
         )
@@ -812,8 +793,6 @@ def test_wait_for_profilers_with_completed_process(self):
 
     def test_wait_for_profilers_timeout(self):
         """Test that wait_for_profilers respects timeout."""
-        from profiling.sampling._child_monitor import ChildProcessMonitor
-
         monitor = ChildProcessMonitor(
             pid=os.getpid(), cli_args=[], output_pattern=None
         )
@@ -852,8 +831,6 @@ def test_wait_for_profilers_timeout(self):
 
     def test_wait_for_profilers_multiple(self):
         """Test waiting for multiple profilers."""
-        from profiling.sampling._child_monitor import ChildProcessMonitor
-
         monitor = ChildProcessMonitor(
             pid=os.getpid(), cli_args=[], output_pattern=None
         )
diff --git a/Lib/test/test_profiling/test_sampling_profiler/test_collectors.py 
b/Lib/test/test_profiling/test_sampling_profiler/test_collectors.py
index 30615a7d31d86c..13bdb4e111364c 100644
--- a/Lib/test/test_profiling/test_sampling_profiler/test_collectors.py
+++ b/Lib/test/test_profiling/test_sampling_profiler/test_collectors.py
@@ -2,6 +2,7 @@
 
 import json
 import marshal
+import opcode
 import os
 import tempfile
 import unittest
@@ -1437,7 +1438,6 @@ class TestOpcodeFormatting(unittest.TestCase):
 
     def test_get_opcode_info_standard_opcode(self):
         """Test get_opcode_info for a standard opcode."""
-        import opcode
         # LOAD_CONST is a standard opcode
         load_const = opcode.opmap.get('LOAD_CONST')
         if load_const is not None:
@@ -1455,7 +1455,6 @@ def test_get_opcode_info_unknown_opcode(self):
 
     def test_format_opcode_standard(self):
         """Test format_opcode for a standard opcode."""
-        import opcode
         load_const = opcode.opmap.get('LOAD_CONST')
         if load_const is not None:
             formatted = format_opcode(load_const)
@@ -1463,7 +1462,6 @@ def test_format_opcode_standard(self):
 
     def test_format_opcode_specialized(self):
         """Test format_opcode for a specialized opcode shows base in parens."""
-        import opcode
         if not hasattr(opcode, '_specialized_opmap'):
             self.skipTest("No specialized opcodes in this Python version")
         if not hasattr(opcode, '_specializations'):
diff --git a/Lib/test/test_profiling/test_sampling_profiler/test_integration.py 
b/Lib/test/test_profiling/test_sampling_profiler/test_integration.py
index b82474858ddd4a..c6731e956391a9 100644
--- a/Lib/test/test_profiling/test_sampling_profiler/test_integration.py
+++ b/Lib/test/test_profiling/test_sampling_profiler/test_integration.py
@@ -18,6 +18,7 @@
     from profiling.sampling.pstats_collector import PstatsCollector
     from profiling.sampling.stack_collector import CollapsedStackCollector
     from profiling.sampling.sample import SampleProfiler, _is_process_running
+    from profiling.sampling.cli import main
 except ImportError:
     raise unittest.SkipTest(
         "Test only runs when _remote_debugging is available"
@@ -547,7 +548,6 @@ def test_sample_target_script(self):
             io.StringIO() as captured_output,
             mock.patch("sys.stdout", captured_output),
         ):
-            from profiling.sampling.cli import main
             main()
 
             output = captured_output.getvalue()
@@ -585,7 +585,6 @@ def test_sample_target_module(self):
             # Change to temp directory so subprocess can find the module
             contextlib.chdir(tempdir.name),
         ):
-            from profiling.sampling.cli import main
             main()
 
             output = captured_output.getvalue()
@@ -714,8 +713,7 @@ def test_live_incompatible_with_pstats_options(self):
                 test_args = ["profiling.sampling.cli", "run", "--live"] + args 
+ ["test.py"]
                 with mock.patch("sys.argv", test_args):
                     with self.assertRaises(SystemExit) as cm:
-                        from profiling.sampling.cli import main
-                        main()
+                                    main()
                     self.assertNotEqual(cm.exception.code, 0)
 
     def test_live_incompatible_with_multiple_pstats_options(self):
@@ -727,8 +725,7 @@ def 
test_live_incompatible_with_multiple_pstats_options(self):
 
         with mock.patch("sys.argv", test_args):
             with self.assertRaises(SystemExit) as cm:
-                from profiling.sampling.cli import main
-                main()
+                    main()
             self.assertNotEqual(cm.exception.code, 0)
 
     def test_live_incompatible_with_pstats_default_values(self):
@@ -738,8 +735,7 @@ def test_live_incompatible_with_pstats_default_values(self):
 
         with mock.patch("sys.argv", test_args):
             with self.assertRaises(SystemExit) as cm:
-                from profiling.sampling.cli import main
-                main()
+                    main()
             self.assertNotEqual(cm.exception.code, 0)
 
         # Test with --limit=15 (the default value)
@@ -747,8 +743,7 @@ def test_live_incompatible_with_pstats_default_values(self):
 
         with mock.patch("sys.argv", test_args):
             with self.assertRaises(SystemExit) as cm:
-                from profiling.sampling.cli import main
-                main()
+                    main()
             self.assertNotEqual(cm.exception.code, 0)
 
 
diff --git 
a/Lib/test/test_profiling/test_sampling_profiler/test_live_collector_interaction.py
 
b/Lib/test/test_profiling/test_sampling_profiler/test_live_collector_interaction.py
index 38f1d03e4939f1..8342faffb94762 100644
--- 
a/Lib/test/test_profiling/test_sampling_profiler/test_live_collector_interaction.py
+++ 
b/Lib/test/test_profiling/test_sampling_profiler/test_live_collector_interaction.py
@@ -383,10 +383,9 @@ def test_finished_state_footer_message(self):
 
     def test_finished_state_freezes_time(self):
         """Test that time displays are frozen when finished."""
-        import time as time_module
 
         # Set up collector with known start time
-        self.collector.start_time = time_module.perf_counter() - 10.0  # 10 
seconds ago
+        self.collector.start_time = time.perf_counter() - 10.0  # 10 seconds 
ago
 
         # Mark as finished - this should freeze the time
         self.collector.mark_finished()
@@ -396,7 +395,7 @@ def test_finished_state_freezes_time(self):
         frozen_time_display = self.collector.current_time_display
 
         # Wait a bit to ensure time would advance
-        time_module.sleep(0.1)
+        time.sleep(0.1)
 
         # Time should remain frozen
         self.assertEqual(self.collector.elapsed_time, frozen_elapsed)
@@ -1215,7 +1214,6 @@ def test_reset_blocked_when_finished(self):
 
     def test_time_display_fix_when_finished(self):
         """Test that time display shows correct frozen time when finished."""
-        import time as time_module
 
         # Mark as finished to freeze time
         self.collector.mark_finished()
@@ -1228,7 +1226,7 @@ def test_time_display_fix_when_finished(self):
         frozen_time = self.collector.current_time_display
 
         # Wait a bit
-        time_module.sleep(0.1)
+        time.sleep(0.1)
 
         # Should still show the same frozen time (not jump to wrong time)
         self.assertEqual(self.collector.current_time_display, frozen_time)
diff --git a/Lib/test/test_profiling/test_sampling_profiler/test_modes.py 
b/Lib/test/test_profiling/test_sampling_profiler/test_modes.py
index 877237866b1e65..0b38fb4ad4bcf6 100644
--- a/Lib/test/test_profiling/test_sampling_profiler/test_modes.py
+++ b/Lib/test/test_profiling/test_sampling_profiler/test_modes.py
@@ -9,6 +9,12 @@
     import profiling.sampling
     import profiling.sampling.sample
     from profiling.sampling.pstats_collector import PstatsCollector
+    from profiling.sampling.cli import main, _parse_mode
+    from profiling.sampling.constants import PROFILING_MODE_EXCEPTION
+    from _remote_debugging import (
+        THREAD_STATUS_HAS_GIL,
+        THREAD_STATUS_ON_CPU,
+    )
 except ImportError:
     raise unittest.SkipTest(
         "Test only runs when _remote_debugging is available"
@@ -40,7 +46,6 @@ def test_mode_validation(self):
             mock.patch("sys.stderr", io.StringIO()) as mock_stderr,
             self.assertRaises(SystemExit) as cm,
         ):
-            from profiling.sampling.cli import main
             main()
 
         self.assertEqual(cm.exception.code, 2)  # argparse error
@@ -49,16 +54,6 @@ def test_mode_validation(self):
 
     def test_frames_filtered_with_skip_idle(self):
         """Test that frames are actually filtered when skip_idle=True."""
-        # Import thread status flags
-        try:
-            from _remote_debugging import (
-                THREAD_STATUS_HAS_GIL,
-                THREAD_STATUS_ON_CPU,
-            )
-        except ImportError:
-            THREAD_STATUS_HAS_GIL = 1 << 0
-            THREAD_STATUS_ON_CPU = 1 << 1
-
         # Create mock frames with different thread statuses
         class MockThreadInfoWithStatus:
             def __init__(self, thread_id, frame_info, status):
@@ -240,7 +235,6 @@ class TestGilModeFiltering(unittest.TestCase):
 
     def test_gil_mode_validation(self):
         """Test that CLI accepts gil mode choice correctly."""
-        from profiling.sampling.cli import main
 
         test_args = [
             "profiling.sampling.cli",
@@ -298,7 +292,6 @@ def test_gil_mode_sample_function_call(self):
 
     def test_gil_mode_cli_argument_parsing(self):
         """Test CLI argument parsing for GIL mode with various options."""
-        from profiling.sampling.cli import main
 
         test_args = [
             "profiling.sampling.cli",
@@ -405,7 +398,6 @@ def test_mode_constants_are_defined(self):
 
     def test_parse_mode_function(self):
         """Test the _parse_mode function with all valid modes."""
-        from profiling.sampling.cli import _parse_mode
         self.assertEqual(_parse_mode("wall"), 0)
         self.assertEqual(_parse_mode("cpu"), 1)
         self.assertEqual(_parse_mode("gil"), 2)
@@ -422,7 +414,6 @@ class TestExceptionModeFiltering(unittest.TestCase):
 
     def test_exception_mode_validation(self):
         """Test that CLI accepts exception mode choice correctly."""
-        from profiling.sampling.cli import main
 
         test_args = [
             "profiling.sampling.cli",
@@ -480,7 +471,6 @@ def test_exception_mode_sample_function_call(self):
 
     def test_exception_mode_cli_argument_parsing(self):
         """Test CLI argument parsing for exception mode with various 
options."""
-        from profiling.sampling.cli import main
 
         test_args = [
             "profiling.sampling.cli",
@@ -512,7 +502,6 @@ def test_exception_mode_cli_argument_parsing(self):
 
     def test_exception_mode_constants_are_defined(self):
         """Test that exception mode constant is properly defined."""
-        from profiling.sampling.constants import PROFILING_MODE_EXCEPTION
         self.assertEqual(PROFILING_MODE_EXCEPTION, 4)
 
     def test_exception_mode_integration_filtering(self):
diff --git a/Lib/test/test_profiling/test_sampling_profiler/test_profiler.py 
b/Lib/test/test_profiling/test_sampling_profiler/test_profiler.py
index 822f559561eb0a..8d70a1d2ef8cfc 100644
--- a/Lib/test/test_profiling/test_sampling_profiler/test_profiler.py
+++ b/Lib/test/test_profiling/test_sampling_profiler/test_profiler.py
@@ -1,6 +1,7 @@
 """Tests for sampling profiler core functionality."""
 
 import io
+import re
 from unittest import mock
 import unittest
 
@@ -591,7 +592,6 @@ def test_print_sampled_stats_sort_by_name(self):
 
         # Extract just the function names for comparison
         func_names = []
-        import re
 
         for line in data_lines:
             # Function name is between the last ( and ), accounting for ANSI 
color codes

_______________________________________________
Python-checkins mailing list -- [email protected]
To unsubscribe send an email to [email protected]
https://mail.python.org/mailman3//lists/python-checkins.python.org
Member address: [email protected]

Reply via email to