https://github.com/python/cpython/commit/c476410dc5ae62ffb5e32f64f9a3a622485c7d93
commit: c476410dc5ae62ffb5e32f64f9a3a622485c7d93
branch: main
author: Sam Gross <colesb...@gmail.com>
committer: colesbury <colesb...@gmail.com>
date: 2025-03-07T09:58:09-05:00
summary:

gh-110206: Fix multiprocessing test_notify_all (#130933)

The test could deadlock trying to join the worker processes due to a
combination of behaviors:

* The use of `assertReachesEventually` did not ensure that the workers
  actually called woken.release(), because the SyncManager's Semaphore
  does not implement get_value.

* This means that the test could finish and the variable "sleeping" would
  go out of scope and be collected. This unregisters the proxy, leading
  to failures in the worker or possibly the manager.

* The subsequent call to `p.join()` during cleanup therefore never
  finished.

This change takes two approaches to fix the problem:

1) Use woken.acquire() to ensure that the workers actually finish
   calling woken.release()

2) At the end of the test, wait until the workers are finished, while `cond`,
   `sleeping`, and `woken` are still valid.
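
The two steps above can be sketched in isolation. This is a minimal
illustration, not the test itself: it uses only `threading` primitives
rather than SyncManager proxies, and the names `worker` and
`run_notify_all` are hypothetical.

```python
import threading


def worker(cond, sleeping, woken):
    # Hypothetical stand-in for the test's worker function.
    with cond:
        sleeping.release()   # tell the main thread we are about to wait
        cond.wait()          # releases cond while blocked
        woken.release()      # report that we were woken


def run_notify_all(n=3):
    cond = threading.Condition()
    sleeping = threading.Semaphore(0)
    woken = threading.Semaphore(0)

    workers = []
    for _ in range(n):
        t = threading.Thread(target=worker, args=(cond, sleeping, woken))
        t.daemon = True
        t.start()
        workers.append(t)

    # Wait until every worker has signalled that it is going to sleep.
    for _ in range(n):
        sleeping.acquire()

    # Acquiring cond succeeds only once no worker holds it, i.e. once
    # all of them are blocked inside cond.wait().
    with cond:
        cond.notify_all()

    # Step 1: block on woken.acquire() instead of polling a counter, so
    # each worker is known to have called woken.release().
    acquired = 0
    for _ in range(n):
        if woken.acquire(timeout=10):
            acquired += 1

    # Step 2: join the workers here, while cond, sleeping, and woken
    # are still referenced by this frame.
    for w in workers:
        w.join(timeout=10)

    return acquired, [w.is_alive() for w in workers]
```

Joining inside the function, rather than in a deferred cleanup callback,
keeps the synchronization objects alive until every worker has exited.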

files:
M Lib/test/_test_multiprocessing.py

diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py
index 67ab8f098f700b..10c17b2dc7d32b 100644
--- a/Lib/test/_test_multiprocessing.py
+++ b/Lib/test/_test_multiprocessing.py
@@ -1694,18 +1694,19 @@ def test_notify_all(self):
         woken = self.Semaphore(0)
 
         # start some threads/processes which will timeout
+        workers = []
         for i in range(3):
             p = self.Process(target=self.f,
                              args=(cond, sleeping, woken, TIMEOUT1))
             p.daemon = True
             p.start()
-            self.addCleanup(p.join)
+            workers.append(p)
 
             t = threading.Thread(target=self.f,
                                  args=(cond, sleeping, woken, TIMEOUT1))
             t.daemon = True
             t.start()
-            self.addCleanup(t.join)
+            workers.append(t)
 
         # wait for them all to sleep
         for i in range(6):
@@ -1724,12 +1725,12 @@ def test_notify_all(self):
             p = self.Process(target=self.f, args=(cond, sleeping, woken))
             p.daemon = True
             p.start()
-            self.addCleanup(p.join)
+            workers.append(p)
 
             t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
             t.daemon = True
             t.start()
-            self.addCleanup(t.join)
+            workers.append(t)
 
         # wait for them to all sleep
         for i in range(6):
@@ -1745,11 +1746,17 @@ def test_notify_all(self):
         cond.release()
 
         # check they have all woken
-        self.assertReachesEventually(lambda: get_value(woken), 6)
+        for i in range(6):
+            woken.acquire()
+        self.assertReturnsIfImplemented(0, get_value, woken)
 
         # check state is not mucked up
         self.check_invariant(cond)
 
+        for w in workers:
+            # NOTE: join_process and join_thread are the same
+            threading_helper.join_thread(w)
+
     def test_notify_n(self):
         cond = self.Condition()
         sleeping = self.Semaphore(0)
