jenkins-bot has submitted this change. ( 
https://gerrit.wikimedia.org/r/c/pywikibot/core/+/1105000?usp=email )

Change subject: IMPR: Add BoundedPoolExecutor
......................................................................

IMPR: Add BoundedPoolExecutor

- add BoundedPoolExecutor to tools.threading which can be combined with
  any concurrent.futures.Executor subclass and uses a BoundedSemaphore
  to limit the items added to the workers queue in the submit() method.
- test BoundedPoolExecutor
- update all scripts using ThreadPoolExecutor and use the new
  BoundedPoolExecutor('ThreadPoolExecutor') instead.

Bug: T333741
Change-Id: I47bb2b4743f44dcd04c5d4df57978749791bf99e
---
M pywikibot/scripts/login.py
M pywikibot/tools/threading.py
M scripts/archivebot.py
M scripts/fixing_redirects.py
M scripts/watchlist.py
M tests/tools_threading_tests.py
6 files changed, 232 insertions(+), 15 deletions(-)

Approvals:
  Xqt: Looks good to me, approved
  jenkins-bot: Verified




diff --git a/pywikibot/scripts/login.py b/pywikibot/scripts/login.py
index 3cc1bf8..c1af7f6 100755
--- a/pywikibot/scripts/login.py
+++ b/pywikibot/scripts/login.py
@@ -51,13 +51,13 @@
 from __future__ import annotations

 import datetime
-from concurrent.futures import ThreadPoolExecutor
 from contextlib import nullcontext, suppress

 import pywikibot
 from pywikibot import config
 from pywikibot.exceptions import NoUsernameError, SiteDefinitionError
 from pywikibot.login import OauthLoginManager
+from pywikibot.tools.threading import BoundedPoolExecutor


 def _get_consumer_token(site) -> tuple[str, str]:
@@ -157,8 +157,9 @@
         namedict = {site.family.name: {site.code: None}}

     params = oauth, logout, autocreate
-    context = ThreadPoolExecutor if asynchronous else nullcontext
-    with context() as executor:
+    context = (nullcontext(),
+               BoundedPoolExecutor('ThreadPoolExecutor'))[asynchronous]
+    with context as executor:
         for family_name in namedict:
             for lang in namedict[family_name]:
                 if asynchronous:
diff --git a/pywikibot/tools/threading.py b/pywikibot/tools/threading.py
index 7539561..b93b6d9 100644
--- a/pywikibot/tools/threading.py
+++ b/pywikibot/tools/threading.py
@@ -1,20 +1,25 @@
 """Classes which can be used for threading."""
 #
-# (C) Pywikibot team, 2008-2022
+# (C) Pywikibot team, 2008-2024
 #
 # Distributed under the terms of the MIT license.
 #
 from __future__ import annotations

+import concurrent.futures as futures
+import importlib
 import queue
 import re
 import threading
 import time
+from typing import Any

 import pywikibot  # T306760
+from pywikibot.tools import SPHINX_RUNNING


 __all__ = (
+    'BoundedPoolExecutor',
     'RLock',
     'ThreadedGenerator',
     'ThreadList',
@@ -189,6 +194,8 @@
     ...     pool.append(threading.Thread(target=work))
     ...

+    .. seealso:: :class:`BoundedPoolExecutor`
+
     """

     def __init__(self, limit: int = 128, wait_time: float = 2, *args) -> None:
@@ -225,3 +232,121 @@
         super().append(thd)
         thd.start()
         pywikibot.logging.debug(f"thread {len(self)} ('{type(thd)}') started")
+
+
+class BoundedPoolExecutor(futures.Executor):
+
+    """A bounded Executor which limits prefetched Futures.
+
+    BoundedPoolExecutor behaves like other executors derived from
+    :pylib:`concurrent.futures.Executor
+    <concurrent.futures.html#concurrent.futures.Executor>` but
+    :meth:`submit` calls block instead of adding further items to the
+    workers queue while the *max_bound* limit is reached.
+
+    .. versionadded:: 10.0
+
+    .. seealso::
+       - :pylib:`concurrent.futures.html#executor-objects`
+       - :class:`ThreadList`
+
+    :param executor: One of the executors found in ``concurrent.futures``.
+        The parameter may be given as class type or its name.
+    :param max_bound: the maximum number of items in the workers queue.
+        If not given or None, the number is set to *max_workers*.
+    :param args: Any positional argument for the given *executor*
+    :param kwargs: Any keyword argument for the given *executor*
+    :raises AttributeError: given *executor* is not found in
+        concurrent.futures.
+    :raises TypeError: given *executor* is not a class or not a real
+        subclass of concurrent.futures.Executor.
+    :raises ValueError: *max_bound* is less than 1.
+    """
+
+    def __new__(
+        cls,
+        executor: futures.Executor | str,
+        /,
+        max_bound: int | None = None,
+        *args: Any,
+        **kwargs: Any
+    ) -> BoundedPoolExecutor:
+        """Create a new BoundedPoolExecutor subclass.
+
+        The class inherits from :class:`BoundedPoolExecutor` and the
+        given *executor*. The class name is composed of "Bounded" and
+        the name of the *executor*.
+        """
+        module = 'concurrent.futures'
+        if isinstance(executor, str):
+            base = getattr(
+                importlib.import_module(module), executor)
+        else:
+            base = executor
+
+        if base is futures.Executor or not issubclass(base, futures.Executor):
+            raise TypeError(
+                f'expected a real subclass of {module + ".Executor"!r} or the '
+                f'class name for executor parameter, not {base.__name__!r}'
+            )
+        new = type('Bounded' + base.__name__, (cls, base), {})
+        return super().__new__(new)
+
+    def __init__(self, executor, /, max_bound=None, *args, **kwargs) -> None:
+        """Initializer."""
+        if max_bound is not None and max_bound < 1:
+            raise ValueError("Minimum 'max_bound' is 1")
+
+        super().__init__(*args, **kwargs)
+        self._bound_semaphore = threading.BoundedSemaphore(
+            max_bound or self._max_workers)
+
+    def submit(self, fn, /, *args, **kwargs) -> futures.Future:
+        """Schedules callable *fn* to be executed as ``fn(*args, **kwargs)``.
+
+        .. code-block:: python
+
+           with BoundedPoolExecutor('ThreadPoolExecutor',
+                                     max_bound=5,
+                                     max_workers=1) as executor:
+               future = executor.submit(pow, 323, 1235)
+               print(future.result())
+
+        """
+        self._bound_semaphore.acquire()
+
+        try:
+            f = super().submit(fn, *args, **kwargs)
+        except futures.BrokenExecutor:
+            self._bound_semaphore.release()
+            raise
+
+        f.add_done_callback(lambda _f: self._bound_semaphore.release())
+        return f
+
+    if not SPHINX_RUNNING:
+        submit.__doc__ = futures.Executor.submit.__doc__
+
+    def _bound(self, sep: str = '') -> str:
+        """Helper method for str and repr."""
+        if not hasattr(self, '_bound_semaphore'):
+            # class is not fully initialized
+            return ''
+
+        bound = self._bound_semaphore._initial_value
+        return '' if bound == self._max_workers else f'{sep}{bound}'
+
+    def __str__(self):
+        """String of current BoundedPoolExecutor type.
+
+        Includes *max_bound* if necessary.
+        """
+        return f'{type(self).__name__}({self._bound()})'
+
+    def __repr__(self):
+        """Representation string of BoundedPoolExecutor.
+
+        Includes the *executor* and *max_bound* if necessary.
+        """
+        base, executor = type(self).__bases__
+        return f'{base.__name__}({executor.__name__!r}{self._bound(", ")})'
diff --git a/scripts/archivebot.py b/scripts/archivebot.py
index ac8b0fc..ccabe1f 100755
--- a/scripts/archivebot.py
+++ b/scripts/archivebot.py
@@ -164,7 +164,6 @@
 import threading
 import time
 from collections import OrderedDict, defaultdict
-from concurrent.futures import ThreadPoolExecutor
 from contextlib import nullcontext
 from hashlib import md5
 from math import ceil
@@ -185,6 +184,7 @@
 )
 from pywikibot.time import MW_KEYS, parse_duration, str2timedelta
 from pywikibot.tools import PYTHON_VERSION
+from pywikibot.tools.threading import BoundedPoolExecutor


 class ArchiveBotSiteConfigError(Error):
@@ -970,9 +970,9 @@

     if asynchronous:
         signal.signal(signal.SIGINT, signal_handler)
-        context = ThreadPoolExecutor
+        context = BoundedPoolExecutor('ThreadPoolExecutor')
     else:
-        context = nullcontext
+        context = nullcontext()

     for template_name in templates:
         tmpl = pywikibot.Page(site, template_name, ns=10)
@@ -992,7 +992,7 @@

         botargs = tmpl, salt, force, keep, sort
         futures = []  # needed for Python < 3.9
-        with context() as executor:
+        with context as executor:
             for pg in gen:
                 if asynchronous:
                     future = executor.submit(process_page, pg, *botargs)
diff --git a/scripts/fixing_redirects.py b/scripts/fixing_redirects.py
index d327621..dba56d8 100755
--- a/scripts/fixing_redirects.py
+++ b/scripts/fixing_redirects.py
@@ -23,7 +23,7 @@
 from __future__ import annotations

 import re
-from concurrent.futures import ThreadPoolExecutor, as_completed
+from concurrent.futures import as_completed
 from contextlib import suppress

 import pywikibot
@@ -45,6 +45,7 @@
 from pywikibot.textlib import isDisabled
 from pywikibot.tools import first_lower
 from pywikibot.tools import first_upper as firstcap
+from pywikibot.tools.threading import BoundedPoolExecutor


 # This is required for the text that is shown when you run this script
@@ -187,7 +188,7 @@
             pywikibot.error(e)
             return

-        with ThreadPoolExecutor() as executor:
+        with BoundedPoolExecutor('ThreadPoolExecutor') as executor:
             futures = {executor.submit(self.get_target, p)
                        for p in self.current_page.linkedPages()}
             for future in as_completed(futures):
diff --git a/scripts/watchlist.py b/scripts/watchlist.py
index d5a2e1c..c6d1a61 100755
--- a/scripts/watchlist.py
+++ b/scripts/watchlist.py
@@ -33,12 +33,13 @@

 import datetime
 import os
-from concurrent.futures import ThreadPoolExecutor, as_completed
+from concurrent.futures import as_completed
 
 import pywikibot
 from pywikibot import config
 from pywikibot.data.api import CachedRequest
 from pywikibot.exceptions import InvalidTitleError
+from pywikibot.tools.threading import BoundedPoolExecutor


 try:
@@ -67,7 +68,7 @@
     if not quiet:
         pywikibot.info('Counting pages in watchlists of all wikis...')

-    with ThreadPoolExecutor() as executor:
+    with BoundedPoolExecutor('ThreadPoolExecutor') as executor:
         futures = {executor.submit(refresh, pywikibot.Site(lang, family))
                    for family in config.usernames
                    for lang in config.usernames[family]}
@@ -95,7 +96,7 @@
     cache_path = CachedRequest._get_cache_dir()
     files = os.scandir(cache_path)
     seen = set()
-    with ThreadPoolExecutor() as executor:
+    with BoundedPoolExecutor('ThreadPoolExecutor') as executor:
         for filename in files:
             entry = CacheEntry(cache_path, filename)
             entry._load_cache()
diff --git a/tests/tools_threading_tests.py b/tests/tools_threading_tests.py
index 4808895..d14981e 100755
--- a/tests/tools_threading_tests.py
+++ b/tests/tools_threading_tests.py
@@ -1,16 +1,24 @@
 #!/usr/bin/env python3
 """Tests for threading tools."""
 #
-# (C) Pywikibot team, 2014-2022
+# (C) Pywikibot team, 2014-2024
 #
 # Distributed under the terms of the MIT license.
 #
 from __future__ import annotations

+import time
 import unittest
+from concurrent.futures import (
+    Executor,
+    Future,
+    ProcessPoolExecutor,
+    ThreadPoolExecutor,
+)
 from contextlib import suppress

-from pywikibot.tools.threading import ThreadedGenerator
+from pywikibot.tools import PYTHON_VERSION
+from pywikibot.tools.threading import BoundedPoolExecutor, ThreadedGenerator
 from tests.aspects import TestCase


@@ -41,6 +49,87 @@
         self.assertEqual(list(thd_gen), list(iterable))


+class BoundedThreadPoolTests(TestCase):
+
+    """BoundedPoolExecutor test cases."""
+
+    net = False
+
+    def test_strings(self):
+        """Test string and repr methods for executor strings."""
+        executors = ['ThreadPoolExecutor', 'ProcessPoolExecutor']
+        if PYTHON_VERSION >= (3, 14):
+            executors.append('InterpreterPoolExecutor')
+
+        for executor in executors:
+            with self.subTest(executor=executor):
+                pool = BoundedPoolExecutor(executor)
+                self.assertEqual(str(pool), f'Bounded{executor}()')
+                self.assertEqual(repr(pool),
+                                 f'BoundedPoolExecutor({executor!r})')
+                self.assertEqual(pool._bound_semaphore._initial_value,
+                                 pool._max_workers)
+
+    def test_class(self):
+        """Test string and repr methods for an executor class."""
+        executors = [ThreadPoolExecutor, ProcessPoolExecutor]
+        if PYTHON_VERSION >= (3, 14):
+            from concurrent.futures import InterpreterPoolExecutor
+            executors.append(InterpreterPoolExecutor)
+
+        for executor in executors:
+            with self.subTest(executor=executor):
+                pool = BoundedPoolExecutor(executor)
+                self.assertEqual(str(pool), f'Bounded{executor.__name__}()')
+                self.assertEqual(repr(pool),
+                                 f'BoundedPoolExecutor({executor.__name__!r})')
+                self.assertEqual(pool._bound_semaphore._initial_value,
+                                 pool._max_workers)
+
+    def test_run(self):
+        """Test examples for Executor during run."""
+        for bound in (2, 5, 7):
+            futures = []
+            with self.subTest(bound=bound), \
+                 BoundedPoolExecutor('ThreadPoolExecutor',
+                                     max_bound=bound,
+                                     max_workers=5) as pool:
+                for _ in range(10):
+                    future = pool.submit(time.sleep, 1)
+                    self.assertIsInstance(future, Future)
+                    futures.append(future)
+
+            self.assertLength(futures, 10)
+            for future in futures:
+                self.assertTrue(future.done())
+                self.assertIsNone(future.result())
+
+    def test_exceptions(self):
+        """Test exceptions when creating a bounded executor."""
+        with self.assertRaisesRegex(TypeError,
+                                    r'issubclass\(\) arg 1 must be a class'):
+            BoundedPoolExecutor(PYTHON_VERSION)
+        with self.assertRaisesRegex(TypeError,
+                                    'expected a real subclass of '
+                                    r"'concurrent\.futures\.Executor'"):
+            BoundedPoolExecutor(TestCase)
+        with self.assertRaisesRegex(TypeError,
+                                    'expected a real subclass of '
+                                    r"'concurrent\.futures\.Executor'"):
+            BoundedPoolExecutor(Future)
+        with self.assertRaisesRegex(TypeError,
+                                    'expected a real subclass of '
+                                    r"'concurrent\.futures\.Executor'"):
+            BoundedPoolExecutor(Executor)
+        with self.assertRaisesRegex(
+                TypeError, "duplicate base class '?BoundedPoolExecutor'?"):
+            BoundedPoolExecutor(BoundedPoolExecutor)
+        with self.assertRaisesRegex(ValueError, "Minimum 'max_bound' is 1"):
+            BoundedPoolExecutor('ThreadPoolExecutor', 0)
+        with self.assertRaisesRegex(ValueError, "Minimum 'max_bound' is 1"):
+            BoundedPoolExecutor('ThreadPoolExecutor', max_bound=0)
+
+
 if __name__ == '__main__':
     with suppress(SystemExit):
         unittest.main()

--
To view, visit 
https://gerrit.wikimedia.org/r/c/pywikibot/core/+/1105000?usp=email
To unsubscribe, or for help writing mail filters, visit 
https://gerrit.wikimedia.org/r/settings?usp=email

Gerrit-MessageType: merged
Gerrit-Project: pywikibot/core
Gerrit-Branch: master
Gerrit-Change-Id: I47bb2b4743f44dcd04c5d4df57978749791bf99e
Gerrit-Change-Number: 1105000
Gerrit-PatchSet: 6
Gerrit-Owner: Xqt <i...@gno.de>
Gerrit-Reviewer: Xqt <i...@gno.de>
Gerrit-Reviewer: jenkins-bot
_______________________________________________
Pywikibot-commits mailing list -- pywikibot-commits@lists.wikimedia.org
To unsubscribe send an email to pywikibot-commits-le...@lists.wikimedia.org

Reply via email to