https://github.com/python/cpython/commit/1356fbed7b1bc0356f0ee3a4bbe140abb25d788d
commit: 1356fbed7b1bc0356f0ee3a4bbe140abb25d788d
branch: main
author: Pablo Galindo Salgado <[email protected]>
committer: pablogsal <[email protected]>
date: 2025-12-12T00:50:17Z
summary:

gh-142374: Fix recursive function cumulative over-counting in sampling profiler 
(#142378)

files:
A Misc/NEWS.d/next/Library/2025-12-07-13-37-18.gh-issue-142374.m3EF9E.rst
M Lib/profiling/sampling/heatmap_collector.py
M Lib/profiling/sampling/live_collector/collector.py
M Lib/profiling/sampling/live_collector/widgets.py
M Lib/profiling/sampling/pstats_collector.py
M Lib/test/test_profiling/test_sampling_profiler/test_collectors.py
M Lib/test/test_profiling/test_sampling_profiler/test_integration.py
M Lib/test/test_profiling/test_sampling_profiler/test_live_collector_core.py
M Lib/test/test_profiling/test_sampling_profiler/test_live_collector_ui.py

diff --git a/Lib/profiling/sampling/heatmap_collector.py 
b/Lib/profiling/sampling/heatmap_collector.py
index a860ed870e3e40..45649ce2009bb6 100644
--- a/Lib/profiling/sampling/heatmap_collector.py
+++ b/Lib/profiling/sampling/heatmap_collector.py
@@ -491,6 +491,10 @@ def __init__(self, *args, **kwargs):
         # File index (populated during export)
         self.file_index = {}
 
+        # Reusable set for deduplicating line locations within a single sample.
+        # This avoids over-counting recursive functions in cumulative stats.
+        self._seen_lines = set()
+
     def set_stats(self, sample_interval_usec, duration_sec, sample_rate, 
error_rate=None, missed_samples=None, **kwargs):
         """Set profiling statistics to include in heatmap output.
 
@@ -524,6 +528,7 @@ def process_frames(self, frames, thread_id):
             thread_id: Thread ID for this stack trace
         """
         self._total_samples += 1
+        self._seen_lines.clear()
 
         for i, (filename, location, funcname, opcode) in enumerate(frames):
             # Normalize location to 4-tuple format
@@ -533,7 +538,14 @@ def process_frames(self, frames, thread_id):
                 continue
 
             # frames[0] is the leaf - where execution is actually happening
-            self._record_line_sample(filename, lineno, funcname, is_leaf=(i == 
0))
+            is_leaf = (i == 0)
+            line_key = (filename, lineno)
+            count_cumulative = line_key not in self._seen_lines
+            if count_cumulative:
+                self._seen_lines.add(line_key)
+
+            self._record_line_sample(filename, lineno, funcname, 
is_leaf=is_leaf,
+                                     count_cumulative=count_cumulative)
 
             if opcode is not None:
                 # Set opcodes_enabled flag when we first encounter opcode data
@@ -562,11 +574,13 @@ def _is_valid_frame(self, filename, lineno):
 
         return True
 
-    def _record_line_sample(self, filename, lineno, funcname, is_leaf=False):
+    def _record_line_sample(self, filename, lineno, funcname, is_leaf=False,
+                            count_cumulative=True):
         """Record a sample for a specific line."""
         # Track cumulative samples (all occurrences in stack)
-        self.line_samples[(filename, lineno)] += 1
-        self.file_samples[filename][lineno] += 1
+        if count_cumulative:
+            self.line_samples[(filename, lineno)] += 1
+            self.file_samples[filename][lineno] += 1
 
         # Track self/leaf samples (only when at top of stack)
         if is_leaf:
diff --git a/Lib/profiling/sampling/live_collector/collector.py 
b/Lib/profiling/sampling/live_collector/collector.py
index 1652089ad3f52d..de541a75db61c1 100644
--- a/Lib/profiling/sampling/live_collector/collector.py
+++ b/Lib/profiling/sampling/live_collector/collector.py
@@ -210,6 +210,8 @@ def __init__(
         # Trend tracking (initialized after colors are set up)
         self._trend_tracker = None
 
+        self._seen_locations = set()
+
     @property
     def elapsed_time(self):
         """Get the elapsed time, frozen when finished."""
@@ -305,15 +307,18 @@ def process_frames(self, frames, thread_id=None):
 
         # Get per-thread data if tracking per-thread
         thread_data = self._get_or_create_thread_data(thread_id) if thread_id 
is not None else None
+        self._seen_locations.clear()
 
         # Process each frame in the stack to track cumulative calls
         # frame.location is (lineno, end_lineno, col_offset, end_col_offset), 
int, or None
         for frame in frames:
             lineno = extract_lineno(frame.location)
             location = (frame.filename, lineno, frame.funcname)
-            self.result[location]["cumulative_calls"] += 1
-            if thread_data:
-                thread_data.result[location]["cumulative_calls"] += 1
+            if location not in self._seen_locations:
+                self._seen_locations.add(location)
+                self.result[location]["cumulative_calls"] += 1
+                if thread_data:
+                    thread_data.result[location]["cumulative_calls"] += 1
 
         # The top frame gets counted as an inline call (directly executing)
         top_frame = frames[0]
@@ -371,11 +376,13 @@ def collect(self, stack_frames):
                     thread_data.gc_frame_samples += stats["gc_samples"]
 
         # Process frames using pre-selected iterator
+        frames_processed = False
         for frames, thread_id in self._get_frame_iterator(stack_frames):
             if not frames:
                 continue
 
             self.process_frames(frames, thread_id=thread_id)
+            frames_processed = True
 
             # Track thread IDs
             if thread_id is not None and thread_id not in self.thread_ids:
@@ -388,7 +395,11 @@ def collect(self, stack_frames):
         if has_gc_frame:
             self.gc_frame_samples += 1
 
-        self.successful_samples += 1
+        # Only count as successful if we actually processed frames
+        # This is important for modes like --mode exception where most samples
+        # may be filtered out at the C level
+        if frames_processed:
+            self.successful_samples += 1
         self.total_samples += 1
 
         # Handle input on every sample for instant responsiveness
@@ -659,9 +670,11 @@ def build_stats_list(self):
             total_time = direct_calls * self.sample_interval_sec
             cumulative_time = cumulative_calls * self.sample_interval_sec
 
-            # Calculate sample percentages
-            sample_pct = (direct_calls / self.total_samples * 100) if 
self.total_samples > 0 else 0
-            cumul_pct = (cumulative_calls / self.total_samples * 100) if 
self.total_samples > 0 else 0
+            # Calculate sample percentages using successful_samples as 
denominator
+            # This ensures percentages are relative to samples that actually 
had data,
+            # not all sampling attempts (important for filtered modes like 
--mode exception)
+            sample_pct = (direct_calls / self.successful_samples * 100) if 
self.successful_samples > 0 else 0
+            cumul_pct = (cumulative_calls / self.successful_samples * 100) if 
self.successful_samples > 0 else 0
 
             # Calculate trends for all columns using TrendTracker
             trends = {}
@@ -684,7 +697,9 @@ def build_stats_list(self):
                     "cumulative_calls": cumulative_calls,
                     "total_time": total_time,
                     "cumulative_time": cumulative_time,
-                    "trends": trends,  # Dictionary of trends for all columns
+                    "sample_pct": sample_pct,
+                    "cumul_pct": cumul_pct,
+                    "trends": trends,
                 }
             )
 
@@ -696,21 +711,9 @@ def build_stats_list(self):
         elif self.sort_by == "cumtime":
             stats_list.sort(key=lambda x: x["cumulative_time"], reverse=True)
         elif self.sort_by == "sample_pct":
-            stats_list.sort(
-                key=lambda x: (x["direct_calls"] / self.total_samples * 100)
-                if self.total_samples > 0
-                else 0,
-                reverse=True,
-            )
+            stats_list.sort(key=lambda x: x["sample_pct"], reverse=True)
         elif self.sort_by == "cumul_pct":
-            stats_list.sort(
-                key=lambda x: (
-                    x["cumulative_calls"] / self.total_samples * 100
-                )
-                if self.total_samples > 0
-                else 0,
-                reverse=True,
-            )
+            stats_list.sort(key=lambda x: x["cumul_pct"], reverse=True)
 
         return stats_list
 
diff --git a/Lib/profiling/sampling/live_collector/widgets.py 
b/Lib/profiling/sampling/live_collector/widgets.py
index 8f72f69b057628..0ee72119b2faf6 100644
--- a/Lib/profiling/sampling/live_collector/widgets.py
+++ b/Lib/profiling/sampling/live_collector/widgets.py
@@ -396,6 +396,8 @@ def draw_thread_status(self, line, width):
             total_samples = max(1, thread_data.sample_count)
             pct_gc = (thread_data.gc_frame_samples / total_samples) * 100
         else:
+            # Use total_samples for GC percentage since gc_frame_samples is 
tracked
+            # across ALL samples (via thread status), not just successful ones
             total_samples = max(1, self.collector.total_samples)
             pct_gc = (self.collector.gc_frame_samples / total_samples) * 100
 
@@ -529,10 +531,7 @@ def draw_top_functions(self, line, width, stats_list):
                 continue
 
             func_name = func_data["func"][2]
-            func_pct = (
-                func_data["direct_calls"]
-                / max(1, self.collector.total_samples)
-            ) * 100
+            func_pct = func_data["sample_pct"]
 
             # Medal emoji
             if col + 3 < width - 15:
@@ -765,19 +764,10 @@ def draw_stats_rows(self, line, height, width, 
stats_list, column_flags):
             cumulative_calls = stat["cumulative_calls"]
             total_time = stat["total_time"]
             cumulative_time = stat["cumulative_time"]
+            sample_pct = stat["sample_pct"]
+            cum_pct = stat["cumul_pct"]
             trends = stat.get("trends", {})
 
-            sample_pct = (
-                (direct_calls / self.collector.total_samples * 100)
-                if self.collector.total_samples > 0
-                else 0
-            )
-            cum_pct = (
-                (cumulative_calls / self.collector.total_samples * 100)
-                if self.collector.total_samples > 0
-                else 0
-            )
-
             # Check if this row is selected
             is_selected = show_opcodes and row_idx == selected_row
 
diff --git a/Lib/profiling/sampling/pstats_collector.py 
b/Lib/profiling/sampling/pstats_collector.py
index 8d787c62bb0677..7c154e25828a8f 100644
--- a/Lib/profiling/sampling/pstats_collector.py
+++ b/Lib/profiling/sampling/pstats_collector.py
@@ -16,18 +16,23 @@ def __init__(self, sample_interval_usec, *, 
skip_idle=False):
             lambda: collections.defaultdict(int)
         )
         self.skip_idle = skip_idle
+        self._seen_locations = set()
 
     def _process_frames(self, frames):
         """Process a single thread's frame stack."""
         if not frames:
             return
 
+        self._seen_locations.clear()
+
         # Process each frame in the stack to track cumulative calls
         # frame.location is int, tuple (lineno, end_lineno, col_offset, 
end_col_offset), or None
         for frame in frames:
             lineno = extract_lineno(frame.location)
-            loc = (frame.filename, lineno, frame.funcname)
-            self.result[loc]["cumulative_calls"] += 1
+            location = (frame.filename, lineno, frame.funcname)
+            if location not in self._seen_locations:
+                self._seen_locations.add(location)
+                self.result[location]["cumulative_calls"] += 1
 
         # The top frame gets counted as an inline call (directly executing)
         top_lineno = extract_lineno(frames[0].location)
diff --git a/Lib/test/test_profiling/test_sampling_profiler/test_collectors.py 
b/Lib/test/test_profiling/test_sampling_profiler/test_collectors.py
index 75c4e79591000b..30615a7d31d86c 100644
--- a/Lib/test/test_profiling/test_sampling_profiler/test_collectors.py
+++ b/Lib/test/test_profiling/test_sampling_profiler/test_collectors.py
@@ -87,7 +87,7 @@ def 
test_pstats_collector_with_extreme_intervals_and_empty_data(self):
         # Should still process the frames
         self.assertEqual(len(collector.result), 1)
 
-        # Test collecting duplicate frames in same sample
+        # Test collecting duplicate frames in same sample (recursive function)
         test_frames = [
             MockInterpreterInfo(
                 0,  # interpreter_id
@@ -96,7 +96,7 @@ def 
test_pstats_collector_with_extreme_intervals_and_empty_data(self):
                         1,
                         [
                             MockFrameInfo("file.py", 10, "func1"),
-                            MockFrameInfo("file.py", 10, "func1"),  # Duplicate
+                            MockFrameInfo("file.py", 10, "func1"),  # 
Duplicate (recursion)
                         ],
                     )
                 ],
@@ -104,9 +104,9 @@ def 
test_pstats_collector_with_extreme_intervals_and_empty_data(self):
         ]
         collector = PstatsCollector(sample_interval_usec=1000)
         collector.collect(test_frames)
-        # Should count both occurrences
+        # Should count only once per sample to avoid over-counting recursive 
functions
         self.assertEqual(
-            collector.result[("file.py", 10, "func1")]["cumulative_calls"], 2
+            collector.result[("file.py", 10, "func1")]["cumulative_calls"], 1
         )
 
     def test_pstats_collector_single_frame_stacks(self):
@@ -1205,6 +1205,197 @@ def 
test_flamegraph_collector_per_thread_gc_percentage(self):
         self.assertAlmostEqual(per_thread_stats[2]["gc_pct"], 10.0, places=1)
 
 
+class TestRecursiveFunctionHandling(unittest.TestCase):
+    """Tests for correct handling of recursive functions in cumulative 
stats."""
+
+    def test_pstats_collector_recursive_function_single_sample(self):
+        """Test that recursive functions are counted once per sample, not per 
occurrence."""
+        collector = PstatsCollector(sample_interval_usec=1000)
+
+        # Simulate a recursive function appearing 5 times in one sample
+        recursive_frames = [
+            MockInterpreterInfo(
+                0,
+                [
+                    MockThreadInfo(
+                        1,
+                        [
+                            MockFrameInfo("test.py", 10, "recursive_func"),
+                            MockFrameInfo("test.py", 10, "recursive_func"),
+                            MockFrameInfo("test.py", 10, "recursive_func"),
+                            MockFrameInfo("test.py", 10, "recursive_func"),
+                            MockFrameInfo("test.py", 10, "recursive_func"),
+                        ],
+                    )
+                ],
+            )
+        ]
+        collector.collect(recursive_frames)
+
+        location = ("test.py", 10, "recursive_func")
+        # Should count as 1 cumulative call (present in 1 sample), not 5
+        self.assertEqual(collector.result[location]["cumulative_calls"], 1)
+        # Direct calls should be 1 (top of stack)
+        self.assertEqual(collector.result[location]["direct_calls"], 1)
+
+    def test_pstats_collector_recursive_function_multiple_samples(self):
+        """Test cumulative counting across multiple samples with recursion."""
+        collector = PstatsCollector(sample_interval_usec=1000)
+
+        # Sample 1: recursive function at depth 3
+        sample1 = [
+            MockInterpreterInfo(
+                0,
+                [
+                    MockThreadInfo(
+                        1,
+                        [
+                            MockFrameInfo("test.py", 10, "recursive_func"),
+                            MockFrameInfo("test.py", 10, "recursive_func"),
+                            MockFrameInfo("test.py", 10, "recursive_func"),
+                        ],
+                    )
+                ],
+            )
+        ]
+        # Sample 2: recursive function at depth 2
+        sample2 = [
+            MockInterpreterInfo(
+                0,
+                [
+                    MockThreadInfo(
+                        1,
+                        [
+                            MockFrameInfo("test.py", 10, "recursive_func"),
+                            MockFrameInfo("test.py", 10, "recursive_func"),
+                        ],
+                    )
+                ],
+            )
+        ]
+        # Sample 3: recursive function at depth 4
+        sample3 = [
+            MockInterpreterInfo(
+                0,
+                [
+                    MockThreadInfo(
+                        1,
+                        [
+                            MockFrameInfo("test.py", 10, "recursive_func"),
+                            MockFrameInfo("test.py", 10, "recursive_func"),
+                            MockFrameInfo("test.py", 10, "recursive_func"),
+                            MockFrameInfo("test.py", 10, "recursive_func"),
+                        ],
+                    )
+                ],
+            )
+        ]
+
+        collector.collect(sample1)
+        collector.collect(sample2)
+        collector.collect(sample3)
+
+        location = ("test.py", 10, "recursive_func")
+        # Should count as 3 cumulative calls (present in 3 samples)
+        # Not 3+2+4=9 which would be the buggy behavior
+        self.assertEqual(collector.result[location]["cumulative_calls"], 3)
+        self.assertEqual(collector.result[location]["direct_calls"], 3)
+
+    def test_pstats_collector_mixed_recursive_and_nonrecursive(self):
+        """Test a call stack with both recursive and non-recursive 
functions."""
+        collector = PstatsCollector(sample_interval_usec=1000)
+
+        # Stack: main -> foo (recursive x3) -> bar
+        frames = [
+            MockInterpreterInfo(
+                0,
+                [
+                    MockThreadInfo(
+                        1,
+                        [
+                            MockFrameInfo("test.py", 50, "bar"),       # top 
of stack
+                            MockFrameInfo("test.py", 20, "foo"),      # 
recursive
+                            MockFrameInfo("test.py", 20, "foo"),      # 
recursive
+                            MockFrameInfo("test.py", 20, "foo"),      # 
recursive
+                            MockFrameInfo("test.py", 10, "main"),     # bottom
+                        ],
+                    )
+                ],
+            )
+        ]
+        collector.collect(frames)
+
+        # bar: 1 cumulative (in stack), 1 direct (top)
+        self.assertEqual(collector.result[("test.py", 50, 
"bar")]["cumulative_calls"], 1)
+        self.assertEqual(collector.result[("test.py", 50, 
"bar")]["direct_calls"], 1)
+
+        # foo: 1 cumulative (counted once despite 3 occurrences), 0 direct
+        self.assertEqual(collector.result[("test.py", 20, 
"foo")]["cumulative_calls"], 1)
+        self.assertEqual(collector.result[("test.py", 20, 
"foo")]["direct_calls"], 0)
+
+        # main: 1 cumulative, 0 direct
+        self.assertEqual(collector.result[("test.py", 10, 
"main")]["cumulative_calls"], 1)
+        self.assertEqual(collector.result[("test.py", 10, 
"main")]["direct_calls"], 0)
+
+    def test_pstats_collector_cumulative_percentage_cannot_exceed_100(self):
+        """Test that cumulative percentage stays <= 100% even with deep 
recursion."""
+        collector = PstatsCollector(sample_interval_usec=1000000)  # 1 second 
for easy math
+
+        # Collect 10 samples, each with recursive function at depth 100
+        for _ in range(10):
+            frames = [
+                MockInterpreterInfo(
+                    0,
+                    [
+                        MockThreadInfo(
+                            1,
+                            [MockFrameInfo("test.py", 10, "deep_recursive")] * 
100,
+                        )
+                    ],
+                )
+            ]
+            collector.collect(frames)
+
+        location = ("test.py", 10, "deep_recursive")
+        # Cumulative calls should be 10 (number of samples), not 1000
+        self.assertEqual(collector.result[location]["cumulative_calls"], 10)
+
+        # Verify stats calculation gives correct percentage
+        collector.create_stats()
+        stats = collector.stats[location]
+        # stats format: (direct_calls, cumulative_calls, total_time, 
cumulative_time, callers)
+        cumulative_calls = stats[1]
+        self.assertEqual(cumulative_calls, 10)
+
+    def 
test_pstats_collector_different_lines_same_function_counted_separately(self):
+        """Test that different line numbers in same function are tracked 
separately."""
+        collector = PstatsCollector(sample_interval_usec=1000)
+
+        # Function with multiple line numbers (e.g., different call sites 
within recursion)
+        frames = [
+            MockInterpreterInfo(
+                0,
+                [
+                    MockThreadInfo(
+                        1,
+                        [
+                            MockFrameInfo("test.py", 15, "func"),  # line 15
+                            MockFrameInfo("test.py", 12, "func"),  # line 12
+                            MockFrameInfo("test.py", 15, "func"),  # line 15 
again
+                            MockFrameInfo("test.py", 10, "func"),  # line 10
+                        ],
+                    )
+                ],
+            )
+        ]
+        collector.collect(frames)
+
+        # Each unique (file, line, func) should be counted once
+        self.assertEqual(collector.result[("test.py", 15, 
"func")]["cumulative_calls"], 1)
+        self.assertEqual(collector.result[("test.py", 12, 
"func")]["cumulative_calls"], 1)
+        self.assertEqual(collector.result[("test.py", 10, 
"func")]["cumulative_calls"], 1)
+
+
 class TestLocationHelpers(unittest.TestCase):
     """Tests for location handling helper functions."""
 
diff --git a/Lib/test/test_profiling/test_sampling_profiler/test_integration.py 
b/Lib/test/test_profiling/test_sampling_profiler/test_integration.py
index 029952da697751..b98f1e1191429e 100644
--- a/Lib/test/test_profiling/test_sampling_profiler/test_integration.py
+++ b/Lib/test/test_profiling/test_sampling_profiler/test_integration.py
@@ -121,16 +121,17 @@ def test_recursive_function_call_counting(self):
         self.assertIn(fib_key, collector.stats)
         self.assertIn(main_key, collector.stats)
 
-        # Fibonacci should have many calls due to recursion
+        # Fibonacci: counted once per sample, not per occurrence
         fib_stats = collector.stats[fib_key]
         direct_calls, cumulative_calls, tt, ct, callers = fib_stats
 
-        # Should have recorded multiple calls (9 total appearances in samples)
-        self.assertEqual(cumulative_calls, 9)
-        self.assertGreater(tt, 0)  # Should have some total time
-        self.assertGreater(ct, 0)  # Should have some cumulative time
+        # Should count 3 (present in 3 samples), not 9 (total occurrences)
+        self.assertEqual(cumulative_calls, 3)
+        self.assertEqual(direct_calls, 3)  # Top of stack in all samples
+        self.assertGreater(tt, 0)
+        self.assertGreater(ct, 0)
 
-        # Main should have fewer calls
+        # Main should also have 3 cumulative calls (in all 3 samples)
         main_stats = collector.stats[main_key]
         main_direct_calls, main_cumulative_calls = main_stats[0], main_stats[1]
         self.assertEqual(main_direct_calls, 0)  # Never directly executing
diff --git 
a/Lib/test/test_profiling/test_sampling_profiler/test_live_collector_core.py 
b/Lib/test/test_profiling/test_sampling_profiler/test_live_collector_core.py
index 04e6cd2f1fcb8b..8115ca5528fd65 100644
--- a/Lib/test/test_profiling/test_sampling_profiler/test_live_collector_core.py
+++ b/Lib/test/test_profiling/test_sampling_profiler/test_live_collector_core.py
@@ -157,6 +157,70 @@ def test_process_frames_multiple_threads(self):
         )
         self.assertNotIn(loc1, collector.per_thread_data[456].result)
 
+    def test_process_recursive_frames_counted_once(self):
+        """Test that recursive functions are counted once per sample."""
+        collector = LiveStatsCollector(1000)
+        # Simulate recursive function appearing 5 times in stack
+        frames = [
+            MockFrameInfo("test.py", 10, "recursive_func"),
+            MockFrameInfo("test.py", 10, "recursive_func"),
+            MockFrameInfo("test.py", 10, "recursive_func"),
+            MockFrameInfo("test.py", 10, "recursive_func"),
+            MockFrameInfo("test.py", 10, "recursive_func"),
+        ]
+        collector.process_frames(frames)
+
+        location = ("test.py", 10, "recursive_func")
+        # Should count as 1 cumulative (present in 1 sample), not 5
+        self.assertEqual(collector.result[location]["cumulative_calls"], 1)
+        self.assertEqual(collector.result[location]["direct_calls"], 1)
+
+    def test_process_recursive_frames_multiple_samples(self):
+        """Test cumulative counting across multiple samples with recursion."""
+        collector = LiveStatsCollector(1000)
+
+        # Sample 1: depth 3
+        frames1 = [
+            MockFrameInfo("test.py", 10, "recursive_func"),
+            MockFrameInfo("test.py", 10, "recursive_func"),
+            MockFrameInfo("test.py", 10, "recursive_func"),
+        ]
+        # Sample 2: depth 2
+        frames2 = [
+            MockFrameInfo("test.py", 10, "recursive_func"),
+            MockFrameInfo("test.py", 10, "recursive_func"),
+        ]
+
+        collector.process_frames(frames1)
+        collector.process_frames(frames2)
+
+        location = ("test.py", 10, "recursive_func")
+        # Should count as 2 (present in 2 samples), not 5
+        self.assertEqual(collector.result[location]["cumulative_calls"], 2)
+        self.assertEqual(collector.result[location]["direct_calls"], 2)
+
+    def test_process_mixed_recursive_nonrecursive(self):
+        """Test stack with both recursive and non-recursive functions."""
+        collector = LiveStatsCollector(1000)
+
+        # Stack: main -> foo (recursive x3) -> bar
+        frames = [
+            MockFrameInfo("test.py", 50, "bar"),
+            MockFrameInfo("test.py", 20, "foo"),
+            MockFrameInfo("test.py", 20, "foo"),
+            MockFrameInfo("test.py", 20, "foo"),
+            MockFrameInfo("test.py", 10, "main"),
+        ]
+        collector.process_frames(frames)
+
+        # foo: 1 cumulative despite 3 occurrences
+        self.assertEqual(collector.result[("test.py", 20, 
"foo")]["cumulative_calls"], 1)
+        self.assertEqual(collector.result[("test.py", 20, 
"foo")]["direct_calls"], 0)
+
+        # bar and main: 1 cumulative each
+        self.assertEqual(collector.result[("test.py", 50, 
"bar")]["cumulative_calls"], 1)
+        self.assertEqual(collector.result[("test.py", 10, 
"main")]["cumulative_calls"], 1)
+
 
 class TestLiveStatsCollectorCollect(unittest.TestCase):
     """Tests for the collect method."""
@@ -211,8 +275,11 @@ def test_collect_with_empty_frames(self):
 
         collector.collect(stack_frames)
 
-        # Empty frames still count as successful since collect() was called 
successfully
-        self.assertEqual(collector.successful_samples, 1)
+        # Empty frames do NOT count as successful - this is important for
+        # filtered modes like --mode exception where most samples may have
+        # no matching data. Only samples with actual frame data are counted.
+        self.assertEqual(collector.successful_samples, 0)
+        self.assertEqual(collector.total_samples, 1)
         self.assertEqual(collector.failed_samples, 0)
 
     def test_collect_skip_idle_threads(self):
@@ -257,6 +324,124 @@ def test_collect_multiple_threads(self):
         self.assertIn(123, collector.thread_ids)
         self.assertIn(124, collector.thread_ids)
 
+    def test_collect_filtered_mode_percentage_calculation(self):
+        """Test that percentages use successful_samples, not total_samples.
+
+        This is critical for filtered modes like --mode exception where most
+        samples may be filtered out at the C level. The percentages should
+        be relative to samples that actually had frame data, not all attempts.
+        """
+        collector = LiveStatsCollector(1000)
+
+        # Simulate 10 samples where only 2 had matching data (e.g., exception 
mode)
+        frames_with_data = [MockFrameInfo("test.py", 10, "exception_handler")]
+        thread_with_data = MockThreadInfo(123, frames_with_data)
+        interpreter_with_data = MockInterpreterInfo(0, [thread_with_data])
+
+        # Empty thread simulates filtered-out data
+        thread_empty = MockThreadInfo(456, [])
+        interpreter_empty = MockInterpreterInfo(0, [thread_empty])
+
+        # 2 samples with data
+        collector.collect([interpreter_with_data])
+        collector.collect([interpreter_with_data])
+
+        # 8 samples without data (filtered out)
+        for _ in range(8):
+            collector.collect([interpreter_empty])
+
+        # Verify counts
+        self.assertEqual(collector.total_samples, 10)
+        self.assertEqual(collector.successful_samples, 2)
+
+        # Build stats and check percentage
+        stats_list = collector.build_stats_list()
+        self.assertEqual(len(stats_list), 1)
+
+        # The function appeared in 2 out of 2 successful samples = 100%
+        # NOT 2 out of 10 total samples = 20%
+        location = ("test.py", 10, "exception_handler")
+        self.assertEqual(collector.result[location]["direct_calls"], 2)
+
+        # Verify the percentage calculation in build_stats_list
+        # direct_calls / successful_samples * 100 = 2/2 * 100 = 100%
+        # This would be 20% if using total_samples incorrectly
+
+    def test_percentage_values_use_successful_samples(self):
+        """Test that percentages are calculated from successful_samples.
+
+        This verifies the fix where percentages use successful_samples 
(samples with
+        frame data) instead of total_samples (all sampling attempts). Critical 
for
+        filtered modes like --mode exception.
+        """
+        collector = LiveStatsCollector(1000)
+
+        # Simulate scenario: 100 total samples, only 20 had frame data
+        collector.total_samples = 100
+        collector.successful_samples = 20
+
+        # Function appeared in 10 out of 20 successful samples
+        collector.result[("test.py", 10, "handler")] = {
+            "direct_calls": 10,
+            "cumulative_calls": 15,
+            "total_rec_calls": 0,
+        }
+
+        stats_list = collector.build_stats_list()
+        self.assertEqual(len(stats_list), 1)
+
+        stat = stats_list[0]
+        # Calculate expected percentages using successful_samples
+        expected_sample_pct = stat["direct_calls"] / 
collector.successful_samples * 100
+        expected_cumul_pct = stat["cumulative_calls"] / 
collector.successful_samples * 100
+
+        # Percentage should be 10/20 * 100 = 50%, NOT 10/100 * 100 = 10%
+        self.assertAlmostEqual(expected_sample_pct, 50.0)
+        # Cumulative percentage should be 15/20 * 100 = 75%, NOT 15/100 * 100 
= 15%
+        self.assertAlmostEqual(expected_cumul_pct, 75.0)
+
+        # Verify sorting by percentage works correctly
+        collector.result[("test.py", 20, "other")] = {
+            "direct_calls": 5,  # 25% of successful samples
+            "cumulative_calls": 8,
+            "total_rec_calls": 0,
+        }
+        collector.sort_by = "sample_pct"
+        stats_list = collector.build_stats_list()
+        # handler (50%) should come before other (25%)
+        self.assertEqual(stats_list[0]["func"][2], "handler")
+        self.assertEqual(stats_list[1]["func"][2], "other")
+
+    def test_build_stats_list_zero_successful_samples(self):
+        """Test build_stats_list handles zero successful_samples without 
division by zero.
+
+        When all samples are filtered out (e.g., exception mode with no 
exceptions),
+        percentage calculations should return 0 without raising 
ZeroDivisionError.
+        """
+        collector = LiveStatsCollector(1000)
+
+        # Edge case: data exists but no successful samples
+        collector.result[("test.py", 10, "func")] = {
+            "direct_calls": 10,
+            "cumulative_calls": 10,
+            "total_rec_calls": 0,
+        }
+        collector.total_samples = 100
+        collector.successful_samples = 0  # All samples filtered out
+
+        # Should not raise ZeroDivisionError
+        stats_list = collector.build_stats_list()
+        self.assertEqual(len(stats_list), 1)
+
+        # Verify percentage-based sorting also works with zero 
successful_samples
+        collector.sort_by = "sample_pct"
+        stats_list = collector.build_stats_list()
+        self.assertEqual(len(stats_list), 1)
+
+        collector.sort_by = "cumul_pct"
+        stats_list = collector.build_stats_list()
+        self.assertEqual(len(stats_list), 1)
+
 
 class TestLiveStatsCollectorStatisticsBuilding(unittest.TestCase):
     """Tests for statistics building and sorting."""
@@ -281,6 +466,8 @@ def setUp(self):
             "total_rec_calls": 0,
         }
         self.collector.total_samples = 300
+        # successful_samples is used for percentage calculations
+        self.collector.successful_samples = 300
 
     def test_build_stats_list(self):
         """Test that stats list is built correctly."""
diff --git 
a/Lib/test/test_profiling/test_sampling_profiler/test_live_collector_ui.py 
b/Lib/test/test_profiling/test_sampling_profiler/test_live_collector_ui.py
index b5a387fa3a3a71..2ed9d82a4a4aa2 100644
--- a/Lib/test/test_profiling/test_sampling_profiler/test_live_collector_ui.py
+++ b/Lib/test/test_profiling/test_sampling_profiler/test_live_collector_ui.py
@@ -148,6 +148,7 @@ def test_efficiency_bar_visualization(self):
     def test_stats_display_with_different_sort_modes(self):
         """Test that stats are displayed correctly with different sort 
modes."""
         self.collector.total_samples = 100
+        self.collector.successful_samples = 100  # For percentage calculations
         self.collector.result[("a.py", 1, "func_a")] = {
             "direct_calls": 10,
             "cumulative_calls": 20,
diff --git 
a/Misc/NEWS.d/next/Library/2025-12-07-13-37-18.gh-issue-142374.m3EF9E.rst 
b/Misc/NEWS.d/next/Library/2025-12-07-13-37-18.gh-issue-142374.m3EF9E.rst
new file mode 100644
index 00000000000000..c19100caa36aa5
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2025-12-07-13-37-18.gh-issue-142374.m3EF9E.rst
@@ -0,0 +1,7 @@
+Fix cumulative percentage calculation for recursive functions in the new
+sampling profiler. When profiling recursive functions, cumulative statistics
+(cumul%, cumtime) could exceed 100% because each recursive frame in a stack
+was counted separately. For example, a function recursing 500 times in every
+sample would show 50000% cumulative presence. The fix deduplicates locations
+within each sample so cumulative stats correctly represent "percentage of
+samples where this function was on the stack". Patch by Pablo Galindo.

_______________________________________________
Python-checkins mailing list -- [email protected]
To unsubscribe send an email to [email protected]
https://mail.python.org/mailman3//lists/python-checkins.python.org
Member address: [email protected]

Reply via email to