https://github.com/python/cpython/commit/53da1e8c8ccbe3161ebc42e8b8b7ebd1ab70e05b
commit: 53da1e8c8ccbe3161ebc42e8b8b7ebd1ab70e05b
branch: main
author: J. Nick Koston <n...@koston.org>
committer: kumaraditya303 <kumaradi...@python.org>
date: 2025-05-18T21:26:20+05:30
summary:

gh-134173: optimize state transfer between `concurrent.futures.Future` and 
`asyncio.Future` (#134174)

Co-authored-by: Kumar Aditya <kumaradi...@python.org>

files:
A Misc/NEWS.d/next/Library/2025-05-18-07-25-15.gh-issue-134173.53oOoF.rst
M Lib/asyncio/futures.py
M Lib/concurrent/futures/_base.py
M Lib/test/test_asyncio/test_futures.py
M Lib/test/test_concurrent_futures/test_future.py

diff --git a/Lib/asyncio/futures.py b/Lib/asyncio/futures.py
index d1df6707302277..6bd00a644789f1 100644
--- a/Lib/asyncio/futures.py
+++ b/Lib/asyncio/futures.py
@@ -351,22 +351,19 @@ def _set_concurrent_future_state(concurrent, source):
 def _copy_future_state(source, dest):
     """Internal helper to copy state from another Future.
 
-    The other Future may be a concurrent.futures.Future.
+    The other Future must be a concurrent.futures.Future.
     """
-    assert source.done()
     if dest.cancelled():
         return
     assert not dest.done()
-    if source.cancelled():
+    done, cancelled, result, exception = source._get_snapshot()
+    assert done
+    if cancelled:
         dest.cancel()
+    elif exception is not None:
+        dest.set_exception(_convert_future_exc(exception))
     else:
-        exception = source.exception()
-        if exception is not None:
-            dest.set_exception(_convert_future_exc(exception))
-        else:
-            result = source.result()
-            dest.set_result(result)
-
+        dest.set_result(result)
 
 def _chain_future(source, destination):
     """Chain two futures so that when one completes, so does the other.
diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py
index d98b1ebdd584b5..f506ce68aea5b2 100644
--- a/Lib/concurrent/futures/_base.py
+++ b/Lib/concurrent/futures/_base.py
@@ -558,6 +558,33 @@ def set_exception(self, exception):
             self._condition.notify_all()
         self._invoke_callbacks()
 
+    def _get_snapshot(self):
+        """Get a snapshot of the future's current state.
+
+        This method atomically retrieves the state in one lock acquisition,
+        which is significantly faster than multiple method calls.
+
+        Returns:
+            Tuple of (done, cancelled, result, exception)
+            - done: True if the future is done (cancelled or finished)
+            - cancelled: True if the future was cancelled
+            - result: The result if available and not cancelled
+            - exception: The exception if available and not cancelled
+        """
+        # Fast path: check if already finished without lock
+        if self._state == FINISHED:
+            return True, False, self._result, self._exception
+
+        # Need lock for other states since they can change
+        with self._condition:
+            # We have to check the state again after acquiring the lock
+            # because it may have changed in the meantime.
+            if self._state == FINISHED:
+                return True, False, self._result, self._exception
+            if self._state in {CANCELLED, CANCELLED_AND_NOTIFIED}:
+                return True, True, None, None
+            return False, False, None, None
+
     __class_getitem__ = classmethod(types.GenericAlias)
 
 class Executor(object):
diff --git a/Lib/test/test_asyncio/test_futures.py 
b/Lib/test/test_asyncio/test_futures.py
index 8b51522278aaa6..39bef465bdb717 100644
--- a/Lib/test/test_asyncio/test_futures.py
+++ b/Lib/test/test_asyncio/test_futures.py
@@ -413,7 +413,7 @@ def func_repr(func):
     def test_copy_state(self):
         from asyncio.futures import _copy_future_state
 
-        f = self._new_future(loop=self.loop)
+        f = concurrent.futures.Future()
         f.set_result(10)
 
         newf = self._new_future(loop=self.loop)
@@ -421,7 +421,7 @@ def test_copy_state(self):
         self.assertTrue(newf.done())
         self.assertEqual(newf.result(), 10)
 
-        f_exception = self._new_future(loop=self.loop)
+        f_exception = concurrent.futures.Future()
         f_exception.set_exception(RuntimeError())
 
         newf_exception = self._new_future(loop=self.loop)
@@ -429,7 +429,7 @@ def test_copy_state(self):
         self.assertTrue(newf_exception.done())
         self.assertRaises(RuntimeError, newf_exception.result)
 
-        f_cancelled = self._new_future(loop=self.loop)
+        f_cancelled = concurrent.futures.Future()
         f_cancelled.cancel()
 
         newf_cancelled = self._new_future(loop=self.loop)
@@ -441,7 +441,7 @@ def test_copy_state(self):
         except BaseException as e:
             f_exc = e
 
-        f_conexc = self._new_future(loop=self.loop)
+        f_conexc = concurrent.futures.Future()
         f_conexc.set_exception(f_exc)
 
         newf_conexc = self._new_future(loop=self.loop)
@@ -454,6 +454,56 @@ def test_copy_state(self):
         newf_tb = ''.join(traceback.format_tb(newf_exc.__traceback__))
         self.assertEqual(newf_tb.count('raise 
concurrent.futures.InvalidStateError'), 1)
 
+    def test_copy_state_from_concurrent_futures(self):
+        """Test _copy_future_state from concurrent.futures.Future.
+
+        This tests the optimized path using _get_snapshot when available.
+        """
+        from asyncio.futures import _copy_future_state
+
+        # Test with a result
+        f_concurrent = concurrent.futures.Future()
+        f_concurrent.set_result(42)
+        f_asyncio = self._new_future(loop=self.loop)
+        _copy_future_state(f_concurrent, f_asyncio)
+        self.assertTrue(f_asyncio.done())
+        self.assertEqual(f_asyncio.result(), 42)
+
+        # Test with an exception
+        f_concurrent_exc = concurrent.futures.Future()
+        f_concurrent_exc.set_exception(ValueError("test exception"))
+        f_asyncio_exc = self._new_future(loop=self.loop)
+        _copy_future_state(f_concurrent_exc, f_asyncio_exc)
+        self.assertTrue(f_asyncio_exc.done())
+        with self.assertRaises(ValueError) as cm:
+            f_asyncio_exc.result()
+        self.assertEqual(str(cm.exception), "test exception")
+
+        # Test with cancelled state
+        f_concurrent_cancelled = concurrent.futures.Future()
+        f_concurrent_cancelled.cancel()
+        f_asyncio_cancelled = self._new_future(loop=self.loop)
+        _copy_future_state(f_concurrent_cancelled, f_asyncio_cancelled)
+        self.assertTrue(f_asyncio_cancelled.cancelled())
+
+        # Test that destination already cancelled prevents copy
+        f_concurrent_result = concurrent.futures.Future()
+        f_concurrent_result.set_result(10)
+        f_asyncio_precancelled = self._new_future(loop=self.loop)
+        f_asyncio_precancelled.cancel()
+        _copy_future_state(f_concurrent_result, f_asyncio_precancelled)
+        self.assertTrue(f_asyncio_precancelled.cancelled())
+
+        # Test exception type conversion
+        f_concurrent_invalid = concurrent.futures.Future()
+        
f_concurrent_invalid.set_exception(concurrent.futures.InvalidStateError("invalid"))
+        f_asyncio_invalid = self._new_future(loop=self.loop)
+        _copy_future_state(f_concurrent_invalid, f_asyncio_invalid)
+        self.assertTrue(f_asyncio_invalid.done())
+        with self.assertRaises(asyncio.exceptions.InvalidStateError) as cm:
+            f_asyncio_invalid.result()
+        self.assertEqual(str(cm.exception), "invalid")
+
     def test_iter(self):
         fut = self._new_future(loop=self.loop)
 
diff --git a/Lib/test/test_concurrent_futures/test_future.py 
b/Lib/test/test_concurrent_futures/test_future.py
index 4066ea1ee4b367..06b11a3bacf13a 100644
--- a/Lib/test/test_concurrent_futures/test_future.py
+++ b/Lib/test/test_concurrent_futures/test_future.py
@@ -6,6 +6,7 @@
     PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future)
 
 from test import support
+from test.support import threading_helper
 
 from .util import (
     PENDING_FUTURE, RUNNING_FUTURE, CANCELLED_FUTURE,
@@ -282,6 +283,62 @@ def test_multiple_set_exception(self):
 
         self.assertEqual(f.exception(), e)
 
+    def test_get_snapshot(self):
+        """Test the _get_snapshot method for atomic state retrieval."""
+        # Test with a pending future
+        f = Future()
+        done, cancelled, result, exception = f._get_snapshot()
+        self.assertFalse(done)
+        self.assertFalse(cancelled)
+        self.assertIsNone(result)
+        self.assertIsNone(exception)
+
+        # Test with a finished future (successful result)
+        f = Future()
+        f.set_result(42)
+        done, cancelled, result, exception = f._get_snapshot()
+        self.assertTrue(done)
+        self.assertFalse(cancelled)
+        self.assertEqual(result, 42)
+        self.assertIsNone(exception)
+
+        # Test with a finished future (exception)
+        f = Future()
+        exc = ValueError("test error")
+        f.set_exception(exc)
+        done, cancelled, result, exception = f._get_snapshot()
+        self.assertTrue(done)
+        self.assertFalse(cancelled)
+        self.assertIsNone(result)
+        self.assertIs(exception, exc)
+
+        # Test with a cancelled future
+        f = Future()
+        f.cancel()
+        done, cancelled, result, exception = f._get_snapshot()
+        self.assertTrue(done)
+        self.assertTrue(cancelled)
+        self.assertIsNone(result)
+        self.assertIsNone(exception)
+
+        # Test concurrent access (basic thread safety check)
+        f = Future()
+        f.set_result(100)
+        results = []
+
+        def get_snapshot():
+            for _ in range(1000):
+                snapshot = f._get_snapshot()
+                results.append(snapshot)
+
+        threads = [threading.Thread(target=get_snapshot) for _ in range(4)]
+        with threading_helper.start_threads(threads):
+            pass
+        # All snapshots should be identical for a finished future
+        expected = (True, False, 100, None)
+        for result in results:
+            self.assertEqual(result, expected)
+
 
 def setUpModule():
     setup_module()
diff --git 
a/Misc/NEWS.d/next/Library/2025-05-18-07-25-15.gh-issue-134173.53oOoF.rst 
b/Misc/NEWS.d/next/Library/2025-05-18-07-25-15.gh-issue-134173.53oOoF.rst
new file mode 100644
index 00000000000000..57fba5e21a3e9a
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2025-05-18-07-25-15.gh-issue-134173.53oOoF.rst
@@ -0,0 +1,3 @@
+Speed up :mod:`asyncio` performance of transferring state from thread
+pool :class:`concurrent.futures.Future` by up to 4.4x. Patch by J. Nick
+Koston.

_______________________________________________
Python-checkins mailing list -- python-checkins@python.org
To unsubscribe send an email to python-checkins-le...@python.org
https://mail.python.org/mailman3/lists/python-checkins.python.org/
Member address: arch...@mail-archive.com

Reply via email to