https://github.com/python/cpython/commit/ae1bdcafb85e70659239935c289d41c44da9758e
commit: ae1bdcafb85e70659239935c289d41c44da9758e
branch: 3.13
author: Miss Islington (bot) <31488909+miss-isling...@users.noreply.github.com>
committer: AA-Turner <9087854+aa-tur...@users.noreply.github.com>
date: 2024-07-30T04:08:35Z
summary:

[3.13] GH-121970: Extract ``audit_events`` into a new extension (GH-122325) (#122434)

files:
A Doc/tools/extensions/audit_events.py
M Doc/conf.py
M Doc/tools/extensions/pyspecific.py

diff --git a/Doc/conf.py b/Doc/conf.py
index 4841b69e380085..3860d146a27e85 100644
--- a/Doc/conf.py
+++ b/Doc/conf.py
@@ -20,6 +20,7 @@
 # ---------------------
 
 extensions = [
+    'audit_events',
     'c_annotations',
     'glossary_search',
     'lexers',
diff --git a/Doc/tools/extensions/audit_events.py b/Doc/tools/extensions/audit_events.py
new file mode 100644
index 00000000000000..d0f08522d21ea2
--- /dev/null
+++ b/Doc/tools/extensions/audit_events.py
@@ -0,0 +1,262 @@
+"""Support for documenting audit events."""
+
+from __future__ import annotations
+
+import re
+from typing import TYPE_CHECKING
+
+from docutils import nodes
+from sphinx.errors import NoUri
+from sphinx.locale import _ as sphinx_gettext
+from sphinx.transforms.post_transforms import SphinxPostTransform
+from sphinx.util import logging
+from sphinx.util.docutils import SphinxDirective
+
+if TYPE_CHECKING:
+    from collections.abc import Iterator
+
+    from sphinx.application import Sphinx
+    from sphinx.builders import Builder
+    from sphinx.environment import BuildEnvironment
+
+logger = logging.getLogger(__name__)
+
+# This list of sets are allowable synonyms for event argument names.
+# If two names are in the same set, they are treated as equal for the
+# purposes of warning. This won't help if the number of arguments is
+# different!
+_SYNONYMS = [
+    frozenset({"file", "path", "fd"}),
+]
+
+
+class AuditEvents:
+    def __init__(self) -> None:
+        self.events: dict[str, list[str]] = {}
+        self.sources: dict[str, list[tuple[str, str]]] = {}
+
+    def __iter__(self) -> Iterator[tuple[str, list[str], tuple[str, str]]]:
+        for name, args in self.events.items():
+            for source in self.sources[name]:
+                yield name, args, source
+
+    def add_event(
+        self, name, args: list[str], source: tuple[str, str]
+    ) -> None:
+        if name in self.events:
+            self._check_args_match(name, args)
+        else:
+            self.events[name] = args
+        self.sources.setdefault(name, []).append(source)
+
+    def _check_args_match(self, name: str, args: list[str]) -> None:
+        current_args = self.events[name]
+        msg = (
+            f"Mismatched arguments for audit-event {name}: "
+            f"{current_args!r} != {args!r}"
+        )
+        if current_args == args:
+            return
+        if len(current_args) != len(args):
+            logger.warning(msg)
+            return
+        for a1, a2 in zip(current_args, args, strict=False):
+            if a1 == a2:
+                continue
+            if any(a1 in s and a2 in s for s in _SYNONYMS):
+                continue
+            logger.warning(msg)
+            return
+
+    def id_for(self, name) -> str:
+        source_count = len(self.sources.get(name, ()))
+        name_clean = re.sub(r"\W", "_", name)
+        return f"audit_event_{name_clean}_{source_count}"
+
+    def rows(self) -> Iterator[tuple[str, list[str], list[tuple[str, str]]]]:
+        for name in sorted(self.events.keys()):
+            yield name, self.events[name], self.sources[name]
+
+
+def initialise_audit_events(app: Sphinx) -> None:
+    """Initialise the audit_events attribute on the environment."""
+    if not hasattr(app.env, "audit_events"):
+        app.env.audit_events = AuditEvents()
+
+
+def audit_events_purge(
+    app: Sphinx, env: BuildEnvironment, docname: str
+) -> None:
+    """This is to remove traces of removed documents from env.audit_events."""
+    fresh_audit_events = AuditEvents()
+    for name, args, (doc, target) in env.audit_events:
+        if doc != docname:
+            fresh_audit_events.add_event(name, args, (doc, target))
+
+
+def audit_events_merge(
+    app: Sphinx,
+    env: BuildEnvironment,
+    docnames: list[str],
+    other: BuildEnvironment,
+) -> None:
+    """In Sphinx parallel builds, this merges audit_events from subprocesses."""
+    for name, args, source in other.audit_events:
+        env.audit_events.add_event(name, args, source)
+
+
+class AuditEvent(SphinxDirective):
+    has_content = True
+    required_arguments = 1
+    optional_arguments = 2
+    final_argument_whitespace = True
+
+    _label = [
+        sphinx_gettext(
+            "Raises an :ref:`auditing event <auditing>` "
+            "{name} with no arguments."
+        ),
+        sphinx_gettext(
+            "Raises an :ref:`auditing event <auditing>` "
+            "{name} with argument {args}."
+        ),
+        sphinx_gettext(
+            "Raises an :ref:`auditing event <auditing>` "
+            "{name} with arguments {args}."
+        ),
+    ]
+
+    def run(self) -> list[nodes.paragraph]:
+        name = self.arguments[0]
+        if len(self.arguments) >= 2 and self.arguments[1]:
+            args = [
+                arg
+                for argument in self.arguments[1].strip("'\"").split(",")
+                if (arg := argument.strip())
+            ]
+        else:
+            args = []
+        ids = []
+        try:
+            target = self.arguments[2].strip("\"'")
+        except (IndexError, TypeError):
+            target = None
+        if not target:
+            target = self.env.audit_events.id_for(name)
+            ids.append(target)
+        self.env.audit_events.add_event(name, args, (self.env.docname, target))
+
+        node = nodes.paragraph("", classes=["audit-hook"], ids=ids)
+        self.set_source_info(node)
+        if self.content:
+            self.state.nested_parse(self.content, self.content_offset, node)
+        else:
+            num_args = min(2, len(args))
+            text = self._label[num_args].format(
+                name=f"``{name}``",
+                args=", ".join(f"``{a}``" for a in args),
+            )
+            parsed, messages = self.state.inline_text(text, self.lineno)
+            node += parsed
+            node += messages
+        return [node]
+
+
+class audit_event_list(nodes.General, nodes.Element):  # noqa: N801
+    pass
+
+
+class AuditEventListDirective(SphinxDirective):
+    def run(self) -> list[audit_event_list]:
+        return [audit_event_list()]
+
+
+class AuditEventListTransform(SphinxPostTransform):
+    default_priority = 500
+
+    def run(self) -> None:
+        if self.document.next_node(audit_event_list) is None:
+            return
+
+        table = self._make_table(self.app.builder, self.env.docname)
+        for node in self.document.findall(audit_event_list):
+            node.replace_self(table)
+
+    def _make_table(self, builder: Builder, docname: str) -> nodes.table:
+        table = nodes.table(cols=3)
+        group = nodes.tgroup(
+            "",
+            nodes.colspec(colwidth=30),
+            nodes.colspec(colwidth=55),
+            nodes.colspec(colwidth=15),
+            cols=3,
+        )
+        head = nodes.thead()
+        body = nodes.tbody()
+
+        table += group
+        group += head
+        group += body
+
+        head += nodes.row(
+            "",
+            nodes.entry("", nodes.paragraph("", "Audit event")),
+            nodes.entry("", nodes.paragraph("", "Arguments")),
+            nodes.entry("", nodes.paragraph("", "References")),
+        )
+
+        for name, args, sources in builder.env.audit_events.rows():
+            body += self._make_row(builder, docname, name, args, sources)
+
+        return table
+
+    @staticmethod
+    def _make_row(
+        builder: Builder,
+        docname: str,
+        name: str,
+        args: list[str],
+        sources: list[tuple[str, str]],
+    ) -> nodes.row:
+        row = nodes.row()
+        name_node = nodes.paragraph("", nodes.Text(name))
+        row += nodes.entry("", name_node)
+
+        args_node = nodes.paragraph()
+        for arg in args:
+            args_node += nodes.literal(arg, arg)
+            args_node += nodes.Text(", ")
+        if len(args_node.children) > 0:
+            args_node.children.pop()  # remove trailing comma
+        row += nodes.entry("", args_node)
+
+        backlinks_node = nodes.paragraph()
+        backlinks = enumerate(sorted(set(sources)), start=1)
+        for i, (doc, label) in backlinks:
+            if isinstance(label, str):
+                ref = nodes.reference("", f"[{i}]", internal=True)
+                try:
+                    target = (
+                        f"{builder.get_relative_uri(docname, doc)}#{label}"
+                    )
+                except NoUri:
+                    continue
+                else:
+                    ref["refuri"] = target
+                    backlinks_node += ref
+        row += nodes.entry("", backlinks_node)
+        return row
+
+
+def setup(app: Sphinx):
+    app.add_directive("audit-event", AuditEvent)
+    app.add_directive("audit-event-table", AuditEventListDirective)
+    app.add_post_transform(AuditEventListTransform)
+    app.connect("builder-inited", initialise_audit_events)
+    app.connect("env-purge-doc", audit_events_purge)
+    app.connect("env-merge-info", audit_events_merge)
+    return {
+        "version": "1.0",
+        "parallel_read_safe": True,
+        "parallel_write_safe": True,
+    }
diff --git a/Doc/tools/extensions/pyspecific.py b/Doc/tools/extensions/pyspecific.py
index 6ac4a574dbf21d..6138246ccb4a31 100644
--- a/Doc/tools/extensions/pyspecific.py
+++ b/Doc/tools/extensions/pyspecific.py
@@ -15,7 +15,6 @@
 from time import asctime
 from pprint import pformat
 
-import sphinx
 from docutils import nodes
 from docutils.io import StringOutput
 from docutils.parsers.rst import directives
@@ -24,7 +23,6 @@
 from sphinx.builders import Builder
 from sphinx.domains.changeset import VersionChange, versionlabels, versionlabel_classes
 from sphinx.domains.python import PyFunction, PyMethod, PyModule
-from sphinx.errors import NoUri
 from sphinx.locale import _ as sphinx_gettext
 from sphinx.util import logging
 from sphinx.util.docutils import SphinxDirective
@@ -184,142 +182,6 @@ def parse_platforms(self):
         return platforms
 
 
-# Support for documenting audit event
-
-def audit_events_purge(app, env, docname):
-    """This is to remove from env.all_audit_events old traces of removed
-    documents.
-    """
-    if not hasattr(env, 'all_audit_events'):
-        return
-    fresh_all_audit_events = {}
-    for name, event in env.all_audit_events.items():
-        event["source"] = [(d, t) for d, t in event["source"] if d != docname]
-        if event["source"]:
-            # Only keep audit_events that have at least one source.
-            fresh_all_audit_events[name] = event
-    env.all_audit_events = fresh_all_audit_events
-
-
-def audit_events_merge(app, env, docnames, other):
-    """In Sphinx parallel builds, this merges env.all_audit_events from
-    subprocesses.
-
-    all_audit_events is a dict of names, with values like:
-    {'source': [(docname, target), ...], 'args': args}
-    """
-    if not hasattr(other, 'all_audit_events'):
-        return
-    if not hasattr(env, 'all_audit_events'):
-        env.all_audit_events = {}
-    for name, value in other.all_audit_events.items():
-        if name in env.all_audit_events:
-            env.all_audit_events[name]["source"].extend(value["source"])
-        else:
-            env.all_audit_events[name] = value
-
-
-class AuditEvent(SphinxDirective):
-
-    has_content = True
-    required_arguments = 1
-    optional_arguments = 2
-    final_argument_whitespace = True
-
-    _label = [
-        sphinx_gettext("Raises an :ref:`auditing event <auditing>` {name} with no arguments."),
-        sphinx_gettext("Raises an :ref:`auditing event <auditing>` {name} with argument {args}."),
-        sphinx_gettext("Raises an :ref:`auditing event <auditing>` {name} with arguments {args}."),
-    ]
-
-    @property
-    def logger(self):
-        cls = type(self)
-        return logging.getLogger(cls.__module__ + "." + cls.__name__)
-
-    def run(self):
-        name = self.arguments[0]
-        if len(self.arguments) >= 2 and self.arguments[1]:
-            args = (a.strip() for a in self.arguments[1].strip("'\"").split(","))
-            args = [a for a in args if a]
-        else:
-            args = []
-
-        label = self._label[min(2, len(args))]
-        text = label.format(name="``{}``".format(name),
-                            args=", ".join("``{}``".format(a) for a in args if a))
-
-        if not hasattr(self.env, 'all_audit_events'):
-            self.env.all_audit_events = {}
-
-        new_info = {
-            'source': [],
-            'args': args
-        }
-        info = self.env.all_audit_events.setdefault(name, new_info)
-        if info is not new_info:
-            if not self._do_args_match(info['args'], new_info['args']):
-                self.logger.warning(
-                    "Mismatched arguments for audit-event {}: {!r} != {!r}"
-                    .format(name, info['args'], new_info['args'])
-                )
-
-        ids = []
-        try:
-            target = self.arguments[2].strip("\"'")
-        except (IndexError, TypeError):
-            target = None
-        if not target:
-            target = "audit_event_{}_{}".format(
-                re.sub(r'\W', '_', name),
-                len(info['source']),
-            )
-            ids.append(target)
-
-        info['source'].append((self.env.docname, target))
-
-        pnode = nodes.paragraph(text, classes=["audit-hook"], ids=ids)
-        pnode.line = self.lineno
-        if self.content:
-            self.state.nested_parse(self.content, self.content_offset, pnode)
-        else:
-            n, m = self.state.inline_text(text, self.lineno)
-            pnode.extend(n + m)
-
-        return [pnode]
-
-    # This list of sets are allowable synonyms for event argument names.
-    # If two names are in the same set, they are treated as equal for the
-    # purposes of warning. This won't help if number of arguments is
-    # different!
-    _SYNONYMS = [
-        {"file", "path", "fd"},
-    ]
-
-    def _do_args_match(self, args1, args2):
-        if args1 == args2:
-            return True
-        if len(args1) != len(args2):
-            return False
-        for a1, a2 in zip(args1, args2):
-            if a1 == a2:
-                continue
-            if any(a1 in s and a2 in s for s in self._SYNONYMS):
-                continue
-            return False
-        return True
-
-
-class audit_event_list(nodes.General, nodes.Element):
-    pass
-
-
-class AuditEventListDirective(SphinxDirective):
-
-    def run(self):
-        return [audit_event_list('')]
-
-
 # Support for documenting decorators
 
 class PyDecoratorMixin(object):
@@ -583,70 +445,6 @@ def parse_monitoring_event(env, sig, signode):
     return sig
 
 
-def process_audit_events(app, doctree, fromdocname):
-    for node in doctree.findall(audit_event_list):
-        break
-    else:
-        return
-
-    env = app.builder.env
-
-    table = nodes.table(cols=3)
-    group = nodes.tgroup(
-        '',
-        nodes.colspec(colwidth=30),
-        nodes.colspec(colwidth=55),
-        nodes.colspec(colwidth=15),
-        cols=3,
-    )
-    head = nodes.thead()
-    body = nodes.tbody()
-
-    table += group
-    group += head
-    group += body
-
-    row = nodes.row()
-    row += nodes.entry('', nodes.paragraph('', nodes.Text('Audit event')))
-    row += nodes.entry('', nodes.paragraph('', nodes.Text('Arguments')))
-    row += nodes.entry('', nodes.paragraph('', nodes.Text('References')))
-    head += row
-
-    for name in sorted(getattr(env, "all_audit_events", ())):
-        audit_event = env.all_audit_events[name]
-
-        row = nodes.row()
-        node = nodes.paragraph('', nodes.Text(name))
-        row += nodes.entry('', node)
-
-        node = nodes.paragraph()
-        for i, a in enumerate(audit_event['args']):
-            if i:
-                node += nodes.Text(", ")
-            node += nodes.literal(a, nodes.Text(a))
-        row += nodes.entry('', node)
-
-        node = nodes.paragraph()
-        backlinks = enumerate(sorted(set(audit_event['source'])), start=1)
-        for i, (doc, label) in backlinks:
-            if isinstance(label, str):
-                ref = nodes.reference("", nodes.Text("[{}]".format(i)), internal=True)
-                try:
-                    ref['refuri'] = "{}#{}".format(
-                        app.builder.get_relative_uri(fromdocname, doc),
-                        label,
-                    )
-                except NoUri:
-                    continue
-                node += ref
-        row += nodes.entry('', node)
-
-        body += row
-
-    for node in doctree.findall(audit_event_list):
-        node.replace_self(table)
-
-
 def patch_pairindextypes(app, _env) -> None:
     """Remove all entries from ``pairindextypes`` before writing POT files.
 
@@ -676,8 +474,6 @@ def setup(app):
     app.add_role('gh', gh_issue_role)
     app.add_directive('impl-detail', ImplementationDetail)
     app.add_directive('availability', Availability)
-    app.add_directive('audit-event', AuditEvent)
-    app.add_directive('audit-event-table', AuditEventListDirective)
     app.add_directive('deprecated-removed', DeprecatedRemoved)
     app.add_builder(PydocTopicsBuilder)
     app.add_object_type('opcode', 'opcode', '%s (opcode)', parse_opcode_signature)
@@ -692,7 +488,4 @@ def setup(app):
     app.add_directive_to_domain('py', 'abstractmethod', PyAbstractMethod)
     app.add_directive('miscnews', MiscNews)
     app.connect('env-check-consistency', patch_pairindextypes)
-    app.connect('doctree-resolved', process_audit_events)
-    app.connect('env-merge-info', audit_events_merge)
-    app.connect('env-purge-doc', audit_events_purge)
     return {'version': '1.0', 'parallel_read_safe': True}

_______________________________________________
Python-checkins mailing list -- python-checkins@python.org
To unsubscribe send an email to python-checkins-le...@python.org
https://mail.python.org/mailman3/lists/python-checkins.python.org/
Member address: arch...@mail-archive.com

Reply via email to