https://github.com/python/cpython/commit/a005835f699b5ba44beb8c856db1f62454522e1e
commit: a005835f699b5ba44beb8c856db1f62454522e1e
branch: main
author: Enzo Bonnal <bonnal.enzo....@gmail.com>
committer: picnixz <10796600+picn...@users.noreply.github.com>
date: 2025-03-13T11:57:53+01:00
summary:

gh-74028: add `buffersize` parameter to `concurrent.futures.Executor.map` for lazier behavior (#125663)

`concurrent.futures.Executor.map` now supports limiting the number of submitted
tasks whose results have not yet been yielded via the new `buffersize` parameter.

---------

Co-authored-by: Bénédikt Tran <10796600+picn...@users.noreply.github.com>

files:
A Misc/NEWS.d/next/Library/2024-10-18-10-27-54.gh-issue-74028.4d4vVD.rst
M Doc/library/concurrent.futures.rst
M Doc/whatsnew/3.14.rst
M Lib/concurrent/futures/_base.py
M Lib/concurrent/futures/process.py
M Lib/test/test_concurrent_futures/executor.py

diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst
index dc613f2f8f00cd..68d081001b6791 100644
--- a/Doc/library/concurrent.futures.rst
+++ b/Doc/library/concurrent.futures.rst
@@ -40,11 +40,14 @@ Executor Objects
              future = executor.submit(pow, 323, 1235)
              print(future.result())
 
-   .. method:: map(fn, *iterables, timeout=None, chunksize=1)
+   .. method:: map(fn, *iterables, timeout=None, chunksize=1, buffersize=None)
 
       Similar to :func:`map(fn, *iterables) <map>` except:
 
-      * the *iterables* are collected immediately rather than lazily;
+      * The *iterables* are collected immediately rather than lazily, unless a
+        *buffersize* is specified to limit the number of submitted tasks whose
+        results have not yet been yielded. If the buffer is full, iteration over
+        the *iterables* pauses until a result is yielded from the buffer.
 
       * *fn* is executed asynchronously and several calls to
         *fn* may be made concurrently.
@@ -68,7 +71,10 @@ Executor Objects
       *chunksize* has no effect.
 
       .. versionchanged:: 3.5
-         Added the *chunksize* argument.
+         Added the *chunksize* parameter.
+
+      .. versionchanged:: next
+         Added the *buffersize* parameter.
 
    .. method:: shutdown(wait=True, *, cancel_futures=False)
 
diff --git a/Doc/whatsnew/3.14.rst b/Doc/whatsnew/3.14.rst
index 6898b50b2c932a..42a3bf02475b39 100644
--- a/Doc/whatsnew/3.14.rst
+++ b/Doc/whatsnew/3.14.rst
@@ -465,6 +465,13 @@ contextvars
 * Support context manager protocol by :class:`contextvars.Token`.
   (Contributed by Andrew Svetlov in :gh:`129889`.)
 
+* Add the optional ``buffersize`` parameter to
+  :meth:`concurrent.futures.Executor.map` to limit the number of submitted
+  tasks whose results have not yet been yielded. If the buffer is full,
+  iteration over the *iterables* pauses until a result is yielded from the
+  buffer.
+  (Contributed by Enzo Bonnal and Josh Rosenberg in :gh:`74028`.)
+
 
 ctypes
 ------
diff --git a/Lib/concurrent/futures/_base.py b/Lib/concurrent/futures/_base.py
index 707fcdfde79acd..d5ba39e3d71774 100644
--- a/Lib/concurrent/futures/_base.py
+++ b/Lib/concurrent/futures/_base.py
@@ -8,6 +8,8 @@
 import threading
 import time
 import types
+import weakref
+from itertools import islice
 
 FIRST_COMPLETED = 'FIRST_COMPLETED'
 FIRST_EXCEPTION = 'FIRST_EXCEPTION'
@@ -572,7 +574,7 @@ def submit(self, fn, /, *args, **kwargs):
         """
         raise NotImplementedError()
 
-    def map(self, fn, *iterables, timeout=None, chunksize=1):
+    def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None):
         """Returns an iterator equivalent to map(fn, iter).
 
         Args:
@@ -584,6 +586,11 @@ def map(self, fn, *iterables, timeout=None, chunksize=1):
                 before being passed to a child process. This argument is only
                 used by ProcessPoolExecutor; it is ignored by
                 ThreadPoolExecutor.
+            buffersize: The number of submitted tasks whose results have not
+                yet been yielded. If the buffer is full, iteration over the
+                iterables pauses until a result is yielded from the buffer.
+                If None, all input elements are eagerly collected, and a task is
+                submitted for each.
 
         Returns:
             An iterator equivalent to: map(func, *iterables) but the calls may
@@ -594,10 +601,25 @@ def map(self, fn, *iterables, timeout=None, chunksize=1):
                 before the given timeout.
             Exception: If fn(*args) raises for any values.
         """
+        if buffersize is not None and not isinstance(buffersize, int):
+            raise TypeError("buffersize must be an integer or None")
+        if buffersize is not None and buffersize < 1:
+            raise ValueError("buffersize must be None or > 0")
+
         if timeout is not None:
             end_time = timeout + time.monotonic()
 
-        fs = [self.submit(fn, *args) for args in zip(*iterables)]
+        zipped_iterables = zip(*iterables)
+        if buffersize:
+            fs = collections.deque(
+                self.submit(fn, *args) for args in islice(zipped_iterables, buffersize)
+            )
+        else:
+            fs = [self.submit(fn, *args) for args in zipped_iterables]
+
+        # Use a weak reference to ensure that the executor can be garbage
+        # collected independently of the result_iterator closure.
+        executor_weakref = weakref.ref(self)
 
         # Yield must be hidden in closure so that the futures are submitted
         # before the first iterator value is required.
@@ -606,6 +628,12 @@ def result_iterator():
                 # reverse to keep finishing order
                 fs.reverse()
                 while fs:
+                    if (
+                        buffersize
+                        and (executor := executor_weakref())
+                        and (args := next(zipped_iterables, None))
+                    ):
+                        fs.appendleft(executor.submit(fn, *args))
                     # Careful not to keep a reference to the popped future
                     if timeout is None:
                         yield _result_or_cancel(fs.pop())
diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index d79d6b959c90d3..4847550908adab 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -813,7 +813,7 @@ def submit(self, fn, /, *args, **kwargs):
             return f
     submit.__doc__ = _base.Executor.submit.__doc__
 
-    def map(self, fn, *iterables, timeout=None, chunksize=1):
+    def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None):
         """Returns an iterator equivalent to map(fn, iter).
 
         Args:
@@ -824,6 +824,11 @@ def map(self, fn, *iterables, timeout=None, chunksize=1):
             chunksize: If greater than one, the iterables will be chopped into
                 chunks of size chunksize and submitted to the process pool.
                 If set to one, the items in the list will be sent one at a time.
+            buffersize: The number of submitted tasks whose results have not
+                yet been yielded. If the buffer is full, iteration over the
+                iterables pauses until a result is yielded from the buffer.
+                If None, all input elements are eagerly collected, and a task is
+                submitted for each.
 
         Returns:
             An iterator equivalent to: map(func, *iterables) but the calls may
@@ -839,7 +844,8 @@ def map(self, fn, *iterables, timeout=None, chunksize=1):
 
         results = super().map(partial(_process_chunk, fn),
                               itertools.batched(zip(*iterables), chunksize),
-                              timeout=timeout)
+                              timeout=timeout,
+                              buffersize=buffersize)
         return _chain_from_iterable_of_lists(results)
 
     def shutdown(self, wait=True, *, cancel_futures=False):
diff --git a/Lib/test/test_concurrent_futures/executor.py b/Lib/test/test_concurrent_futures/executor.py
index 0221c28b3ceb8b..d88c34d1c8c8e4 100644
--- a/Lib/test/test_concurrent_futures/executor.py
+++ b/Lib/test/test_concurrent_futures/executor.py
@@ -1,7 +1,9 @@
+import itertools
 import threading
 import time
 import weakref
 from concurrent import futures
+from operator import add
 from test import support
 from test.support import Py_GIL_DISABLED
 
@@ -73,6 +75,74 @@ def test_map_timeout(self):
         # take longer than the specified timeout.
         self.assertIn(results, ([None, None], [None], []))
 
+    def test_map_buffersize_type_validation(self):
+        for buffersize in ("foo", 2.0):
+            with self.subTest(buffersize=buffersize):
+                with self.assertRaisesRegex(
+                    TypeError,
+                    "buffersize must be an integer or None",
+                ):
+                    self.executor.map(str, range(4), buffersize=buffersize)
+
+    def test_map_buffersize_value_validation(self):
+        for buffersize in (0, -1):
+            with self.subTest(buffersize=buffersize):
+                with self.assertRaisesRegex(
+                    ValueError,
+                    "buffersize must be None or > 0",
+                ):
+                    self.executor.map(str, range(4), buffersize=buffersize)
+
+    def test_map_buffersize(self):
+        ints = range(4)
+        for buffersize in (1, 2, len(ints), len(ints) * 2):
+            with self.subTest(buffersize=buffersize):
+                res = self.executor.map(str, ints, buffersize=buffersize)
+                self.assertListEqual(list(res), ["0", "1", "2", "3"])
+
+    def test_map_buffersize_on_multiple_iterables(self):
+        ints = range(4)
+        for buffersize in (1, 2, len(ints), len(ints) * 2):
+            with self.subTest(buffersize=buffersize):
+                res = self.executor.map(add, ints, ints, buffersize=buffersize)
+                self.assertListEqual(list(res), [0, 2, 4, 6])
+
+    def test_map_buffersize_on_infinite_iterable(self):
+        res = self.executor.map(str, itertools.count(), buffersize=2)
+        self.assertEqual(next(res, None), "0")
+        self.assertEqual(next(res, None), "1")
+        self.assertEqual(next(res, None), "2")
+
+    def test_map_buffersize_on_multiple_infinite_iterables(self):
+        res = self.executor.map(
+            add,
+            itertools.count(),
+            itertools.count(),
+            buffersize=2
+        )
+        self.assertEqual(next(res, None), 0)
+        self.assertEqual(next(res, None), 2)
+        self.assertEqual(next(res, None), 4)
+
+    def test_map_buffersize_on_empty_iterable(self):
+        res = self.executor.map(str, [], buffersize=2)
+        self.assertIsNone(next(res, None))
+
+    def test_map_buffersize_without_iterable(self):
+        res = self.executor.map(str, buffersize=2)
+        self.assertIsNone(next(res, None))
+
+    def test_map_buffersize_when_buffer_is_full(self):
+        ints = iter(range(4))
+        buffersize = 2
+        self.executor.map(str, ints, buffersize=buffersize)
+        self.executor.shutdown(wait=True)  # wait for tasks to complete
+        self.assertEqual(
+            next(ints),
+            buffersize,
+            msg="should have fetched only `buffersize` elements from `ints`.",
+        )
+
     def test_shutdown_race_issue12456(self):
         # Issue #12456: race condition at shutdown where trying to post a
         # sentinel in the call queue blocks (the queue is full while processes
diff --git a/Misc/NEWS.d/next/Library/2024-10-18-10-27-54.gh-issue-74028.4d4vVD.rst b/Misc/NEWS.d/next/Library/2024-10-18-10-27-54.gh-issue-74028.4d4vVD.rst
new file mode 100644
index 00000000000000..6760e2b935430c
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2024-10-18-10-27-54.gh-issue-74028.4d4vVD.rst
@@ -0,0 +1,4 @@
+Add the optional ``buffersize`` parameter to
+:meth:`concurrent.futures.Executor.map` to limit the number of submitted tasks
+whose results have not yet been yielded. If the buffer is full, iteration over
+the *iterables* pauses until a result is yielded from the buffer.

_______________________________________________
Python-checkins mailing list -- python-checkins@python.org
To unsubscribe send an email to python-checkins-le...@python.org
https://mail.python.org/mailman3/lists/python-checkins.python.org/
Member address: arch...@mail-archive.com

Reply via email to