https://github.com/python/cpython/commit/0c088e44428d74d701fe1fc80a4cb4fe124c43f0
commit: 0c088e44428d74d701fe1fc80a4cb4fe124c43f0
branch: 3.13
author: Sam Gross <colesb...@gmail.com>
committer: colesbury <colesb...@gmail.com>
date: 2025-03-06T13:49:03-05:00
summary:

[3.13] gh-128364: Fix flaky `test_concurrent_futures.test_wait` tests 
(gh-130742) (#130922)

Use events instead of relying on `time.sleep()`. The tests are also now about
four times faster.
(cherry picked from commit c4d37eefb7b33f10f776183447af44faf8efa7f6)

files:
M Lib/test/test_concurrent_futures/test_wait.py
M Lib/test/test_concurrent_futures/util.py

diff --git a/Lib/test/test_concurrent_futures/test_wait.py 
b/Lib/test/test_concurrent_futures/test_wait.py
index 108cf54bf79e6f..cc387883141b0e 100644
--- a/Lib/test/test_concurrent_futures/test_wait.py
+++ b/Lib/test/test_concurrent_futures/test_wait.py
@@ -1,9 +1,9 @@
 import sys
 import threading
-import time
 import unittest
 from concurrent import futures
 from test import support
+from test.support import threading_helper
 
 from .util import (
     CANCELLED_FUTURE, CANCELLED_AND_NOTIFIED_FUTURE, EXCEPTION_FUTURE,
@@ -16,15 +16,15 @@
 def mul(x, y):
     return x * y
 
-def sleep_and_raise(t):
-    time.sleep(t)
+def wait_and_raise(e):
+    e.wait()
     raise Exception('this is an exception')
 
 
 class WaitTests:
     def test_20369(self):
         # See https://bugs.python.org/issue20369
-        future = self.executor.submit(time.sleep, 1.5)
+        future = self.executor.submit(mul, 1, 2)
         done, not_done = futures.wait([future, future],
                             return_when=futures.ALL_COMPLETED)
         self.assertEqual({future}, done)
@@ -32,66 +32,102 @@ def test_20369(self):
 
 
     def test_first_completed(self):
+        event = self.create_event()
         future1 = self.executor.submit(mul, 21, 2)
-        future2 = self.executor.submit(time.sleep, 1.5)
+        future2 = self.executor.submit(event.wait)
 
-        done, not_done = futures.wait(
-                [CANCELLED_FUTURE, future1, future2],
-                 return_when=futures.FIRST_COMPLETED)
+        try:
+            done, not_done = futures.wait(
+                    [CANCELLED_FUTURE, future1, future2],
+                     return_when=futures.FIRST_COMPLETED)
 
-        self.assertEqual(set([future1]), done)
-        self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done)
+            self.assertEqual(set([future1]), done)
+            self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done)
+        finally:
+            event.set()
+        future2.result()  # wait for job to finish
 
     def test_first_completed_some_already_completed(self):
-        future1 = self.executor.submit(time.sleep, 1.5)
-
-        finished, pending = futures.wait(
-                 [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, future1],
-                 return_when=futures.FIRST_COMPLETED)
+        event = self.create_event()
+        future1 = self.executor.submit(event.wait)
 
-        self.assertEqual(
-                set([CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]),
-                finished)
-        self.assertEqual(set([future1]), pending)
+        try:
+            finished, pending = futures.wait(
+                     [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, 
future1],
+                     return_when=futures.FIRST_COMPLETED)
+
+            self.assertEqual(
+                    set([CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]),
+                    finished)
+            self.assertEqual(set([future1]), pending)
+        finally:
+            event.set()
+        future1.result()  # wait for job to finish
 
-    @support.requires_resource('walltime')
     def test_first_exception(self):
-        future1 = self.executor.submit(mul, 2, 21)
-        future2 = self.executor.submit(sleep_and_raise, 1.5)
-        future3 = self.executor.submit(time.sleep, 3)
+        event1 = self.create_event()
+        event2 = self.create_event()
+        try:
+            future1 = self.executor.submit(mul, 2, 21)
+            future2 = self.executor.submit(wait_and_raise, event1)
+            future3 = self.executor.submit(event2.wait)
 
-        finished, pending = futures.wait(
-                [future1, future2, future3],
-                return_when=futures.FIRST_EXCEPTION)
+            # Ensure that future1 is completed before future2 finishes
+            def wait_for_future1():
+                future1.result()
+                event1.set()
+
+            t = threading.Thread(target=wait_for_future1)
+            t.start()
+
+            finished, pending = futures.wait(
+                    [future1, future2, future3],
+                    return_when=futures.FIRST_EXCEPTION)
 
-        self.assertEqual(set([future1, future2]), finished)
-        self.assertEqual(set([future3]), pending)
+            self.assertEqual(set([future1, future2]), finished)
+            self.assertEqual(set([future3]), pending)
+
+            threading_helper.join_thread(t)
+        finally:
+            event1.set()
+            event2.set()
+        future3.result()  # wait for job to finish
 
     def test_first_exception_some_already_complete(self):
+        event = self.create_event()
         future1 = self.executor.submit(divmod, 21, 0)
-        future2 = self.executor.submit(time.sleep, 1.5)
-
-        finished, pending = futures.wait(
-                [SUCCESSFUL_FUTURE,
-                 CANCELLED_FUTURE,
-                 CANCELLED_AND_NOTIFIED_FUTURE,
-                 future1, future2],
-                return_when=futures.FIRST_EXCEPTION)
+        future2 = self.executor.submit(event.wait)
 
-        self.assertEqual(set([SUCCESSFUL_FUTURE,
-                              CANCELLED_AND_NOTIFIED_FUTURE,
-                              future1]), finished)
-        self.assertEqual(set([CANCELLED_FUTURE, future2]), pending)
+        try:
+            finished, pending = futures.wait(
+                    [SUCCESSFUL_FUTURE,
+                     CANCELLED_FUTURE,
+                     CANCELLED_AND_NOTIFIED_FUTURE,
+                     future1, future2],
+                    return_when=futures.FIRST_EXCEPTION)
+
+            self.assertEqual(set([SUCCESSFUL_FUTURE,
+                                  CANCELLED_AND_NOTIFIED_FUTURE,
+                                  future1]), finished)
+            self.assertEqual(set([CANCELLED_FUTURE, future2]), pending)
+        finally:
+            event.set()
+        future2.result()  # wait for job to finish
 
     def test_first_exception_one_already_failed(self):
-        future1 = self.executor.submit(time.sleep, 2)
+        event = self.create_event()
+        future1 = self.executor.submit(event.wait)
 
-        finished, pending = futures.wait(
-                 [EXCEPTION_FUTURE, future1],
-                 return_when=futures.FIRST_EXCEPTION)
+        try:
+            finished, pending = futures.wait(
+                     [EXCEPTION_FUTURE, future1],
+                     return_when=futures.FIRST_EXCEPTION)
 
-        self.assertEqual(set([EXCEPTION_FUTURE]), finished)
-        self.assertEqual(set([future1]), pending)
+            self.assertEqual(set([EXCEPTION_FUTURE]), finished)
+            self.assertEqual(set([future1]), pending)
+        finally:
+            event.set()
+        future1.result()  # wait for job to finish
 
     def test_all_completed(self):
         future1 = self.executor.submit(divmod, 2, 0)
@@ -114,23 +150,27 @@ def test_all_completed(self):
 
     def test_timeout(self):
         short_timeout = 0.050
-        long_timeout = short_timeout * 10
 
-        future = self.executor.submit(time.sleep, long_timeout)
+        event = self.create_event()
+        future = self.executor.submit(event.wait)
 
-        finished, pending = futures.wait(
-                [CANCELLED_AND_NOTIFIED_FUTURE,
-                 EXCEPTION_FUTURE,
-                 SUCCESSFUL_FUTURE,
-                 future],
-                timeout=short_timeout,
-                return_when=futures.ALL_COMPLETED)
-
-        self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
-                              EXCEPTION_FUTURE,
-                              SUCCESSFUL_FUTURE]),
-                         finished)
-        self.assertEqual(set([future]), pending)
+        try:
+            finished, pending = futures.wait(
+                    [CANCELLED_AND_NOTIFIED_FUTURE,
+                     EXCEPTION_FUTURE,
+                     SUCCESSFUL_FUTURE,
+                     future],
+                    timeout=short_timeout,
+                    return_when=futures.ALL_COMPLETED)
+
+            self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
+                                  EXCEPTION_FUTURE,
+                                  SUCCESSFUL_FUTURE]),
+                             finished)
+            self.assertEqual(set([future]), pending)
+        finally:
+            event.set()
+        future.result()  # wait for job to finish
 
 
 class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests, BaseTestCase):
diff --git a/Lib/test/test_concurrent_futures/util.py 
b/Lib/test/test_concurrent_futures/util.py
index 3b8ec3e205d5aa..e85ef3b1c91681 100644
--- a/Lib/test/test_concurrent_futures/util.py
+++ b/Lib/test/test_concurrent_futures/util.py
@@ -1,5 +1,6 @@
 import multiprocessing
 import sys
+import threading
 import time
 import unittest
 from concurrent import futures
@@ -50,14 +51,19 @@ def setUp(self):
                 max_workers=self.worker_count,
                 mp_context=self.get_context(),
                 **self.executor_kwargs)
+            self.manager = self.get_context().Manager()
         else:
             self.executor = self.executor_type(
                 max_workers=self.worker_count,
                 **self.executor_kwargs)
+            self.manager = None
 
     def tearDown(self):
         self.executor.shutdown(wait=True)
         self.executor = None
+        if self.manager is not None:
+            self.manager.shutdown()
+            self.manager = None
 
         dt = time.monotonic() - self.t1
         if support.verbose:
@@ -73,6 +79,9 @@ def get_context(self):
 class ThreadPoolMixin(ExecutorMixin):
     executor_type = futures.ThreadPoolExecutor
 
+    def create_event(self):
+        return threading.Event()
+
 
 class ProcessPoolForkMixin(ExecutorMixin):
     executor_type = futures.ProcessPoolExecutor
@@ -89,6 +98,9 @@ def get_context(self):
             self.skipTest("TSAN doesn't support threads after fork")
         return super().get_context()
 
+    def create_event(self):
+        return self.manager.Event()
+
 
 class ProcessPoolSpawnMixin(ExecutorMixin):
     executor_type = futures.ProcessPoolExecutor
@@ -101,6 +113,9 @@ def get_context(self):
             self.skipTest("ProcessPoolExecutor unavailable on this system")
         return super().get_context()
 
+    def create_event(self):
+        return self.manager.Event()
+
 
 class ProcessPoolForkserverMixin(ExecutorMixin):
     executor_type = futures.ProcessPoolExecutor
@@ -117,6 +132,9 @@ def get_context(self):
             self.skipTest("TSAN doesn't support threads after fork")
         return super().get_context()
 
+    def create_event(self):
+        return self.manager.Event()
+
 
 def create_executor_tests(remote_globals, mixin, bases=(BaseTestCase,),
                           executor_mixins=(ThreadPoolMixin,

_______________________________________________
Python-checkins mailing list -- python-checkins@python.org
To unsubscribe send an email to python-checkins-le...@python.org
https://mail.python.org/mailman3/lists/python-checkins.python.org/
Member address: arch...@mail-archive.com

Reply via email to