https://github.com/python/cpython/commit/7707b14489644073ab0153f5751c6ddbf3fc6f91
commit: 7707b14489644073ab0153f5751c6ddbf3fc6f91
branch: main
author: Duprat <ydup...@gmail.com>
committer: gvanrossum <gvanros...@gmail.com>
date: 2024-03-18T16:15:29Z
summary:

gh-115258: Fix hanging tests for threading queue shutdown (#115940)

This reinstates `test_shutdown_immediate_all_methods_in_many_threads`
and improves `test_shutdown_all_methods_in_many_threads`.

files:
M Lib/test/test_queue.py

diff --git a/Lib/test/test_queue.py b/Lib/test/test_queue.py
index ad31ba1af03b6f..c4d10110132393 100644
--- a/Lib/test/test_queue.py
+++ b/Lib/test/test_queue.py
@@ -317,97 +317,107 @@ def test_shutdown_all_methods_in_one_thread(self):
     def test_shutdown_immediate_all_methods_in_one_thread(self):
         return self._shutdown_all_methods_in_one_thread(True)
 
-    def _write_msg_thread(self, q, n, results, delay,
-                            i_when_exec_shutdown,
-                            event_start, event_end):
-        event_start.wait()
-        for i in range(1, n+1):
+    def _write_msg_thread(self, q, n, results,
+                            i_when_exec_shutdown, event_shutdown,
+                            barrier_start):
+        # All `write_msg_threads`
+        # put several items into the queue.
+        for i in range(0, i_when_exec_shutdown//2):
+            q.put((i, 'LOYD'))
+        # Wait for the barrier to be complete.
+        barrier_start.wait()
+
+        for i in range(i_when_exec_shutdown//2, n):
             try:
                 q.put((i, "YDLO"))
-                results.append(True)
             except self.queue.ShutDown:
                 results.append(False)
-            # triggers shutdown of queue
-            if i == i_when_exec_shutdown:
-                event_end.set()
-            time.sleep(delay)
-        # end of all puts
-        q.join()
+                break
 
-    def _read_msg_thread(self, q, nb, results, delay, event_start):
-        event_start.wait()
-        block = True
-        while nb:
-            time.sleep(delay)
+            # Trigger queue shutdown.
+            if i == i_when_exec_shutdown:
+                # Only one thread should call shutdown().
+                if not event_shutdown.is_set():
+                    event_shutdown.set()
+                    results.append(True)
+
+    def _read_msg_thread(self, q, results, barrier_start):
+        # Get at least one item.
+        q.get(True)
+        q.task_done()
+        # Wait for the barrier to be complete.
+        barrier_start.wait()
+        while True:
             try:
-                # Get at least one message
-                q.get(block)
-                block = False
+                q.get(False)
                 q.task_done()
-                results.append(True)
-                nb -= 1
             except self.queue.ShutDown:
-                results.append(False)
-                nb -= 1
+                results.append(True)
+                break
             except self.queue.Empty:
                 pass
-        q.join()
 
-    def _shutdown_thread(self, q, event_end, immediate):
+    def _shutdown_thread(self, q, results, event_end, immediate):
         event_end.wait()
         q.shutdown(immediate)
-        q.join()
+        results.append(q.qsize() == 0)
 
-    def _join_thread(self, q, delay, event_start):
-        event_start.wait()
-        time.sleep(delay)
+    def _join_thread(self, q, barrier_start):
+        # Wait for the barrier to be complete.
+        barrier_start.wait()
         q.join()
 
     def _shutdown_all_methods_in_many_threads(self, immediate):
+        # Run a 'multi-producers/consumers queue' use case,
+        # with enough items into the queue.
+        # When shutdown, all running threads will be joined.
         q = self.type2test()
         ps = []
-        ev_start = threading.Event()
-        ev_exec_shutdown = threading.Event()
         res_puts = []
         res_gets = []
-        delay = 1e-4
-        read_process = 4
-        nb_msgs = read_process * 16
-        nb_msgs_r = nb_msgs // read_process
-        when_exec_shutdown = nb_msgs // 2
-        lprocs = (
-            (self._write_msg_thread, 1,  (q, nb_msgs, res_puts, delay,
-                                            when_exec_shutdown,
-                                            ev_start, ev_exec_shutdown)),
-            (self._read_msg_thread, read_process, (q, nb_msgs_r,
-                                                    res_gets, delay*2,
-                                                    ev_start)),
-            (self._join_thread, 2, (q, delay*2, ev_start)),
-            (self._shutdown_thread, 1, (q, ev_exec_shutdown, immediate)),
-            )
-        # start all threds
+        res_shutdown = []
+        write_threads = 4
+        read_threads = 6
+        join_threads = 2
+        nb_msgs = 1024*64
+        nb_msgs_w = nb_msgs // write_threads
+        when_exec_shutdown = nb_msgs_w // 2
+        # Use of a Barrier to ensure that
+        # - all write threads put all their items into the queue,
+        # - all read thread get at least one item from the queue,
+        #   and keep on running until shutdown.
+        # The join thread is started only when shutdown is immediate.
+        nparties = write_threads + read_threads
+        if immediate:
+            nparties += join_threads
+        barrier_start = threading.Barrier(nparties)
+        ev_exec_shutdown = threading.Event()
+        lprocs = [
+            (self._write_msg_thread, write_threads, (q, nb_msgs_w, res_puts,
+                                            when_exec_shutdown, 
ev_exec_shutdown,
+                                            barrier_start)),
+            (self._read_msg_thread, read_threads, (q, res_gets, 
barrier_start)),
+            (self._shutdown_thread, 1, (q, res_shutdown, ev_exec_shutdown, 
immediate)),
+            ]
+        if immediate:
+            lprocs.append((self._join_thread, join_threads, (q, 
barrier_start)))
+        # start all threads.
         for func, n, args in lprocs:
             for i in range(n):
                 ps.append(threading.Thread(target=func, args=args))
                 ps[-1].start()
-        # set event in order to run q.shutdown()
-        ev_start.set()
-
-        if not immediate:
-            assert(len(res_gets) == len(res_puts))
-            assert(res_gets.count(True) == res_puts.count(True))
-        else:
-            assert(len(res_gets) <= len(res_puts))
-            assert(res_gets.count(True) <= res_puts.count(True))
-
-        for thread in ps[1:]:
+        for thread in ps:
             thread.join()
 
-    @unittest.skip("test times out (gh-115258)")
+        self.assertTrue(True in res_puts)
+        self.assertEqual(res_gets.count(True), read_threads)
+        if immediate:
+            self.assertListEqual(res_shutdown, [True])
+            self.assertTrue(q.empty())
+
     def test_shutdown_all_methods_in_many_threads(self):
         return self._shutdown_all_methods_in_many_threads(False)
 
-    @unittest.skip("test times out (gh-115258)")
     def test_shutdown_immediate_all_methods_in_many_threads(self):
         return self._shutdown_all_methods_in_many_threads(True)
 

_______________________________________________
Python-checkins mailing list -- python-checkins@python.org
To unsubscribe send an email to python-checkins-le...@python.org
https://mail.python.org/mailman3/lists/python-checkins.python.org/
Member address: arch...@mail-archive.com

Reply via email to