https://github.com/python/cpython/commit/ce0cf7a73a96e65bb6dd80e7555e531bd0e27037
commit: ce0cf7a73a96e65bb6dd80e7555e531bd0e27037
branch: main
author: Kumar Aditya <[email protected]>
committer: kumaraditya303 <[email protected]>
date: 2025-02-09T13:02:11Z
summary:

gh-129874: improve `test_tasks` in asyncio to use correct internal functions 
(#129890)

files:
M Lib/test/test_asyncio/test_tasks.py

diff --git a/Lib/test/test_asyncio/test_tasks.py 
b/Lib/test/test_asyncio/test_tasks.py
index 881a7aeeb2fbe5..5a28d4c185bf96 100644
--- a/Lib/test/test_asyncio/test_tasks.py
+++ b/Lib/test/test_asyncio/test_tasks.py
@@ -546,7 +546,7 @@ async def task():
             try:
                 await asyncio.sleep(10)
             except asyncio.CancelledError:
-                asyncio.current_task().uncancel()
+                self.current_task().uncancel()
                 await asyncio.sleep(10)
 
         try:
@@ -598,7 +598,7 @@ def test_uncancel_structured_blocks(self):
         loop = asyncio.new_event_loop()
 
         async def make_request_with_timeout(*, sleep: float, timeout: float):
-            task = asyncio.current_task()
+            task = self.current_task()
             loop = task.get_loop()
 
             timed_out = False
@@ -1987,41 +1987,41 @@ async def coro():
         self.assertIsNone(t2.result())
 
     def test_current_task(self):
-        self.assertIsNone(asyncio.current_task(loop=self.loop))
+        self.assertIsNone(self.current_task(loop=self.loop))
 
         async def coro(loop):
-            self.assertIs(asyncio.current_task(), task)
+            self.assertIs(self.current_task(), task)
 
-            self.assertIs(asyncio.current_task(None), task)
-            self.assertIs(asyncio.current_task(), task)
+            self.assertIs(self.current_task(None), task)
+            self.assertIs(self.current_task(), task)
 
         task = self.new_task(self.loop, coro(self.loop))
         self.loop.run_until_complete(task)
-        self.assertIsNone(asyncio.current_task(loop=self.loop))
+        self.assertIsNone(self.current_task(loop=self.loop))
 
     def test_current_task_with_interleaving_tasks(self):
-        self.assertIsNone(asyncio.current_task(loop=self.loop))
+        self.assertIsNone(self.current_task(loop=self.loop))
 
         fut1 = self.new_future(self.loop)
         fut2 = self.new_future(self.loop)
 
         async def coro1(loop):
-            self.assertTrue(asyncio.current_task() is task1)
+            self.assertTrue(self.current_task() is task1)
             await fut1
-            self.assertTrue(asyncio.current_task() is task1)
+            self.assertTrue(self.current_task() is task1)
             fut2.set_result(True)
 
         async def coro2(loop):
-            self.assertTrue(asyncio.current_task() is task2)
+            self.assertTrue(self.current_task() is task2)
             fut1.set_result(True)
             await fut2
-            self.assertTrue(asyncio.current_task() is task2)
+            self.assertTrue(self.current_task() is task2)
 
         task1 = self.new_task(self.loop, coro1(self.loop))
         task2 = self.new_task(self.loop, coro2(self.loop))
 
         self.loop.run_until_complete(asyncio.wait((task1, task2)))
-        self.assertIsNone(asyncio.current_task(loop=self.loop))
+        self.assertIsNone(self.current_task(loop=self.loop))
 
     # Some thorough tests for cancellation propagation through
     # coroutines, tasks and wait().
@@ -2833,6 +2833,7 @@ class CTask_CFuture_Tests(BaseTaskTests, SetMethodsTest,
     Task = getattr(tasks, '_CTask', None)
     Future = getattr(futures, '_CFuture', None)
     all_tasks = getattr(tasks, '_c_all_tasks', None)
+    current_task = staticmethod(getattr(tasks, '_c_current_task', None))
 
     @support.refcount_test
     def test_refleaks_in_task___init__(self):
@@ -2865,6 +2866,7 @@ class CTask_CFuture_SubclassTests(BaseTaskTests, 
test_utils.TestCase):
     Task = getattr(tasks, '_CTask', None)
     Future = getattr(futures, '_CFuture', None)
     all_tasks = getattr(tasks, '_c_all_tasks', None)
+    current_task = staticmethod(getattr(tasks, '_c_current_task', None))
 
 
 @unittest.skipUnless(hasattr(tasks, '_CTask'),
@@ -2875,6 +2877,7 @@ class CTaskSubclass_PyFuture_Tests(BaseTaskTests, 
test_utils.TestCase):
     Task = getattr(tasks, '_CTask', None)
     Future = futures._PyFuture
     all_tasks = getattr(tasks, '_c_all_tasks', None)
+    current_task = staticmethod(getattr(tasks, '_c_current_task', None))
 
 
 @unittest.skipUnless(hasattr(futures, '_CFuture'),
@@ -2885,6 +2888,7 @@ class PyTask_CFutureSubclass_Tests(BaseTaskTests, 
test_utils.TestCase):
     Future = getattr(futures, '_CFuture', None)
     Task = tasks._PyTask
     all_tasks = staticmethod(tasks._py_all_tasks)
+    current_task = staticmethod(tasks._py_current_task)
 
 
 @unittest.skipUnless(hasattr(tasks, '_CTask'),
@@ -2894,6 +2898,7 @@ class CTask_PyFuture_Tests(BaseTaskTests, 
test_utils.TestCase):
     Task = getattr(tasks, '_CTask', None)
     Future = futures._PyFuture
     all_tasks = getattr(tasks, '_c_all_tasks', None)
+    current_task = staticmethod(getattr(tasks, '_c_current_task', None))
 
 
 @unittest.skipUnless(hasattr(futures, '_CFuture'),
@@ -2903,6 +2908,7 @@ class PyTask_CFuture_Tests(BaseTaskTests, 
test_utils.TestCase):
     Task = tasks._PyTask
     Future = getattr(futures, '_CFuture', None)
     all_tasks = staticmethod(tasks._py_all_tasks)
+    current_task = staticmethod(tasks._py_current_task)
 
 
 class PyTask_PyFuture_Tests(BaseTaskTests, SetMethodsTest,
@@ -2911,6 +2917,7 @@ class PyTask_PyFuture_Tests(BaseTaskTests, SetMethodsTest,
     Task = tasks._PyTask
     Future = futures._PyFuture
     all_tasks = staticmethod(tasks._py_all_tasks)
+    current_task = staticmethod(tasks._py_current_task)
 
 
 @add_subclass_tests
@@ -2918,6 +2925,7 @@ class PyTask_PyFuture_SubclassTests(BaseTaskTests, 
test_utils.TestCase):
     Task = tasks._PyTask
     Future = futures._PyFuture
     all_tasks = staticmethod(tasks._py_all_tasks)
+    current_task = staticmethod(tasks._py_current_task)
 
 @unittest.skipUnless(hasattr(tasks, '_CTask'),
                      'requires the C _asyncio module')
@@ -3004,44 +3012,60 @@ def done(self):
     def test__enter_task(self):
         task = mock.Mock()
         loop = mock.Mock()
-        self.assertIsNone(asyncio.current_task(loop))
+        # _enter_task is called by Task.__step while the loop
+        # is running, so set the loop as the running loop
+        # for a more realistic test.
+        asyncio._set_running_loop(loop)
+        self.assertIsNone(self.current_task(loop))
         self._enter_task(loop, task)
-        self.assertIs(asyncio.current_task(loop), task)
+        self.assertIs(self.current_task(loop), task)
         self._leave_task(loop, task)
+        asyncio._set_running_loop(None)
 
     def test__enter_task_failure(self):
         task1 = mock.Mock()
         task2 = mock.Mock()
         loop = mock.Mock()
+        asyncio._set_running_loop(loop)
         self._enter_task(loop, task1)
         with self.assertRaises(RuntimeError):
             self._enter_task(loop, task2)
-        self.assertIs(asyncio.current_task(loop), task1)
+        self.assertIs(self.current_task(loop), task1)
         self._leave_task(loop, task1)
+        asyncio._set_running_loop(None)
 
     def test__leave_task(self):
         task = mock.Mock()
         loop = mock.Mock()
+        asyncio._set_running_loop(loop)
         self._enter_task(loop, task)
         self._leave_task(loop, task)
-        self.assertIsNone(asyncio.current_task(loop))
+        self.assertIsNone(self.current_task(loop))
+        asyncio._set_running_loop(None)
 
     def test__leave_task_failure1(self):
         task1 = mock.Mock()
         task2 = mock.Mock()
         loop = mock.Mock()
+        # _leave_task is called by Task.__step while the loop
+        # is running, so set the loop as the running loop
+        # for a more realistic test.
+        asyncio._set_running_loop(loop)
         self._enter_task(loop, task1)
         with self.assertRaises(RuntimeError):
             self._leave_task(loop, task2)
-        self.assertIs(asyncio.current_task(loop), task1)
+        self.assertIs(self.current_task(loop), task1)
         self._leave_task(loop, task1)
+        asyncio._set_running_loop(None)
 
     def test__leave_task_failure2(self):
         task = mock.Mock()
         loop = mock.Mock()
+        asyncio._set_running_loop(loop)
         with self.assertRaises(RuntimeError):
             self._leave_task(loop, task)
-        self.assertIsNone(asyncio.current_task(loop))
+        self.assertIsNone(self.current_task(loop))
+        asyncio._set_running_loop(None)
 
     def test__unregister_task(self):
         task = mock.Mock()
@@ -3064,6 +3088,7 @@ class PyIntrospectionTests(test_utils.TestCase, 
BaseTaskIntrospectionTests):
     _enter_task = staticmethod(tasks._py_enter_task)
     _leave_task = staticmethod(tasks._py_leave_task)
     all_tasks = staticmethod(tasks._py_all_tasks)
+    current_task = staticmethod(tasks._py_current_task)
 
 
 @unittest.skipUnless(hasattr(tasks, '_c_register_task'),
@@ -3075,6 +3100,7 @@ class CIntrospectionTests(test_utils.TestCase, 
BaseTaskIntrospectionTests):
         _enter_task = staticmethod(tasks._c_enter_task)
         _leave_task = staticmethod(tasks._c_leave_task)
         all_tasks = staticmethod(tasks._c_all_tasks)
+        current_task = staticmethod(tasks._c_current_task)
     else:
         _register_task = _unregister_task = _enter_task = _leave_task = None
 

_______________________________________________
Python-checkins mailing list -- [email protected]
To unsubscribe send an email to [email protected]
https://mail.python.org/mailman3/lists/python-checkins.python.org/
Member address: [email protected]

Reply via email to