https://github.com/python/cpython/commit/7acf1fb5a70776429bd99e741d69471eb2d1c1bb
commit: 7acf1fb5a70776429bd99e741d69471eb2d1c1bb
branch: main
author: Petr Viktorin <encu...@gmail.com>
committer: encukou <encu...@gmail.com>
date: 2024-02-28T12:53:48+01:00
summary:

gh-114911: Add CPUStopwatch test helper (GH-114912)

A few of our tests measure the time of CPU-bound operation, mainly
to avoid quadratic or worse behaviour.
Add a helper to ignore GC and time spent in other processes.

files:
M Lib/test/support/__init__.py
M Lib/test/test_int.py
M Lib/test/test_re.py

diff --git a/Lib/test/support/__init__.py b/Lib/test/support/__init__.py
index 1d03ec0f5bd12b..401b2ce1fe213c 100644
--- a/Lib/test/support/__init__.py
+++ b/Lib/test/support/__init__.py
@@ -2381,6 +2381,46 @@ def sleeping_retry(timeout, err_msg=None, /,
         delay = min(delay * 2, max_delay)
 
 
+class CPUStopwatch:
+    """Context manager to roughly time a CPU-bound operation.
+
+    Disables GC. Uses CPU time if it can (i.e. excludes sleeps & time of
+    other processes).
+
+    N.B.:
+    - This *includes* time spent in other threads.
+    - Some systems only have a coarse resolution; check
+      stopwatch.clock_info.rseolution if.
+
+    Usage:
+
+    with ProcessStopwatch() as stopwatch:
+        ...
+    elapsed = stopwatch.seconds
+    resolution = stopwatch.clock_info.resolution
+    """
+    def __enter__(self):
+        get_time = time.process_time
+        clock_info = time.get_clock_info('process_time')
+        if get_time() <= 0:  # some platforms like WASM lack process_time()
+            get_time = time.monotonic
+            clock_info = time.get_clock_info('monotonic')
+        self.context = disable_gc()
+        self.context.__enter__()
+        self.get_time = get_time
+        self.clock_info = clock_info
+        self.start_time = get_time()
+        return self
+
+    def __exit__(self, *exc):
+        try:
+            end_time = self.get_time()
+        finally:
+            result = self.context.__exit__(*exc)
+        self.seconds = end_time - self.start_time
+        return result
+
+
 @contextlib.contextmanager
 def adjust_int_max_str_digits(max_digits):
     """Temporarily change the integer string conversion length limit."""
diff --git a/Lib/test/test_int.py b/Lib/test/test_int.py
index 0bf55facad9fed..47fc50a0e20349 100644
--- a/Lib/test/test_int.py
+++ b/Lib/test/test_int.py
@@ -664,84 +664,78 @@ def test_denial_of_service_prevented_int_to_str(self):
         """Regression test: ensure we fail before performing O(N**2) work."""
         maxdigits = sys.get_int_max_str_digits()
         assert maxdigits < 50_000, maxdigits  # A test prerequisite.
-        get_time = time.process_time
-        if get_time() <= 0:  # some platforms like WASM lack process_time()
-            get_time = time.monotonic
 
         huge_int = int(f'0x{"c"*65_000}', base=16)  # 78268 decimal digits.
         digits = 78_268
-        with support.adjust_int_max_str_digits(digits):
-            start = get_time()
+        with (
+                support.adjust_int_max_str_digits(digits),
+                support.CPUStopwatch() as sw_convert):
             huge_decimal = str(huge_int)
-        seconds_to_convert = get_time() - start
         self.assertEqual(len(huge_decimal), digits)
         # Ensuring that we chose a slow enough conversion to measure.
         # It takes 0.1 seconds on a Zen based cloud VM in an opt build.
         # Some OSes have a low res 1/64s timer, skip if hard to measure.
-        if seconds_to_convert < 1/64:
+        if sw_convert.seconds < sw_convert.clock_info.resolution * 2:
             raise unittest.SkipTest('"slow" conversion took only '
-                                    f'{seconds_to_convert} seconds.')
+                                    f'{sw_convert.seconds} seconds.')
 
         # We test with the limit almost at the size needed to check 
performance.
         # The performant limit check is slightly fuzzy, give it a some room.
         with support.adjust_int_max_str_digits(int(.995 * digits)):
-            with self.assertRaises(ValueError) as err:
-                start = get_time()
+            with (
+                    self.assertRaises(ValueError) as err,
+                    support.CPUStopwatch() as sw_fail_huge):
                 str(huge_int)
-            seconds_to_fail_huge = get_time() - start
         self.assertIn('conversion', str(err.exception))
-        self.assertLessEqual(seconds_to_fail_huge, seconds_to_convert/2)
+        self.assertLessEqual(sw_fail_huge.seconds, sw_convert.seconds/2)
 
         # Now we test that a conversion that would take 30x as long also fails
         # in a similarly fast fashion.
         extra_huge_int = int(f'0x{"c"*500_000}', base=16)  # 602060 digits.
-        with self.assertRaises(ValueError) as err:
-            start = get_time()
+        with (
+                self.assertRaises(ValueError) as err,
+                support.CPUStopwatch() as sw_fail_extra_huge):
             # If not limited, 8 seconds said Zen based cloud VM.
             str(extra_huge_int)
-        seconds_to_fail_extra_huge = get_time() - start
         self.assertIn('conversion', str(err.exception))
-        self.assertLess(seconds_to_fail_extra_huge, seconds_to_convert/2)
+        self.assertLess(sw_fail_extra_huge.seconds, sw_convert.seconds/2)
 
     def test_denial_of_service_prevented_str_to_int(self):
         """Regression test: ensure we fail before performing O(N**2) work."""
         maxdigits = sys.get_int_max_str_digits()
         assert maxdigits < 100_000, maxdigits  # A test prerequisite.
-        get_time = time.process_time
-        if get_time() <= 0:  # some platforms like WASM lack process_time()
-            get_time = time.monotonic
 
         digits = 133700
         huge = '8'*digits
-        with support.adjust_int_max_str_digits(digits):
-            start = get_time()
+        with (
+                support.adjust_int_max_str_digits(digits),
+                support.CPUStopwatch() as sw_convert):
             int(huge)
-        seconds_to_convert = get_time() - start
         # Ensuring that we chose a slow enough conversion to measure.
         # It takes 0.1 seconds on a Zen based cloud VM in an opt build.
         # Some OSes have a low res 1/64s timer, skip if hard to measure.
-        if seconds_to_convert < 1/64:
+        if sw_convert.seconds < sw_convert.clock_info.resolution * 2:
             raise unittest.SkipTest('"slow" conversion took only '
-                                    f'{seconds_to_convert} seconds.')
+                                    f'{sw_convert.seconds} seconds.')
 
         with support.adjust_int_max_str_digits(digits - 1):
-            with self.assertRaises(ValueError) as err:
-                start = get_time()
+            with (
+                    self.assertRaises(ValueError) as err,
+                    support.CPUStopwatch() as sw_fail_huge):
                 int(huge)
-            seconds_to_fail_huge = get_time() - start
         self.assertIn('conversion', str(err.exception))
-        self.assertLessEqual(seconds_to_fail_huge, seconds_to_convert/2)
+        self.assertLessEqual(sw_fail_huge.seconds, sw_convert.seconds/2)
 
         # Now we test that a conversion that would take 30x as long also fails
         # in a similarly fast fashion.
         extra_huge = '7'*1_200_000
-        with self.assertRaises(ValueError) as err:
-            start = get_time()
+        with (
+                self.assertRaises(ValueError) as err,
+                support.CPUStopwatch() as sw_fail_extra_huge):
             # If not limited, 8 seconds in the Zen based cloud VM.
             int(extra_huge)
-        seconds_to_fail_extra_huge = get_time() - start
         self.assertIn('conversion', str(err.exception))
-        self.assertLessEqual(seconds_to_fail_extra_huge, seconds_to_convert/2)
+        self.assertLessEqual(sw_fail_extra_huge.seconds, sw_convert.seconds/2)
 
     def test_power_of_two_bases_unlimited(self):
         """The limit does not apply to power of 2 bases."""
diff --git a/Lib/test/test_re.py b/Lib/test/test_re.py
index 993a7d6e264a1f..b1ac22c28cf7c1 100644
--- a/Lib/test/test_re.py
+++ b/Lib/test/test_re.py
@@ -1,7 +1,7 @@
 from test.support import (gc_collect, bigmemtest, _2G,
                           cpython_only, captured_stdout,
                           check_disallow_instantiation, is_emscripten, is_wasi,
-                          warnings_helper, SHORT_TIMEOUT)
+                          warnings_helper, SHORT_TIMEOUT, CPUStopwatch)
 import locale
 import re
 import string
@@ -2284,17 +2284,16 @@ def test_bug_40736(self):
 
     def test_search_anchor_at_beginning(self):
         s = 'x'*10**7
-        start = time.perf_counter()
-        for p in r'\Ay', r'^y':
-            self.assertIsNone(re.search(p, s))
-            self.assertEqual(re.split(p, s), [s])
-            self.assertEqual(re.findall(p, s), [])
-            self.assertEqual(list(re.finditer(p, s)), [])
-            self.assertEqual(re.sub(p, '', s), s)
-        t = time.perf_counter() - start
+        with CPUStopwatch() as stopwatch:
+            for p in r'\Ay', r'^y':
+                self.assertIsNone(re.search(p, s))
+                self.assertEqual(re.split(p, s), [s])
+                self.assertEqual(re.findall(p, s), [])
+                self.assertEqual(list(re.finditer(p, s)), [])
+                self.assertEqual(re.sub(p, '', s), s)
         # Without optimization it takes 1 second on my computer.
         # With optimization -- 0.0003 seconds.
-        self.assertLess(t, 0.1)
+        self.assertLess(stopwatch.seconds, 0.1)
 
     def test_possessive_quantifiers(self):
         """Test Possessive Quantifiers

_______________________________________________
Python-checkins mailing list -- python-checkins@python.org
To unsubscribe send an email to python-checkins-le...@python.org
https://mail.python.org/mailman3/lists/python-checkins.python.org/
Member address: arch...@mail-archive.com

Reply via email to