https://github.com/python/cpython/commit/1356fbed7b1bc0356f0ee3a4bbe140abb25d788d
commit: 1356fbed7b1bc0356f0ee3a4bbe140abb25d788d
branch: main
author: Pablo Galindo Salgado <[email protected]>
committer: pablogsal <[email protected]>
date: 2025-12-12T00:50:17Z
summary:
gh-142374: Fix recursive function cumulative over-counting in sampling profiler (#142378)
files:
A Misc/NEWS.d/next/Library/2025-12-07-13-37-18.gh-issue-142374.m3EF9E.rst
M Lib/profiling/sampling/heatmap_collector.py
M Lib/profiling/sampling/live_collector/collector.py
M Lib/profiling/sampling/live_collector/widgets.py
M Lib/profiling/sampling/pstats_collector.py
M Lib/test/test_profiling/test_sampling_profiler/test_collectors.py
M Lib/test/test_profiling/test_sampling_profiler/test_integration.py
M Lib/test/test_profiling/test_sampling_profiler/test_live_collector_core.py
M Lib/test/test_profiling/test_sampling_profiler/test_live_collector_ui.py
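The collectors touched below all apply the same fix: within a single sample, each
location is counted toward cumulative stats at most once. As a quick orientation
before the diff, here is a minimal, self-contained sketch of that idea (the function
and variable names are illustrative only, not the collectors' actual API):

    import collections

    def count_cumulative(samples):
        """For each (filename, lineno, funcname) location, count the number of
        samples in which it appears at least once, not the number of frames."""
        cumulative = collections.Counter()
        for frames in samples:               # one entry per captured sample
            seen = set()                     # reset per sample, as in the patch
            for location in frames:
                if location not in seen:     # recursion repeats the same location
                    seen.add(location)
                    cumulative[location] += 1
        return cumulative

    # A function recursing five times in one sample is counted once, not five times.
    sample = [("fib.py", 3, "fib")] * 5 + [("main.py", 1, "main")]
    print(count_cumulative([sample]))
    # Counter({('fib.py', 3, 'fib'): 1, ('main.py', 1, 'main'): 1})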
diff --git a/Lib/profiling/sampling/heatmap_collector.py b/Lib/profiling/sampling/heatmap_collector.py
index a860ed870e3e40..45649ce2009bb6 100644
--- a/Lib/profiling/sampling/heatmap_collector.py
+++ b/Lib/profiling/sampling/heatmap_collector.py
@@ -491,6 +491,10 @@ def __init__(self, *args, **kwargs):
# File index (populated during export)
self.file_index = {}
+ # Reusable set for deduplicating line locations within a single sample.
+ # This avoids over-counting recursive functions in cumulative stats.
+ self._seen_lines = set()
+
def set_stats(self, sample_interval_usec, duration_sec, sample_rate,
error_rate=None, missed_samples=None, **kwargs):
"""Set profiling statistics to include in heatmap output.
@@ -524,6 +528,7 @@ def process_frames(self, frames, thread_id):
thread_id: Thread ID for this stack trace
"""
self._total_samples += 1
+ self._seen_lines.clear()
for i, (filename, location, funcname, opcode) in enumerate(frames):
# Normalize location to 4-tuple format
@@ -533,7 +538,14 @@ def process_frames(self, frames, thread_id):
continue
# frames[0] is the leaf - where execution is actually happening
- self._record_line_sample(filename, lineno, funcname, is_leaf=(i == 0))
+ is_leaf = (i == 0)
+ line_key = (filename, lineno)
+ count_cumulative = line_key not in self._seen_lines
+ if count_cumulative:
+ self._seen_lines.add(line_key)
+
+ self._record_line_sample(filename, lineno, funcname, is_leaf=is_leaf,
+     count_cumulative=count_cumulative)
if opcode is not None:
# Set opcodes_enabled flag when we first encounter opcode data
@@ -562,11 +574,13 @@ def _is_valid_frame(self, filename, lineno):
return True
- def _record_line_sample(self, filename, lineno, funcname, is_leaf=False):
+ def _record_line_sample(self, filename, lineno, funcname, is_leaf=False,
+ count_cumulative=True):
"""Record a sample for a specific line."""
# Track cumulative samples (all occurrences in stack)
- self.line_samples[(filename, lineno)] += 1
- self.file_samples[filename][lineno] += 1
+ if count_cumulative:
+ self.line_samples[(filename, lineno)] += 1
+ self.file_samples[filename][lineno] += 1
# Track self/leaf samples (only when at top of stack)
if is_leaf:
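The heatmap collector keys its per-sample set by (filename, lineno) and leaves
self/leaf counting untouched, since only frames[0] ever records a leaf hit. A
simplified model of that bookkeeping, with hypothetical names rather than the real
HeatmapCollector internals:

    from collections import Counter

    line_samples = Counter()   # cumulative: samples where the line was anywhere on the stack
    self_samples = Counter()   # self/leaf: samples where the line was actually executing

    def process(frames):
        seen = set()
        for i, (filename, lineno) in enumerate(frames):
            key = (filename, lineno)
            if key not in seen:    # count_cumulative is True only the first time
                seen.add(key)
                line_samples[key] += 1
            if i == 0:             # is_leaf: frames[0] is the executing frame
                self_samples[key] += 1

    process([("fib.py", 7)] * 4 + [("main.py", 2)])
    print(line_samples[("fib.py", 7)], self_samples[("fib.py", 7)])  # 1 1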
diff --git a/Lib/profiling/sampling/live_collector/collector.py b/Lib/profiling/sampling/live_collector/collector.py
index 1652089ad3f52d..de541a75db61c1 100644
--- a/Lib/profiling/sampling/live_collector/collector.py
+++ b/Lib/profiling/sampling/live_collector/collector.py
@@ -210,6 +210,8 @@ def __init__(
# Trend tracking (initialized after colors are set up)
self._trend_tracker = None
+ self._seen_locations = set()
+
@property
def elapsed_time(self):
"""Get the elapsed time, frozen when finished."""
@@ -305,15 +307,18 @@ def process_frames(self, frames, thread_id=None):
# Get per-thread data if tracking per-thread
thread_data = self._get_or_create_thread_data(thread_id) if thread_id is not None else None
+ self._seen_locations.clear()
# Process each frame in the stack to track cumulative calls
# frame.location is (lineno, end_lineno, col_offset, end_col_offset), int, or None
for frame in frames:
lineno = extract_lineno(frame.location)
location = (frame.filename, lineno, frame.funcname)
- self.result[location]["cumulative_calls"] += 1
- if thread_data:
- thread_data.result[location]["cumulative_calls"] += 1
+ if location not in self._seen_locations:
+ self._seen_locations.add(location)
+ self.result[location]["cumulative_calls"] += 1
+ if thread_data:
+ thread_data.result[location]["cumulative_calls"] += 1
# The top frame gets counted as an inline call (directly executing)
top_frame = frames[0]
@@ -371,11 +376,13 @@ def collect(self, stack_frames):
thread_data.gc_frame_samples += stats["gc_samples"]
# Process frames using pre-selected iterator
+ frames_processed = False
for frames, thread_id in self._get_frame_iterator(stack_frames):
if not frames:
continue
self.process_frames(frames, thread_id=thread_id)
+ frames_processed = True
# Track thread IDs
if thread_id is not None and thread_id not in self.thread_ids:
@@ -388,7 +395,11 @@ def collect(self, stack_frames):
if has_gc_frame:
self.gc_frame_samples += 1
- self.successful_samples += 1
+ # Only count as successful if we actually processed frames
+ # This is important for modes like --mode exception where most samples
+ # may be filtered out at the C level
+ if frames_processed:
+ self.successful_samples += 1
self.total_samples += 1
# Handle input on every sample for instant responsiveness
@@ -659,9 +670,11 @@ def build_stats_list(self):
total_time = direct_calls * self.sample_interval_sec
cumulative_time = cumulative_calls * self.sample_interval_sec
- # Calculate sample percentages
- sample_pct = (direct_calls / self.total_samples * 100) if self.total_samples > 0 else 0
- cumul_pct = (cumulative_calls / self.total_samples * 100) if self.total_samples > 0 else 0
+ # Calculate sample percentages using successful_samples as denominator
+ # This ensures percentages are relative to samples that actually had data,
+ # not all sampling attempts (important for filtered modes like --mode exception)
+ sample_pct = (direct_calls / self.successful_samples * 100) if self.successful_samples > 0 else 0
+ cumul_pct = (cumulative_calls / self.successful_samples * 100) if self.successful_samples > 0 else 0
# Calculate trends for all columns using TrendTracker
trends = {}
@@ -684,7 +697,9 @@ def build_stats_list(self):
"cumulative_calls": cumulative_calls,
"total_time": total_time,
"cumulative_time": cumulative_time,
- "trends": trends, # Dictionary of trends for all columns
+ "sample_pct": sample_pct,
+ "cumul_pct": cumul_pct,
+ "trends": trends,
}
)
@@ -696,21 +711,9 @@ def build_stats_list(self):
elif self.sort_by == "cumtime":
stats_list.sort(key=lambda x: x["cumulative_time"], reverse=True)
elif self.sort_by == "sample_pct":
- stats_list.sort(
- key=lambda x: (x["direct_calls"] / self.total_samples * 100)
- if self.total_samples > 0
- else 0,
- reverse=True,
- )
+ stats_list.sort(key=lambda x: x["sample_pct"], reverse=True)
elif self.sort_by == "cumul_pct":
- stats_list.sort(
- key=lambda x: (
- x["cumulative_calls"] / self.total_samples * 100
- )
- if self.total_samples > 0
- else 0,
- reverse=True,
- )
+ stats_list.sort(key=lambda x: x["cumul_pct"], reverse=True)
return stats_list
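Besides the deduplication, the live collector now divides by successful_samples
(samples that produced frame data) rather than total_samples (all sampling attempts).
A rough sketch of the arithmetic, using a standalone helper whose name and signature
are not part of the collector API:

    def sample_percentages(direct_calls, cumulative_calls, successful_samples):
        """Percentages relative to samples that actually produced frame data."""
        if successful_samples <= 0:    # every sample filtered out, e.g. --mode exception
            return 0.0, 0.0
        return (direct_calls / successful_samples * 100,
                cumulative_calls / successful_samples * 100)

    # A handler seen in both of the 2 non-empty samples out of 10 attempts:
    # with total_samples as the denominator it would read 20%; here it reads 100%.
    print(sample_percentages(2, 2, 2))   # (100.0, 100.0)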
diff --git a/Lib/profiling/sampling/live_collector/widgets.py b/Lib/profiling/sampling/live_collector/widgets.py
index 8f72f69b057628..0ee72119b2faf6 100644
--- a/Lib/profiling/sampling/live_collector/widgets.py
+++ b/Lib/profiling/sampling/live_collector/widgets.py
@@ -396,6 +396,8 @@ def draw_thread_status(self, line, width):
total_samples = max(1, thread_data.sample_count)
pct_gc = (thread_data.gc_frame_samples / total_samples) * 100
else:
+ # Use total_samples for GC percentage since gc_frame_samples is tracked
+ # across ALL samples (via thread status), not just successful ones
total_samples = max(1, self.collector.total_samples)
pct_gc = (self.collector.gc_frame_samples / total_samples) * 100
@@ -529,10 +531,7 @@ def draw_top_functions(self, line, width, stats_list):
continue
func_name = func_data["func"][2]
- func_pct = (
- func_data["direct_calls"]
- / max(1, self.collector.total_samples)
- ) * 100
+ func_pct = func_data["sample_pct"]
# Medal emoji
if col + 3 < width - 15:
@@ -765,19 +764,10 @@ def draw_stats_rows(self, line, height, width, stats_list, column_flags):
cumulative_calls = stat["cumulative_calls"]
total_time = stat["total_time"]
cumulative_time = stat["cumulative_time"]
+ sample_pct = stat["sample_pct"]
+ cum_pct = stat["cumul_pct"]
trends = stat.get("trends", {})
- sample_pct = (
- (direct_calls / self.collector.total_samples * 100)
- if self.collector.total_samples > 0
- else 0
- )
- cum_pct = (
- (cumulative_calls / self.collector.total_samples * 100)
- if self.collector.total_samples > 0
- else 0
- )
-
# Check if this row is selected
is_selected = show_opcodes and row_idx == selected_row
diff --git a/Lib/profiling/sampling/pstats_collector.py b/Lib/profiling/sampling/pstats_collector.py
index 8d787c62bb0677..7c154e25828a8f 100644
--- a/Lib/profiling/sampling/pstats_collector.py
+++ b/Lib/profiling/sampling/pstats_collector.py
@@ -16,18 +16,23 @@ def __init__(self, sample_interval_usec, *, skip_idle=False):
lambda: collections.defaultdict(int)
)
self.skip_idle = skip_idle
+ self._seen_locations = set()
def _process_frames(self, frames):
"""Process a single thread's frame stack."""
if not frames:
return
+ self._seen_locations.clear()
+
# Process each frame in the stack to track cumulative calls
# frame.location is int, tuple (lineno, end_lineno, col_offset, end_col_offset), or None
for frame in frames:
lineno = extract_lineno(frame.location)
- loc = (frame.filename, lineno, frame.funcname)
- self.result[loc]["cumulative_calls"] += 1
+ location = (frame.filename, lineno, frame.funcname)
+ if location not in self._seen_locations:
+ self._seen_locations.add(location)
+ self.result[location]["cumulative_calls"] += 1
# The top frame gets counted as an inline call (directly executing)
top_lineno = extract_lineno(frames[0].location)
diff --git a/Lib/test/test_profiling/test_sampling_profiler/test_collectors.py b/Lib/test/test_profiling/test_sampling_profiler/test_collectors.py
index 75c4e79591000b..30615a7d31d86c 100644
--- a/Lib/test/test_profiling/test_sampling_profiler/test_collectors.py
+++ b/Lib/test/test_profiling/test_sampling_profiler/test_collectors.py
@@ -87,7 +87,7 @@ def test_pstats_collector_with_extreme_intervals_and_empty_data(self):
# Should still process the frames
self.assertEqual(len(collector.result), 1)
- # Test collecting duplicate frames in same sample
+ # Test collecting duplicate frames in same sample (recursive function)
test_frames = [
MockInterpreterInfo(
0, # interpreter_id
@@ -96,7 +96,7 @@ def test_pstats_collector_with_extreme_intervals_and_empty_data(self):
1,
[
MockFrameInfo("file.py", 10, "func1"),
- MockFrameInfo("file.py", 10, "func1"), # Duplicate
+ MockFrameInfo("file.py", 10, "func1"), #
Duplicate (recursion)
],
)
],
@@ -104,9 +104,9 @@ def test_pstats_collector_with_extreme_intervals_and_empty_data(self):
]
collector = PstatsCollector(sample_interval_usec=1000)
collector.collect(test_frames)
- # Should count both occurrences
+ # Should count only once per sample to avoid over-counting recursive functions
self.assertEqual(
- collector.result[("file.py", 10, "func1")]["cumulative_calls"], 2
+ collector.result[("file.py", 10, "func1")]["cumulative_calls"], 1
)
def test_pstats_collector_single_frame_stacks(self):
@@ -1205,6 +1205,197 @@ def test_flamegraph_collector_per_thread_gc_percentage(self):
self.assertAlmostEqual(per_thread_stats[2]["gc_pct"], 10.0, places=1)
+class TestRecursiveFunctionHandling(unittest.TestCase):
+ """Tests for correct handling of recursive functions in cumulative
stats."""
+
+ def test_pstats_collector_recursive_function_single_sample(self):
+ """Test that recursive functions are counted once per sample, not per
occurrence."""
+ collector = PstatsCollector(sample_interval_usec=1000)
+
+ # Simulate a recursive function appearing 5 times in one sample
+ recursive_frames = [
+ MockInterpreterInfo(
+ 0,
+ [
+ MockThreadInfo(
+ 1,
+ [
+ MockFrameInfo("test.py", 10, "recursive_func"),
+ MockFrameInfo("test.py", 10, "recursive_func"),
+ MockFrameInfo("test.py", 10, "recursive_func"),
+ MockFrameInfo("test.py", 10, "recursive_func"),
+ MockFrameInfo("test.py", 10, "recursive_func"),
+ ],
+ )
+ ],
+ )
+ ]
+ collector.collect(recursive_frames)
+
+ location = ("test.py", 10, "recursive_func")
+ # Should count as 1 cumulative call (present in 1 sample), not 5
+ self.assertEqual(collector.result[location]["cumulative_calls"], 1)
+ # Direct calls should be 1 (top of stack)
+ self.assertEqual(collector.result[location]["direct_calls"], 1)
+
+ def test_pstats_collector_recursive_function_multiple_samples(self):
+ """Test cumulative counting across multiple samples with recursion."""
+ collector = PstatsCollector(sample_interval_usec=1000)
+
+ # Sample 1: recursive function at depth 3
+ sample1 = [
+ MockInterpreterInfo(
+ 0,
+ [
+ MockThreadInfo(
+ 1,
+ [
+ MockFrameInfo("test.py", 10, "recursive_func"),
+ MockFrameInfo("test.py", 10, "recursive_func"),
+ MockFrameInfo("test.py", 10, "recursive_func"),
+ ],
+ )
+ ],
+ )
+ ]
+ # Sample 2: recursive function at depth 2
+ sample2 = [
+ MockInterpreterInfo(
+ 0,
+ [
+ MockThreadInfo(
+ 1,
+ [
+ MockFrameInfo("test.py", 10, "recursive_func"),
+ MockFrameInfo("test.py", 10, "recursive_func"),
+ ],
+ )
+ ],
+ )
+ ]
+ # Sample 3: recursive function at depth 4
+ sample3 = [
+ MockInterpreterInfo(
+ 0,
+ [
+ MockThreadInfo(
+ 1,
+ [
+ MockFrameInfo("test.py", 10, "recursive_func"),
+ MockFrameInfo("test.py", 10, "recursive_func"),
+ MockFrameInfo("test.py", 10, "recursive_func"),
+ MockFrameInfo("test.py", 10, "recursive_func"),
+ ],
+ )
+ ],
+ )
+ ]
+
+ collector.collect(sample1)
+ collector.collect(sample2)
+ collector.collect(sample3)
+
+ location = ("test.py", 10, "recursive_func")
+ # Should count as 3 cumulative calls (present in 3 samples)
+ # Not 3+2+4=9 which would be the buggy behavior
+ self.assertEqual(collector.result[location]["cumulative_calls"], 3)
+ self.assertEqual(collector.result[location]["direct_calls"], 3)
+
+ def test_pstats_collector_mixed_recursive_and_nonrecursive(self):
+ """Test a call stack with both recursive and non-recursive
functions."""
+ collector = PstatsCollector(sample_interval_usec=1000)
+
+ # Stack: main -> foo (recursive x3) -> bar
+ frames = [
+ MockInterpreterInfo(
+ 0,
+ [
+ MockThreadInfo(
+ 1,
+ [
+ MockFrameInfo("test.py", 50, "bar"), # top
of stack
+ MockFrameInfo("test.py", 20, "foo"), #
recursive
+ MockFrameInfo("test.py", 20, "foo"), #
recursive
+ MockFrameInfo("test.py", 20, "foo"), #
recursive
+ MockFrameInfo("test.py", 10, "main"), # bottom
+ ],
+ )
+ ],
+ )
+ ]
+ collector.collect(frames)
+
+ # bar: 1 cumulative (in stack), 1 direct (top)
+ self.assertEqual(collector.result[("test.py", 50,
"bar")]["cumulative_calls"], 1)
+ self.assertEqual(collector.result[("test.py", 50,
"bar")]["direct_calls"], 1)
+
+ # foo: 1 cumulative (counted once despite 3 occurrences), 0 direct
+ self.assertEqual(collector.result[("test.py", 20,
"foo")]["cumulative_calls"], 1)
+ self.assertEqual(collector.result[("test.py", 20,
"foo")]["direct_calls"], 0)
+
+ # main: 1 cumulative, 0 direct
+ self.assertEqual(collector.result[("test.py", 10,
"main")]["cumulative_calls"], 1)
+ self.assertEqual(collector.result[("test.py", 10,
"main")]["direct_calls"], 0)
+
+ def test_pstats_collector_cumulative_percentage_cannot_exceed_100(self):
+ """Test that cumulative percentage stays <= 100% even with deep
recursion."""
+ collector = PstatsCollector(sample_interval_usec=1000000) # 1 second
for easy math
+
+ # Collect 10 samples, each with recursive function at depth 100
+ for _ in range(10):
+ frames = [
+ MockInterpreterInfo(
+ 0,
+ [
+ MockThreadInfo(
+ 1,
+ [MockFrameInfo("test.py", 10, "deep_recursive")] *
100,
+ )
+ ],
+ )
+ ]
+ collector.collect(frames)
+
+ location = ("test.py", 10, "deep_recursive")
+ # Cumulative calls should be 10 (number of samples), not 1000
+ self.assertEqual(collector.result[location]["cumulative_calls"], 10)
+
+ # Verify stats calculation gives correct percentage
+ collector.create_stats()
+ stats = collector.stats[location]
+ # stats format: (direct_calls, cumulative_calls, total_time, cumulative_time, callers)
+ cumulative_calls = stats[1]
+ self.assertEqual(cumulative_calls, 10)
+
+ def test_pstats_collector_different_lines_same_function_counted_separately(self):
+ """Test that different line numbers in same function are tracked separately."""
+ collector = PstatsCollector(sample_interval_usec=1000)
+
+ # Function with multiple line numbers (e.g., different call sites within recursion)
+ frames = [
+ MockInterpreterInfo(
+ 0,
+ [
+ MockThreadInfo(
+ 1,
+ [
+ MockFrameInfo("test.py", 15, "func"), # line 15
+ MockFrameInfo("test.py", 12, "func"), # line 12
+ MockFrameInfo("test.py", 15, "func"), # line 15
again
+ MockFrameInfo("test.py", 10, "func"), # line 10
+ ],
+ )
+ ],
+ )
+ ]
+ collector.collect(frames)
+
+ # Each unique (file, line, func) should be counted once
+ self.assertEqual(collector.result[("test.py", 15,
"func")]["cumulative_calls"], 1)
+ self.assertEqual(collector.result[("test.py", 12,
"func")]["cumulative_calls"], 1)
+ self.assertEqual(collector.result[("test.py", 10,
"func")]["cumulative_calls"], 1)
+
+
class TestLocationHelpers(unittest.TestCase):
"""Tests for location handling helper functions."""
diff --git a/Lib/test/test_profiling/test_sampling_profiler/test_integration.py b/Lib/test/test_profiling/test_sampling_profiler/test_integration.py
index 029952da697751..b98f1e1191429e 100644
--- a/Lib/test/test_profiling/test_sampling_profiler/test_integration.py
+++ b/Lib/test/test_profiling/test_sampling_profiler/test_integration.py
@@ -121,16 +121,17 @@ def test_recursive_function_call_counting(self):
self.assertIn(fib_key, collector.stats)
self.assertIn(main_key, collector.stats)
- # Fibonacci should have many calls due to recursion
+ # Fibonacci: counted once per sample, not per occurrence
fib_stats = collector.stats[fib_key]
direct_calls, cumulative_calls, tt, ct, callers = fib_stats
- # Should have recorded multiple calls (9 total appearances in samples)
- self.assertEqual(cumulative_calls, 9)
- self.assertGreater(tt, 0) # Should have some total time
- self.assertGreater(ct, 0) # Should have some cumulative time
+ # Should count 3 (present in 3 samples), not 9 (total occurrences)
+ self.assertEqual(cumulative_calls, 3)
+ self.assertEqual(direct_calls, 3) # Top of stack in all samples
+ self.assertGreater(tt, 0)
+ self.assertGreater(ct, 0)
- # Main should have fewer calls
+ # Main should also have 3 cumulative calls (in all 3 samples)
main_stats = collector.stats[main_key]
main_direct_calls, main_cumulative_calls = main_stats[0], main_stats[1]
self.assertEqual(main_direct_calls, 0) # Never directly executing
diff --git a/Lib/test/test_profiling/test_sampling_profiler/test_live_collector_core.py b/Lib/test/test_profiling/test_sampling_profiler/test_live_collector_core.py
index 04e6cd2f1fcb8b..8115ca5528fd65 100644
--- a/Lib/test/test_profiling/test_sampling_profiler/test_live_collector_core.py
+++ b/Lib/test/test_profiling/test_sampling_profiler/test_live_collector_core.py
@@ -157,6 +157,70 @@ def test_process_frames_multiple_threads(self):
)
self.assertNotIn(loc1, collector.per_thread_data[456].result)
+ def test_process_recursive_frames_counted_once(self):
+ """Test that recursive functions are counted once per sample."""
+ collector = LiveStatsCollector(1000)
+ # Simulate recursive function appearing 5 times in stack
+ frames = [
+ MockFrameInfo("test.py", 10, "recursive_func"),
+ MockFrameInfo("test.py", 10, "recursive_func"),
+ MockFrameInfo("test.py", 10, "recursive_func"),
+ MockFrameInfo("test.py", 10, "recursive_func"),
+ MockFrameInfo("test.py", 10, "recursive_func"),
+ ]
+ collector.process_frames(frames)
+
+ location = ("test.py", 10, "recursive_func")
+ # Should count as 1 cumulative (present in 1 sample), not 5
+ self.assertEqual(collector.result[location]["cumulative_calls"], 1)
+ self.assertEqual(collector.result[location]["direct_calls"], 1)
+
+ def test_process_recursive_frames_multiple_samples(self):
+ """Test cumulative counting across multiple samples with recursion."""
+ collector = LiveStatsCollector(1000)
+
+ # Sample 1: depth 3
+ frames1 = [
+ MockFrameInfo("test.py", 10, "recursive_func"),
+ MockFrameInfo("test.py", 10, "recursive_func"),
+ MockFrameInfo("test.py", 10, "recursive_func"),
+ ]
+ # Sample 2: depth 2
+ frames2 = [
+ MockFrameInfo("test.py", 10, "recursive_func"),
+ MockFrameInfo("test.py", 10, "recursive_func"),
+ ]
+
+ collector.process_frames(frames1)
+ collector.process_frames(frames2)
+
+ location = ("test.py", 10, "recursive_func")
+ # Should count as 2 (present in 2 samples), not 5
+ self.assertEqual(collector.result[location]["cumulative_calls"], 2)
+ self.assertEqual(collector.result[location]["direct_calls"], 2)
+
+ def test_process_mixed_recursive_nonrecursive(self):
+ """Test stack with both recursive and non-recursive functions."""
+ collector = LiveStatsCollector(1000)
+
+ # Stack: main -> foo (recursive x3) -> bar
+ frames = [
+ MockFrameInfo("test.py", 50, "bar"),
+ MockFrameInfo("test.py", 20, "foo"),
+ MockFrameInfo("test.py", 20, "foo"),
+ MockFrameInfo("test.py", 20, "foo"),
+ MockFrameInfo("test.py", 10, "main"),
+ ]
+ collector.process_frames(frames)
+
+ # foo: 1 cumulative despite 3 occurrences
+ self.assertEqual(collector.result[("test.py", 20,
"foo")]["cumulative_calls"], 1)
+ self.assertEqual(collector.result[("test.py", 20,
"foo")]["direct_calls"], 0)
+
+ # bar and main: 1 cumulative each
+ self.assertEqual(collector.result[("test.py", 50,
"bar")]["cumulative_calls"], 1)
+ self.assertEqual(collector.result[("test.py", 10,
"main")]["cumulative_calls"], 1)
+
class TestLiveStatsCollectorCollect(unittest.TestCase):
"""Tests for the collect method."""
@@ -211,8 +275,11 @@ def test_collect_with_empty_frames(self):
collector.collect(stack_frames)
- # Empty frames still count as successful since collect() was called successfully
- self.assertEqual(collector.successful_samples, 1)
+ # Empty frames do NOT count as successful - this is important for
+ # filtered modes like --mode exception where most samples may have
+ # no matching data. Only samples with actual frame data are counted.
+ self.assertEqual(collector.successful_samples, 0)
+ self.assertEqual(collector.total_samples, 1)
self.assertEqual(collector.failed_samples, 0)
def test_collect_skip_idle_threads(self):
@@ -257,6 +324,124 @@ def test_collect_multiple_threads(self):
self.assertIn(123, collector.thread_ids)
self.assertIn(124, collector.thread_ids)
+ def test_collect_filtered_mode_percentage_calculation(self):
+ """Test that percentages use successful_samples, not total_samples.
+
+ This is critical for filtered modes like --mode exception where most
+ samples may be filtered out at the C level. The percentages should
+ be relative to samples that actually had frame data, not all attempts.
+ """
+ collector = LiveStatsCollector(1000)
+
+ # Simulate 10 samples where only 2 had matching data (e.g., exception mode)
+ frames_with_data = [MockFrameInfo("test.py", 10, "exception_handler")]
+ thread_with_data = MockThreadInfo(123, frames_with_data)
+ interpreter_with_data = MockInterpreterInfo(0, [thread_with_data])
+
+ # Empty thread simulates filtered-out data
+ thread_empty = MockThreadInfo(456, [])
+ interpreter_empty = MockInterpreterInfo(0, [thread_empty])
+
+ # 2 samples with data
+ collector.collect([interpreter_with_data])
+ collector.collect([interpreter_with_data])
+
+ # 8 samples without data (filtered out)
+ for _ in range(8):
+ collector.collect([interpreter_empty])
+
+ # Verify counts
+ self.assertEqual(collector.total_samples, 10)
+ self.assertEqual(collector.successful_samples, 2)
+
+ # Build stats and check percentage
+ stats_list = collector.build_stats_list()
+ self.assertEqual(len(stats_list), 1)
+
+ # The function appeared in 2 out of 2 successful samples = 100%
+ # NOT 2 out of 10 total samples = 20%
+ location = ("test.py", 10, "exception_handler")
+ self.assertEqual(collector.result[location]["direct_calls"], 2)
+
+ # Verify the percentage calculation in build_stats_list
+ # direct_calls / successful_samples * 100 = 2/2 * 100 = 100%
+ # This would be 20% if using total_samples incorrectly
+
+ def test_percentage_values_use_successful_samples(self):
+ """Test that percentages are calculated from successful_samples.
+
+ This verifies the fix where percentages use successful_samples (samples with
+ frame data) instead of total_samples (all sampling attempts). Critical for
+ filtered modes like --mode exception.
+ """
+ collector = LiveStatsCollector(1000)
+
+ # Simulate scenario: 100 total samples, only 20 had frame data
+ collector.total_samples = 100
+ collector.successful_samples = 20
+
+ # Function appeared in 10 out of 20 successful samples
+ collector.result[("test.py", 10, "handler")] = {
+ "direct_calls": 10,
+ "cumulative_calls": 15,
+ "total_rec_calls": 0,
+ }
+
+ stats_list = collector.build_stats_list()
+ self.assertEqual(len(stats_list), 1)
+
+ stat = stats_list[0]
+ # Calculate expected percentages using successful_samples
+ expected_sample_pct = stat["direct_calls"] / collector.successful_samples * 100
+ expected_cumul_pct = stat["cumulative_calls"] / collector.successful_samples * 100
+
+ # Percentage should be 10/20 * 100 = 50%, NOT 10/100 * 100 = 10%
+ self.assertAlmostEqual(expected_sample_pct, 50.0)
+ # Cumulative percentage should be 15/20 * 100 = 75%, NOT 15/100 * 100 = 15%
+ self.assertAlmostEqual(expected_cumul_pct, 75.0)
+
+ # Verify sorting by percentage works correctly
+ collector.result[("test.py", 20, "other")] = {
+ "direct_calls": 5, # 25% of successful samples
+ "cumulative_calls": 8,
+ "total_rec_calls": 0,
+ }
+ collector.sort_by = "sample_pct"
+ stats_list = collector.build_stats_list()
+ # handler (50%) should come before other (25%)
+ self.assertEqual(stats_list[0]["func"][2], "handler")
+ self.assertEqual(stats_list[1]["func"][2], "other")
+
+ def test_build_stats_list_zero_successful_samples(self):
+ """Test build_stats_list handles zero successful_samples without
division by zero.
+
+ When all samples are filtered out (e.g., exception mode with no
exceptions),
+ percentage calculations should return 0 without raising
ZeroDivisionError.
+ """
+ collector = LiveStatsCollector(1000)
+
+ # Edge case: data exists but no successful samples
+ collector.result[("test.py", 10, "func")] = {
+ "direct_calls": 10,
+ "cumulative_calls": 10,
+ "total_rec_calls": 0,
+ }
+ collector.total_samples = 100
+ collector.successful_samples = 0 # All samples filtered out
+
+ # Should not raise ZeroDivisionError
+ stats_list = collector.build_stats_list()
+ self.assertEqual(len(stats_list), 1)
+
+ # Verify percentage-based sorting also works with zero successful_samples
+ collector.sort_by = "sample_pct"
+ stats_list = collector.build_stats_list()
+ self.assertEqual(len(stats_list), 1)
+
+ collector.sort_by = "cumul_pct"
+ stats_list = collector.build_stats_list()
+ self.assertEqual(len(stats_list), 1)
+
class TestLiveStatsCollectorStatisticsBuilding(unittest.TestCase):
"""Tests for statistics building and sorting."""
@@ -281,6 +466,8 @@ def setUp(self):
"total_rec_calls": 0,
}
self.collector.total_samples = 300
+ # successful_samples is used for percentage calculations
+ self.collector.successful_samples = 300
def test_build_stats_list(self):
"""Test that stats list is built correctly."""
diff --git a/Lib/test/test_profiling/test_sampling_profiler/test_live_collector_ui.py b/Lib/test/test_profiling/test_sampling_profiler/test_live_collector_ui.py
index b5a387fa3a3a71..2ed9d82a4a4aa2 100644
--- a/Lib/test/test_profiling/test_sampling_profiler/test_live_collector_ui.py
+++ b/Lib/test/test_profiling/test_sampling_profiler/test_live_collector_ui.py
@@ -148,6 +148,7 @@ def test_efficiency_bar_visualization(self):
def test_stats_display_with_different_sort_modes(self):
"""Test that stats are displayed correctly with different sort
modes."""
self.collector.total_samples = 100
+ self.collector.successful_samples = 100 # For percentage calculations
self.collector.result[("a.py", 1, "func_a")] = {
"direct_calls": 10,
"cumulative_calls": 20,
diff --git a/Misc/NEWS.d/next/Library/2025-12-07-13-37-18.gh-issue-142374.m3EF9E.rst b/Misc/NEWS.d/next/Library/2025-12-07-13-37-18.gh-issue-142374.m3EF9E.rst
new file mode 100644
index 00000000000000..c19100caa36aa5
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2025-12-07-13-37-18.gh-issue-142374.m3EF9E.rst
@@ -0,0 +1,7 @@
+Fix cumulative percentage calculation for recursive functions in the new
+sampling profiler. When profiling recursive functions, cumulative statistics
+(cumul%, cumtime) could exceed 100% because each recursive frame in a stack
+was counted separately. For example, a function recursing 500 times in every
+sample would show 50000% cumulative presence. The fix deduplicates locations
+within each sample so cumulative stats correctly represent "percentage of
+samples where this function was on the stack". Patch by Pablo Galindo.
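To make the 50000% figure in the entry concrete, the arithmetic below is purely
illustrative and not part of the patch:

    samples = 100          # samples in which the function was captured
    recursion_depth = 500  # frames of the same function per captured stack

    old_cumulative = samples * recursion_depth   # one count per frame (buggy)
    new_cumulative = samples                     # one count per sample (fixed)

    print(old_cumulative / samples * 100)  # 50000.0% -> the over-counted figure
    print(new_cumulative / samples * 100)  # 100.0%   -> "on the stack in every sample"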