jenkins-bot has submitted this change. ( 
https://gerrit.wikimedia.org/r/c/pywikibot/core/+/840669 )

Change subject: [cleanup] Refactor TokenWallet
......................................................................

[cleanup] Refactor TokenWallet

Since MW 1.24 a new token system was introduced. All tokens can be
retrieved at once.

_tokenwallet.py:
- derive TokenWallet from collections.abc.Container
- give up the user key in TokenWallet._tokens because usually a Site
  object is for only one user after user/sysop dualism was given up.
  Keep a _currentuser attribute for sanity check.
- raise KeyError with TokenWallet if a key is not in the collection
  instead of pywikibot.Error as suggested by Python documentation.
- keep the token replacement for outdated tokens but print a
  FutureWarning in such cases.
- give up failed_cache attribute because __getitem__ lazy loads all
  tokens at once if one token was wanted. APISite.get_tokens() is used
  to get all tokens.
- add a new method 'clear()' to clear the internal cache
- deprecate 'load_tokens' method; just call clear() with it to enable
  a lazy loaded refresh of then tokens cache.
- __repr__ method now gives a new result string which looks like a
   valid Python expression.
- update_tokens was added to renew all tokens of a given list.
- this changes should also solve T270380

_apisite.py:
- deprecate validate_tokens() which is no longer needed.
- remove warnhandler in get_tokens which didn't work since MW 1.24 change
- deprecate 'all' parameter of get_tokens; all tokens are retrieved if
  list of 'types' is empty
- add a tokens property to enable deleting the tokens cache; this is a
  variant of calling 'clear()' method

requests.py:
- use TokenWallet.update_tokens() to renew tokens if needed in _bad_token()
- remove logging and simplify the code; we already have API warnings.

api/_login.py:
- deprecate get_login_token function and use self.tokens instead
- remove pre 1.27 code

others:
- update replaced tokens
- update tests
- update documentation

Bug: T306637
Bug: T270380
Change-Id: I12102055da723545f0f41408363cb45732b47967
---
M pywikibot/data/api/__init__.py
M pywikibot/data/api/_login.py
M pywikibot/data/api/_requests.py
M pywikibot/page/_user.py
M pywikibot/site/_apisite.py
M pywikibot/site/_datasite.py
M pywikibot/site/_extensions.py
M pywikibot/site/_tokenwallet.py
M pywikibot/site/_upload.py
M scripts/change_pagelang.py
M tests/aspects.py
M tests/token_tests.py
12 files changed, 310 insertions(+), 280 deletions(-)

Approvals:
  Xqt: Looks good to me, approved
  jenkins-bot: Verified



diff --git a/pywikibot/data/api/__init__.py b/pywikibot/data/api/__init__.py
index dbd6b4c..6445c8c 100644
--- a/pywikibot/data/api/__init__.py
+++ b/pywikibot/data/api/__init__.py
@@ -47,7 +47,7 @@
     """
     Clear cookies for site's second level domain.

-    get_login_token() will generate new cookies needed.
+    The http module takes care of all the cookie stuff.
     This is a workaround for requests bug, see :phab:`T224712`
     and https://github.com/psf/requests/issues/5411
     for more details.
diff --git a/pywikibot/data/api/_login.py b/pywikibot/data/api/_login.py
index 11de1ae..f91745e 100644
--- a/pywikibot/data/api/_login.py
+++ b/pywikibot/data/api/_login.py
@@ -12,6 +12,7 @@
 from pywikibot import login
 from pywikibot.backports import Dict
 from pywikibot.login import LoginStatus
+from pywikibot.tools import deprecated

 __all__ = ['LoginManager']

@@ -39,14 +40,11 @@
     def _login_parameters(self, *, botpassword: bool = False
                           ) -> Dict[str, str]:
         """Return login parameters."""
-        # Since MW 1.27 only for bot passwords.
-        self.action = 'login'
-        if not botpassword:
-            # get token using meta=tokens if supported
-            token = self.get_login_token()
-            if token:
-                # Standard login request since MW 1.27
-                self.action = 'clientlogin'
+        if botpassword:
+            self.action = 'login'
+        else:
+            token = self.site.tokens['login']
+            self.action = 'clientlogin'

         # prepare default login parameters
         parameters = {'action': self.action,
@@ -70,7 +68,6 @@
         Note, this doesn't do anything with cookies. The http module
         takes care of all the cookie stuff. Throws exception on failure.
         """
-        self.below_mw_1_27 = False
         if hasattr(self, '_waituntil') \
            and datetime.datetime.now() < self._waituntil:
             diff = self._waituntil - datetime.datetime.now()
@@ -114,21 +111,16 @@
                 return

             if status in ('NeedToken', 'WrongToken', 'badtoken'):
-                token = response.get('token')
-                if token and self.below_mw_1_27:  # pragma: no cover
-                    # fetched token using action=login
-                    login_request['lgtoken'] = token
-                    pywikibot.log('Received login token, proceed with login.')
-                else:
-                    # if incorrect login token was used,
-                    # force relogin and generate fresh one
-                    pywikibot.error('Received incorrect login token. '
-                                    'Forcing re-login.')
-                    # invalidate superior wiki cookies (T224712)
-                    pywikibot.data.api._invalidate_superior_cookies(
-                        self.site.family)
-                    login_request[
-                        self.keyword('token')] = self.get_login_token()
+                # if incorrect login token was used,
+                # force relogin and generate fresh one
+                pywikibot.error('Received incorrect login token. '
+                                'Forcing re-login.')
+                # invalidate superior wiki cookies (T224712)
+                pywikibot.data.api._invalidate_superior_cookies(
+                    self.site.family)
+                self.site.tokens.clear()
+                login_request[
+                    self.keyword('token')] = self.site.tokens['login']
                 continue

             # messagecode was introduced with 1.29.0-wmf.14
@@ -155,19 +147,12 @@

         raise pywikibot.exceptions.APIError(code=status, info=fail_reason)

+    @deprecated("site.tokens['login']", since='8.0.0')
     def get_login_token(self) -> Optional[str]:
         """Fetch login token for MediaWiki 1.27+.

+        .. deprecated:: 8.0
+
         :return: login token
         """
-        login_token_request = self.site._request(
-            use_get=False,
-            parameters={'action': 'query', 'meta': 'tokens', 'type': 'login'},
-        )
-        login_token_result = login_token_request.submit()
-        # check if we have to use old implementation of mw < 1.27
-        if 'query' in login_token_result:
-            return login_token_result['query']['tokens'].get('logintoken')
-
-        self.below_mw_1_27 = True  # pragma: no cover
-        return None
+        return self.site.tokens['login']
diff --git a/pywikibot/data/api/_requests.py b/pywikibot/data/api/_requests.py
index f8dce8c..8d656c8 100644
--- a/pywikibot/data/api/_requests.py
+++ b/pywikibot/data/api/_requests.py
@@ -917,7 +917,12 @@
         self.wait(delay)

     def _bad_token(self, code) -> bool:
-        """Check for bad token."""
+        """Check for bad token.
+
+        Check for bad tokens, call :meth:`TokenWallet.update_tokens()
+        <pywikibot.site._tokenwallet.TokenWallet.update_tokens>` method
+        to update the bunch of tokens and continue loop in :meth:`submit`.
+        """
         if code != 'badtoken':  # Other code not handled here
             return False

@@ -926,40 +931,12 @@
                           .format(self.site._loginstatus.name))
             return False

-        user_tokens = self.site.tokens._tokens[self.site.user()]
-        # all token values mapped to their type
-        tokens = {token: t_type for t_type, token in user_tokens.items()}
-        # determine which tokens are bad
-        invalid_param = {name: tokens[param[0]]
-                         for name, param in self._params.items()
-                         if len(param) == 1 and param[0] in tokens}
-        # doesn't care about the cache so can directly load them
-        if invalid_param:
-            pywikibot.log(
-                'Bad token error for {}. Tokens for "{}" used in request; '
-                'invalidated them.'
-                .format(self.site.user(),
-                        '", "'.join(sorted(set(invalid_param.values())))))
-            # invalidate superior wiki cookies (T224712)
-            pywikibot.data.api._invalidate_superior_cookies(self.site.family)
-            # request new token(s) instead of invalid
-            self.site.tokens.load_tokens(set(invalid_param.values()))
-            # fix parameters; lets hope that it doesn't mistake actual
-            # parameters as tokens
-            for name, t_type in invalid_param.items():
-                self[name] = self.site.tokens[t_type]
-            return True
-
-        # otherwise couldn't find any … weird there is nothing what
-        # can be done here because it doesn't know which parameters
-        # to fix
-        pywikibot.log(
-            'Bad token error for {} but no parameter is using a '
-            'token. Current tokens: {}'
-            .format(self.site.user(),
-                    ', '.join('{}: {}'.format(*e)
-                              for e in user_tokens.items())))
-        return False
+        # invalidate superior wiki cookies (T224712)
+        pywikibot.data.api._invalidate_superior_cookies(self.site.family)
+        # update tokens
+        tokens = self.site.tokens.update_tokens(self._params['token'])
+        self._params['token'] = tokens
+        return True

     def submit(self) -> dict:
         """
diff --git a/pywikibot/page/_user.py b/pywikibot/page/_user.py
index f12b507..9ac8e03 100644
--- a/pywikibot/page/_user.py
+++ b/pywikibot/page/_user.py
@@ -249,7 +249,7 @@
         params = {
             'action': 'emailuser',
             'target': self.username,
-            'token': self.site.tokens['email'],
+            'token': self.site.tokens['csrf'],
             'subject': subject,
             'text': text,
         }
diff --git a/pywikibot/site/_apisite.py b/pywikibot/site/_apisite.py
index a3305ff..f45b6c0 100644
--- a/pywikibot/site/_apisite.py
+++ b/pywikibot/site/_apisite.py
@@ -14,9 +14,17 @@
 from typing import Any, Iterable, Optional, Type, TypeVar, Union

 import pywikibot
-from pywikibot.backports import DefaultDict, Dict, List, Match
+from pywikibot.backports import (
+    DefaultDict,
+    Dict,
+    List,
+    Match,
+    Pattern,
+    Set,
+    Tuple,
+    removesuffix,
+)
 from pywikibot.backports import OrderedDict as OrderedDictType
-from pywikibot.backports import Pattern, Set, Tuple
 from pywikibot.comms.http import get_authentication
 from pywikibot.data import api
 from pywikibot.exceptions import (
@@ -72,6 +80,7 @@
     MediaWikiVersion,
     cached,
     deprecated,
+    issue_deprecation_warning,
     merge_unique_dicts,
     normalize_username,
 )
@@ -121,20 +130,20 @@
         self._msgcache: Dict[str, str] = {}
         self._paraminfo = api.ParamInfo(self)
         self._siteinfo = Siteinfo(self)
-        self.tokens = TokenWallet(self)
+        self._tokens = TokenWallet(self)

     def __getstate__(self) -> Dict[str, Any]:
         """Remove TokenWallet before pickling, for security reasons."""
-        new = super().__getstate__()
-        del new['tokens']
-        del new['_interwikimap']
-        return new
+        state = super().__getstate__()
+        del state['_tokens']
+        del state['_interwikimap']
+        return state

-    def __setstate__(self, attrs: Dict[str, Any]) -> None:
+    def __setstate__(self, state: Dict[str, Any]) -> None:
         """Restore things removed in __getstate__."""
-        super().__setstate__(attrs)
+        super().__setstate__(state)
         self._interwikimap = _InterwikiMap(self)
-        self.tokens = TokenWallet(self)
+        self._tokens = TokenWallet(self)

     def interwiki(self, prefix: str) -> BaseSite:
         """
@@ -447,7 +456,7 @@

         # Reset tokens and user properties
         del self.userinfo
-        self.tokens = TokenWallet(self)
+        self.tokens.clear()
         self._paraminfo = api.ParamInfo(self)

         # Clear also cookies for site's second level domain (T224712)
@@ -503,7 +512,7 @@
         - :meth:`logged_in` to verify the user is loggend in to a site

         .. seealso:: :api:`Userinfo`
-        .. versionchanged:: 8.0.0
+        .. versionchanged:: 8.0
            Use API formatversion 2.

         :return: A dict with the following keys and values:
@@ -1528,68 +1537,127 @@

         return page._redirtarget

+    @deprecated(since='8.0.0')
     def validate_tokens(self, types: List[str]) -> List[str]:
-        """Validate if requested tokens are acceptable."""
+        """Validate if requested tokens are acceptable.
+
+        Valid tokens may depend on mw version.
+
+        .. deprecated:: 8.0
+        """
         data = self._paraminfo.parameter('query+tokens', 'type')
         assert data is not None
         return [token for token in types if token in data['type']]

-    def get_tokens(
-        self,
-        types: List[str],
-        all: bool = False
-    ) -> Dict[str, str]:
-        """Preload one or multiple tokens.
+    def get_tokens(self, types: List[str], *args, **kwargs) -> Dict[str, str]:
+        r"""Preload one or multiple tokens.

-        For MediaWiki versions since 1.24wmfXXX a new token
-        system was introduced which reduced the amount of tokens available.
-        Most of them were merged into the 'csrf' token. If the token type in
-        the parameter is not known it will default to the 'csrf' token.
+        **Usage**

-        The other token types available are:
-         - createaccount
-         - deleteglobalaccount
-         - login
-         - patrol
-         - rollback
-         - setglobalaccountstatus
-         - userrights
-         - watch
+        >>> site = pywikibot.Site()
+        >>> tokens = site.get_tokens([])  # get all tokens
+        >>> list(tokens.keys())  # result depends on user
+        ['createaccount', 'login']
+        >>> tokens = site.get_tokens(['csrf', 'patrol'])
+        >>> list(tokens.keys())  # doctest: +SKIP
+        ['csrf', 'patrol']
+        >>> token = site.get_tokens(['csrf']).get('csrf')  # get a single token
+        >>> token  # doctest: +SKIP
+        'a9f...0a0+\\'
+        >>> token = site.get_tokens(['unknown'])  # try an invalid token
+        ... # doctest: +SKIP
+        ... # invalid token names shows a warnig and the key is not in result
+        ...
+        WARNING: API warning (tokens) of unknown format:
+        ... {'warnings': 'Unrecognized value for parameter "type": foo'}
+        {}

+        You should not call this method directly, especially if you only
+        need a specific token. Use :attr:`tokens` property instead.
+
+        .. versionchanged:: 8.0
+           ``all`` parameter is deprecated. Use an empty list for
+           ``types`` instead.
+        .. note:: ``args`` and ``kwargs`` are not used for deprecation
+           warning only.
         .. seealso:: :api:`Tokens`

-        :param types: the types of token (e.g., "edit", "move", "delete");
-            see API documentation for full list of types
-        :param all: load all available tokens, if None only if it can be done
-            in one request.
-
-        return: a dict with retrieved valid tokens.
+        :param types: the types of token (e.g., "csrf", "login", "patrol").
+            If the list is empty all available tokens are loaded. See
+            API documentation for full list of types.
+        :return: a dict with retrieved valid tokens.
         """
-        def warn_handler(mod: str, text: str) -> Optional[Match[str]]:
-            """Filter warnings for not available tokens."""
-            return re.match(
-                r'Action \'\w+\' is not allowed for the current user', text)
+        # deprecate 'all' parameter
+        if args or kwargs:
+            issue_deprecation_warning("'all' parameter",
+                                      "empty list for 'types' parameter",
+                                      since='8.0.0')
+            load_all = kwargs.get('all', args[0] if args else False)
+        else:
+            load_all = False

-        user_tokens = {}
-        if all is not False:
+        if not types or load_all is not False:
             pdata = self._paraminfo.parameter('query+tokens', 'type')
             assert pdata is not None
-            types.extend(pdata['type'])
+            types = pdata['type']

         req = self.simple_request(action='query', meta='tokens',
-                                  type=self.validate_tokens(types))
+                                  type=types, formatversion=2)

-        req._warning_handler = warn_handler
         data = req.submit()
         data = data.get('query', data)

+        user_tokens = {}
         if 'tokens' in data and data['tokens']:
-            user_tokens = {key[:-5]: val
+            user_tokens = {removesuffix(key, 'token'): val
                            for key, val in data['tokens'].items()
                            if val != '+\\'}

         return user_tokens

+    @property
+    def tokens(self) -> 'pywikibot.site._tokenwallet.TokenWallet':
+        r"""Return the TokenWallet collection.
+
+        :class:`TokenWallet<pywikibot.site._tokenwallet.TokenWallet>`
+        collection holds all available tokens. The tokens are loaded
+        via :meth:`get_tokens` method with the first token request and
+        is retained until the TokenWallet is cleared.
+
+        **Usage:**
+
+        >>> site = pywikibot.Site()
+        >>> token = site.tokens['csrf']  # doctest: +SKIP
+        >>> token  # doctest: +SKIP
+        'df8...9e6+\\'
+        >>> 'csrf' in site.tokens  # doctest: +SKIP
+        ... # Check whether the token exists
+        True
+        >>> 'invalid' in site.tokens  # doctest: +SKIP
+        False
+        >>> token = site.tokens['invalid']  # doctest: +SKIP
+        Traceback (most recent call last):
+        ...
+        KeyError: "Invalid token 'invalid' for user ...
+        >>> site.tokens.clear()  # clears the internal cache
+        >>> site.tokens['csrf']  # doctest: +SKIP
+        ... # get a new token
+        '1c8...9d3+\\'
+        >>> del site.tokens  # another variant to clear the cache
+
+        .. versionchanged:: 8.0
+           ``tokens`` attribute became a property to enable deleter.
+        .. warning:: A deprecation warning is shown if the token name is
+           outdated, see :api:`Tokens (action)`.
+        .. seealso:: :api:`Tokens` for valid token types
+        """
+        return self._tokens
+
+    @tokens.deleter
+    def tokens(self) -> None:
+        """Deleter method to clear the TokenWallet collection."""
+        self._tokens.clear()
+
     # TODO: expand support to other parameters of action=parse?
     def get_parsed_page(self, page: 'pywikibot.page.BasePage') -> str:
         """Retrieve parsed text of the page using action=parse.
@@ -1680,7 +1748,7 @@
         elif target:
             page = pywikibot.Page(self, target)

-        token = self.tokens['delete']
+        token = self.tokens['csrf']
         params = {
             'action': 'revisiondelete',
             'token': token,
@@ -1841,7 +1909,7 @@
                 if not recreate:
                     raise

-        token = self.tokens['edit']
+        token = self.tokens['csrf']
         if bot is None:
             bot = self.has_right('bot')
         params = dict(action='edit', title=page,
@@ -2143,7 +2211,7 @@
             raise NoPageError(page,
                               'Cannot move page {page} because it '
                               'does not exist on {site}.')
-        token = self.tokens['move']
+        token = self.tokens['csrf']
         self.lock_page(page)
         req = self.simple_request(action='move',
                                   noredirect=noredirect,
@@ -2332,7 +2400,7 @@
             raise TypeError("'page' must be a FilePage not a '{}'"
                             .format(page.__class__.__name__))

-        token = self.tokens['delete']
+        token = self.tokens['csrf']
         params = {
             'action': 'delete',
             'token': token,
@@ -2404,7 +2472,7 @@
             If None, restores all revisions.
         :param fileids: List of fileids to restore.
         """
-        token = self.tokens['delete']
+        token = self.tokens['csrf']
         params = {
             'action': 'undelete',
             'title': page,
@@ -2489,7 +2557,7 @@
             applied to all protections. If None, 'infinite', 'indefinite',
             'never', or '' is given, there is no expiry.
         """
-        token = self.tokens['protect']
+        token = self.tokens['csrf']
         self.lock_page(page)

         protections_list = [ptype + '=' + level
@@ -2570,7 +2638,7 @@
             blocked.
         :return: The data retrieved from the API request.
         """
-        token = self.tokens['block']
+        token = self.tokens['csrf']
         if expiry is False:
             expiry = 'never'
         req = self.simple_request(action='block', user=user.username,
@@ -2598,7 +2666,7 @@
         """
         req = self.simple_request(action='unblock',
                                   user=user.username,
-                                  token=self.tokens['block'],
+                                  token=self.tokens['csrf'],
                                   reason=reason)

         data = req.submit()
@@ -2698,7 +2766,7 @@
         # TODO: is there another way?
         req = self._request(throttle=False,
                             parameters={'action': 'upload',
-                                        'token': self.tokens['edit']})
+                                        'token': self.tokens['csrf']})
         try:
             req.submit()
         except APIError as error:
diff --git a/pywikibot/site/_datasite.py b/pywikibot/site/_datasite.py
index 4d870d8..75dce37 100644
--- a/pywikibot/site/_datasite.py
+++ b/pywikibot/site/_datasite.py
@@ -285,7 +285,7 @@
             params['bot'] = 1
         if 'baserevid' in kwargs and kwargs['baserevid']:
             params['baserevid'] = kwargs['baserevid']
-        params['token'] = self.tokens['edit']
+        params['token'] = self.tokens['csrf']

         for arg in kwargs:
             if arg in ['clear', 'summary']:
@@ -317,7 +317,7 @@
                   'claim': json.dumps(claim.toJSON()),
                   'baserevid': entity.latest_revision_id,
                   'summary': summary,
-                  'token': self.tokens['edit'],
+                  'token': self.tokens['csrf'],
                   'bot': bot,
                   }
         req = self.simple_request(**params)
@@ -350,7 +350,7 @@
             raise NoPageError(claim)
         params = {'action': 'wbsetclaimvalue', 'claim': claim.snak,
                   'snaktype': snaktype, 'summary': summary, 'bot': bot,
-                  'token': self.tokens['edit']}
+                  'token': self.tokens['csrf']}

         if snaktype == 'value':
             params['value'] = json.dumps(claim._formatValue())
@@ -377,7 +377,7 @@
             raise NoPageError(claim)
         params = {'action': 'wbsetclaim',
                   'claim': json.dumps(claim.toJSON()),
-                  'token': self.tokens['edit'],
+                  'token': self.tokens['csrf'],
                   'baserevid': claim.on_item.latest_revision_id,
                   'summary': summary,
                   'bot': bot,
@@ -411,7 +411,7 @@
             raise ValueError('The claim cannot have a source.')
         params = {'action': 'wbsetreference', 'statement': claim.snak,
                   'baserevid': claim.on_item.latest_revision_id,
-                  'summary': summary, 'bot': bot, 'token': self.tokens['edit']}
+                  'summary': summary, 'bot': bot, 'token': self.tokens['csrf']}

         # build up the snak
         if isinstance(source, list):
@@ -468,7 +468,7 @@
         if (not new and hasattr(qualifier, 'hash')
                 and qualifier.hash is not None):
             params['snakhash'] = qualifier.hash
-        params['token'] = self.tokens['edit']
+        params['token'] = self.tokens['csrf']
         # build up the snak
         if qualifier.getSnakType() == 'value':
             params['value'] = json.dumps(qualifier._formatValue())
@@ -505,7 +505,7 @@
             'summary': summary,
             'bot': bot,
             'claim': '|'.join(claim.snak for claim in claims),
-            'token': self.tokens['edit'],
+            'token': self.tokens['csrf'],
         }

         req = self.simple_request(**params)
@@ -534,7 +534,7 @@
             'summary': summary, 'bot': bot,
             'statement': claim.snak,
             'references': '|'.join(source.hash for source in sources),
-            'token': self.tokens['edit'],
+            'token': self.tokens['csrf'],
         }

         req = self.simple_request(**params)
@@ -564,7 +564,7 @@
             'summary': summary,
             'bot': bot,
             'qualifiers': [qualifier.hash for qualifier in qualifiers],
-            'token': self.tokens['edit']
+            'token': self.tokens['csrf']
         }

         req = self.simple_request(**params)
@@ -589,7 +589,7 @@
             'totitle': page1.title(),
             'fromsite': page2.site.dbName(),
             'fromtitle': page2.title(),
-            'token': self.tokens['edit']
+            'token': self.tokens['csrf']
         }
         if bot:
             params['bot'] = 1
@@ -621,7 +621,7 @@
             'fromid': from_item.getID(),
             'toid': to_item.getID(),
             'ignoreconflicts': ignore_conflicts,
-            'token': self.tokens['edit'],
+            'token': self.tokens['csrf'],
             'summary': summary,
         }
         if bot:
@@ -649,7 +649,7 @@
             'action': 'wblmergelexemes',
             'source': from_lexeme.getID(),
             'target': to_lexeme.getID(),
-            'token': self.tokens['edit'],
+            'token': self.tokens['csrf'],
             'summary': summary,
         }
         if bot:
@@ -673,7 +673,7 @@
             'action': 'wbcreateredirect',
             'from': from_item.getID(),
             'to': to_item.getID(),
-            'token': self.tokens['edit'],
+            'token': self.tokens['csrf'],
             'bot': bot,
         }
         req = self.simple_request(**params)
@@ -870,7 +870,7 @@
         params.update(
             {'baserevid': baserevid,
              'action': action,
-             'token': self.tokens['edit'],
+             'token': self.tokens['csrf'],
              'bot': kwargs.pop('bot', True),
              })
         params.update(prepare_data(action, action_data))
@@ -940,7 +940,7 @@
             'lexemeId': lexeme.getID(),
             'data': json.dumps(form.toJSON()),
             'bot': bot,
-            'token': self.tokens['edit'],
+            'token': self.tokens['csrf'],
         }
         if baserevid:
             params['baserevid'] = baserevid
@@ -965,7 +965,7 @@
             'action': 'wblremoveform',
             'id': form.getID(),
             'bot': bot,
-            'token': self.tokens['edit'],
+            'token': self.tokens['csrf'],
         }
         if baserevid:
             params['baserevid'] = baserevid
@@ -995,7 +995,7 @@
             'formId': form.getID(),
             'data': json.dumps(data),
             'bot': bot,
-            'token': self.tokens['edit'],
+            'token': self.tokens['csrf'],
         }
         if baserevid:
             params['baserevid'] = baserevid
diff --git a/pywikibot/site/_extensions.py b/pywikibot/site/_extensions.py
index 8232a9e..6c5cf58 100644
--- a/pywikibot/site/_extensions.py
+++ b/pywikibot/site/_extensions.py
@@ -65,7 +65,7 @@
         # TODO: ensure that the 'echomarkread' action
         # is supported by the site
         kwargs = merge_unique_dicts(kwargs, action='echomarkread',
-                                    token=self.tokens['edit'])
+                                    token=self.tokens['csrf'])
         req = self.simple_request(**kwargs)
         data = req.submit()
         try:
diff --git a/pywikibot/site/_tokenwallet.py b/pywikibot/site/_tokenwallet.py
index 9e32f87..1ec829d 100644
--- a/pywikibot/site/_tokenwallet.py
+++ b/pywikibot/site/_tokenwallet.py
@@ -4,92 +4,132 @@
 #
 # Distributed under the terms of the MIT license.
 #
-from pywikibot import debug
-from pywikibot.exceptions import Error
+from collections.abc import Container
+from typing import Any, Optional, TYPE_CHECKING
+
+from pywikibot.backports import Dict, List
+from pywikibot.tools import issue_deprecation_warning, deprecated
+
+if TYPE_CHECKING:
+    from pywikibot.site import APISite


-class TokenWallet:
+class TokenWallet(Container):

-    """Container for tokens."""
+    """Container for tokens.

-    def __init__(self, site) -> None:
-        """Initializer.
+    You should not use this container class directly; use
+    :attr:`APISite.tokens<pywikibot.site._apisite.APISite.tokens>`
+    instead which gives access to the site's TokenWallet instance.
+    """

-        :type site: pywikibot.site.APISite
-        """
-        self.site = site
-        self._tokens = {}
-        self.failed_cache = set()  # cache unavailable tokens.
+    def __init__(self, site: 'APISite') -> None:
+        """Initializer."""
+        self.site: APISite = site
+        self._tokens: Dict[str, str] = {}
+        self._currentuser: Optional[str] = site.user()

-    def load_tokens(self, types, all: bool = False) -> None:
-        """
-        Preload one or multiple tokens.
-
-        :param types: the types of token.
-        :type types: iterable
-        :param all: load all available tokens, if None only if it can be done
-            in one request.
-        """
-        if self.site.user() is None:
-            self.site.login()
-
-        self._tokens.setdefault(self.site.user(), {}).update(
-            self.site.get_tokens(types, all=all))
-
-        # Preload all only the first time.
-        # When all=True types is extended in site.get_tokens().
-        # Keys not recognised as tokens, are cached so they are not requested
-        # any longer.
-        if all is not False:
-            for key in types:
-                if key not in self._tokens[self.site.user()]:
-                    self.failed_cache.add((self.site.user(), key))
-
-    def __getitem__(self, key):
+    def __getitem__(self, key: str) -> str:
         """Get token value for the given key."""
-        if self.site.user() is None:
+        if self.site.user() is None and key != 'login':
             self.site.login()

-        user_tokens = self._tokens.setdefault(self.site.user(), {})
-        # always preload all for users without tokens
-        failed_cache_key = (self.site.user(), key)
+        if self.site.user() != self._currentuser:
+            self._currentuser = self.site.user()
+            self.clear()

-        # redirect old tokens to be compatible with older MW version
+        if not self._tokens:
+            self._tokens = self.site.get_tokens([])
+
+        # Redirect old tokens which were used by outdated MediaWiki versions
+        # but show a FutureWarning for this usage:
         # 
https://www.mediawiki.org/wiki/MediaWiki_1.37/Deprecation_of_legacy_API_token_parameters
-        if self.site.mw_version >= '1.24wmf19' \
-           and key in {'edit', 'delete', 'protect', 'move', 'block', 'unblock',
-                       'email', 'import', 'options'}:
-            debug('Token {!r} was replaced by {!r}'.format(key, 'csrf'))
+        if key in {'edit', 'delete', 'protect', 'move', 'block', 'unblock',
+                   'email', 'import', 'options'}:
+            issue_deprecation_warning(
+                f'Token {key!r}', "'csrf'", since='8.0.0')
             key = 'csrf'

         try:
-            key = self.site.validate_tokens([key])[0]
-        except IndexError:
-            raise Error(
-                "Requested token '{}' is invalid on {} wiki."
-                .format(key, self.site))
+            token = self._tokens[key]
+        except KeyError:
+            raise KeyError(
+                f'Invalid token {key!r} for user {self._currentuser!r} on '
+                f'{self.site} wiki.') from None

-        if (key not in user_tokens
-                and failed_cache_key not in self.failed_cache):
-            self.load_tokens([key], all=False if user_tokens else None)
-
-        if key in user_tokens:
-            return user_tokens[key]
-        # token not allowed for self.site.user() on self.site
-        self.failed_cache.add(failed_cache_key)
-        # to be changed back to a plain KeyError?
-        raise Error(
-            "Action '{}' is not allowed for user {} on {} wiki."
-            .format(key, self.site.user(), self.site))
+        return token

     def __contains__(self, key) -> bool:
-        """Return True if the given token name is cached."""
-        return key in self._tokens.setdefault(self.site.user(), {})
+        """Return True if the token name is cached for the current user."""
+        try:
+            self[key]
+        except KeyError:
+            return False
+        return True

     def __str__(self) -> str:
         """Return a str representation of the internal tokens dictionary."""
-        return self._tokens.__str__()
+        return str(self._tokens)

     def __repr__(self) -> str:
-        """Return a representation of the internal tokens dictionary."""
-        return self._tokens.__repr__()
+        """Return a representation of the TokenWallet.
+
+        >>> import pywikibot
+        >>> site = pywikibot.Site('wikipedia:test')
+        >>> repr(site.tokens)
+        "TokenWallet(pywikibot.Site('wikipedia:test'))"
+
+        .. versionchanged:: 8.0
+           Provide a string which looks like a valid Python expression.
+        """
+        user = f', user={self._currentuser!r}' if self._currentuser else ''
+        return (f'{type(self).__name__}'
+                f'(pywikibot.Site({self.site.sitename!r}{user}))')
+
+    def clear(self):
+        """Clear the self._tokens cache. Tokens are reloaded when needed.
+
+        .. versionadded:: 8.0
+        """
+        self._tokens.clear()
+
+    def update_tokens(self, tokens: List[str]) -> List[str]:
+        """Return a list of new tokens for a given list of tokens.
+
+        This method can be used if a token is outdated and has to be
+        renewed but the token type is unknown and we only have the old
+        token. It first gets the token names from all given tokens,
+        clears the cache and returns fresh new tokens of the found types.
+
+        **Usage:**
+
+        >>> import pywikibot
+        >>> site = pywikibot.Site()
+        >>> tokens = [site.tokens['csrf']]  # doctest: +SKIP
+        >>> new_tokens = site.tokens.update_tokens(tokens)  # doctest: +SKIP
+
+        .. code-block:: Python
+           :caption: An example for replacing request token parameters
+
+           r._params['token'] = r.site.tokens.update_tokens(r._params['token'])
+
+        .. versionadded:: 8.0
+        """
+        # find the token types
+        types = [key
+                 for key, value in self._tokens.items() for token in tokens
+                 if value == token]
+        self.clear()  # clear the cache
+        return [self[token_type] for token_type in types]
+
+    @deprecated('clear()', since='8.0.0')
+    def load_tokens(self, *args: Any, **kwargs: Any) -> None:
+        """Clear cache to lazy load tokens when needed.
+
+        .. deprecated:: 8.0
+           Use :meth:`clear` instead.
+        .. versionchanged:: 8.0
+           Clear the cache instead of loading tokens. All parameters are
+           ignored.
+        """
+        self.clear()
diff --git a/pywikibot/site/_upload.py b/pywikibot/site/_upload.py
index 0d23428..2dd2c32 100644
--- a/pywikibot/site/_upload.py
+++ b/pywikibot/site/_upload.py
@@ -171,7 +171,7 @@

         ignore_all_warnings = not callable(ignore_warnings) and ignore_warnings

-        token = self.site.tokens['edit']
+        token = self.site.tokens['csrf']
         result = None
         file_page_title = self.filepage.title(with_ns=False)
         file_size = None
diff --git a/scripts/change_pagelang.py b/scripts/change_pagelang.py
index 780007c..06817bf 100755
--- a/scripts/change_pagelang.py
+++ b/scripts/change_pagelang.py
@@ -60,11 +60,10 @@
         :param page: The page to update and save
         :type page: pywikibot.page.BasePage
         """
-        token = self.site.get_tokens(['csrf']).get('csrf')
         parameters = {'action': 'setpagelanguage',
                       'title': page.title(),
                       'lang': self.opt.setlang,
-                      'token': token}
+                      'token': self.site.tokens['csrf']}
         r = self.site.simple_request(**parameters)
         r.submit()
         pywikibot.info(f'<<lightpurple>>{page}<<default>>: Setting '
diff --git a/tests/aspects.py b/tests/aspects.py
index 8f47518..431a387 100644
--- a/tests/aspects.py
+++ b/tests/aspects.py
@@ -1404,7 +1404,8 @@
     """Test cases for deprecation function in the tools module."""

     _generic_match = re.compile(
-        r'.* is deprecated(?: for \d+ [^;]*)?(; use .* instead)?\.')
+        r'.* is deprecated(?: since release [\d.]+ [^;]*)?'
+        r'(; use .* instead)?\.')

     source_adjustment_skips = [
         unittest.case._AssertRaisesContext,
diff --git a/tests/token_tests.py b/tests/token_tests.py
index 5ba8cef..8accc51 100755
--- a/tests/token_tests.py
+++ b/tests/token_tests.py
@@ -5,15 +5,20 @@
 #
 # Distributed under the terms of the MIT license.
 #
+import unittest
 from contextlib import suppress

 from pywikibot.exceptions import APIError, Error
 from pywikibot.site import TokenWallet
-from pywikibot.tools import MediaWikiVersion
-from tests.aspects import DefaultSiteTestCase, TestCase, TestCaseBase, unittest
+from tests.aspects import (
+    DefaultSiteTestCase,
+    DeprecationTestCase,
+    TestCase,
+    TestCaseBase,
+)


-class TestSiteTokens(DefaultSiteTestCase):
+class TestSiteTokens(DeprecationTestCase, DefaultSiteTestCase):

     """Test cases for tokens in Site methods.

@@ -26,67 +31,22 @@

     login = True

-    def setUp(self):
-        """Store version."""
-        super().setUp()
-        self.mysite = self.get_site()
-        self._version = self.mysite.mw_version
-        self.orig_version = self.mysite.version
-
-    def tearDown(self):
-        """Restore version."""
-        super().tearDown()
-        self.mysite.version = self.orig_version
-
-    def _test_tokens(self, version, test_version, additional_token):
+    def test_tokens(self):
         """Test tokens."""
-        if version and (self._version < version
-                        or self._version < test_version):
-            raise unittest.SkipTest(
-                'Site {} version {} is too low for this tests.'
-                .format(self.mysite, self._version))
-
-        self.mysite.version = lambda: test_version
-        del self.mysite._mw_version_time  # remove cached mw_version
-
         redirected_tokens = ['edit', 'move', 'delete']
-        for ttype in redirected_tokens + ['patrol', additional_token]:
-            try:
-                token = self.mysite.tokens[ttype]
-            except Error as error_msg:
-                if self.mysite.validate_tokens([ttype]):
-                    pattern = ("Action '[a-z]+' is not allowed "
-                               'for user .* on .* wiki.')
-                else:
-                    pattern = "Requested token '[a-z]+' is invalid on .* wiki."
-
-                self.assertRegex(str(error_msg), pattern)
-
-            else:
-                self.assertIsInstance(token, str)
-                self.assertEqual(token, self.mysite.tokens[ttype])
-                # test __contains__
-                if test_version < '1.24wmf19':
-                    self.assertIn(ttype, self.mysite.tokens)
-                elif ttype in redirected_tokens:
-                    self.assertEqual(self.mysite.tokens[ttype],
-                                     self.mysite.tokens['csrf'])
-
-    def test_tokens_in_mw_123_124wmf18(self):
-        """Test ability to get page tokens."""
-        if MediaWikiVersion(self.orig_version()) >= '1.37wmf24':
-            self.skipTest('Site {} version {} is too new for this tests.'
-                          .format(self.mysite, self._version))
-        self._test_tokens('1.23', '1.24wmf18', 'deleteglobalaccount')
-
-    def test_tokens_in_mw_124wmf19(self):
-        """Test ability to get page tokens."""
-        self._test_tokens('1.24wmf19', '1.24wmf20', 'deleteglobalaccount')
+        for ttype in redirected_tokens + ['patrol', 'deleteglobalaccount']:
+            self.assertIsInstance(self.site.tokens[ttype], str)
+            self.assertIn(ttype, self.site.tokens)  # test __contains__
+            if ttype in redirected_tokens:
+                self.assertEqual(self.site.tokens[ttype],
+                                 self.site.tokens['csrf'])
+                self._do_test_warning_filename = False
+                self.assertDeprecationParts(f'Token {ttype!r}', "'csrf'")

     def test_invalid_token(self):
         """Test invalid token."""
-        with self.assertRaises(Error):
-            self.mysite.tokens['invalidtype']
+        with self.assertRaises(KeyError):
+            self.site.tokens['invalidtype']


 class TokenTestBase(TestCaseBase):
@@ -100,11 +60,11 @@
         ttype = self.token_type
         try:
             token = mysite.tokens[ttype]
-        except Error as error_msg:
+        except KeyError as error_msg:
             self.assertRegex(
                 str(error_msg),
-                "Action '[a-z]+' is not allowed for user .* on .* wiki.")
-            self.assertNotIn(self.token_type, self.site.tokens)
+                f'Invalid token {ttype!r} for user .+ on {mysite} wiki.')
+            self.assertNotIn(ttype, self.site.tokens)
             self.skipTest(error_msg)

         self.token = token

--
To view, visit https://gerrit.wikimedia.org/r/c/pywikibot/core/+/840669
To unsubscribe, or for help writing mail filters, visit 
https://gerrit.wikimedia.org/r/settings

Gerrit-Project: pywikibot/core
Gerrit-Branch: master
Gerrit-Change-Id: I12102055da723545f0f41408363cb45732b47967
Gerrit-Change-Number: 840669
Gerrit-PatchSet: 18
Gerrit-Owner: Xqt <[email protected]>
Gerrit-Reviewer: JJMC89 <[email protected]>
Gerrit-Reviewer: Mpaa <[email protected]>
Gerrit-Reviewer: Xqt <[email protected]>
Gerrit-Reviewer: jenkins-bot
Gerrit-MessageType: merged
_______________________________________________
Pywikibot-commits mailing list -- [email protected]
To unsubscribe send an email to [email protected]

Reply via email to