https://github.com/python/cpython/commit/c9bc458d307f9b8a97ffb3e19f630b23744fa96c
commit: c9bc458d307f9b8a97ffb3e19f630b23744fa96c
branch: main
author: Łukasz Langa <luk...@langa.pl>
committer: pablogsal <pablog...@gmail.com>
date: 2025-04-23T18:22:29+01:00
summary:

gh-91048: Add ability to list all pending asyncio tasks in a process remotely (#132807)

files:
A Misc/NEWS.d/next/Tests/2025-04-23-12-40-27.gh-issue-91048.WJQCdV.rst
M Lib/test/test_external_inspection.py
M Modules/_asynciomodule.c
M Modules/_testexternalinspection.c

diff --git a/Lib/test/test_external_inspection.py b/Lib/test/test_external_inspection.py
index 2ab48a4778be4d..9c0ee0248d20aa 100644
--- a/Lib/test/test_external_inspection.py
+++ b/Lib/test/test_external_inspection.py
@@ -3,7 +3,7 @@
 import textwrap
 import importlib
 import sys
-from test.support import os_helper, SHORT_TIMEOUT
+from test.support import os_helper, SHORT_TIMEOUT, busy_retry
 from test.support.script_helper import make_script
 
 import subprocess
@@ -14,6 +14,7 @@
     from _testexternalinspection import PROCESS_VM_READV_SUPPORTED
     from _testexternalinspection import get_stack_trace
     from _testexternalinspection import get_async_stack_trace
+    from _testexternalinspection import get_all_awaited_by
 except ImportError:
     raise unittest.SkipTest(
         "Test only runs when _testexternalinspection is available")
@@ -349,6 +350,126 @@ async def main():
             ]
             self.assertEqual(stack_trace, expected_stack_trace)
 
+    @unittest.skipIf(sys.platform != "darwin" and sys.platform != "linux",
+                     "Test only runs on Linux and MacOS")
+    @unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
+                     "Test only runs on Linux with process_vm_readv support")
+    def test_async_global_awaited_by(self):
+        script = textwrap.dedent("""\
+            import asyncio
+            import os
+            import random
+            import sys
+            from string import ascii_lowercase, digits
+            from test.support import socket_helper, SHORT_TIMEOUT
+
+            HOST = '127.0.0.1'
+            PORT = socket_helper.find_unused_port()
+            connections = 0
+
+            class EchoServerProtocol(asyncio.Protocol):
+                def connection_made(self, transport):
+                    global connections
+                    connections += 1
+                    self.transport = transport
+
+                def data_received(self, data):
+                    self.transport.write(data)
+                    self.transport.close()
+
+            async def echo_client(message):
+                reader, writer = await asyncio.open_connection(HOST, PORT)
+                writer.write(message.encode())
+                await writer.drain()
+
+                data = await reader.read(100)
+                assert message == data.decode()
+                writer.close()
+                await writer.wait_closed()
+                await asyncio.sleep(SHORT_TIMEOUT)
+
+            async def echo_client_spam(server):
+                async with asyncio.TaskGroup() as tg:
+                    while connections < 1000:
+                        msg = list(ascii_lowercase + digits)
+                        random.shuffle(msg)
+                        tg.create_task(echo_client("".join(msg)))
+                        await asyncio.sleep(0)
+                    # at least a 1000 tasks created
+                    fifo_path = sys.argv[1]
+                    with open(fifo_path, "w") as fifo:
+                        fifo.write("ready")
+                # at this point all client tasks completed without assertion errors
+                # let's wrap up the test
+                server.close()
+                await server.wait_closed()
+
+            async def main():
+                loop = asyncio.get_running_loop()
+                server = await loop.create_server(EchoServerProtocol, HOST, PORT)
+                async with server:
+                    async with asyncio.TaskGroup() as tg:
+                        tg.create_task(server.serve_forever(), name="server task")
+                        tg.create_task(echo_client_spam(server), name="echo client spam")
+
+            asyncio.run(main())
+            """)
+        stack_trace = None
+        with os_helper.temp_dir() as work_dir:
+            script_dir = os.path.join(work_dir, "script_pkg")
+            os.mkdir(script_dir)
+            fifo = f"{work_dir}/the_fifo"
+            os.mkfifo(fifo)
+            script_name = _make_test_script(script_dir, 'script', script)
+            try:
+                p = subprocess.Popen([sys.executable, script_name,  str(fifo)])
+                with open(fifo, "r") as fifo_file:
+                    response = fifo_file.read()
+                self.assertEqual(response, "ready")
+                for _ in busy_retry(SHORT_TIMEOUT):
+                    try:
+                        all_awaited_by = get_all_awaited_by(p.pid)
+                    except RuntimeError as re:
+                        # This call reads a linked list in another process with
+                        # no synchronization. That occasionally leads to invalid
+                        # reads. Here we avoid making the test flaky.
+                        msg = str(re)
+                        if msg.startswith("Task list appears corrupted"):
+                            continue
+                        elif msg.startswith("Invalid linked list structure reading remote memory"):
+                            continue
+                        elif msg.startswith("Unknown error reading memory"):
+                            continue
+                        elif msg.startswith("Unhandled frame owner"):
+                            continue
+                        raise  # Unrecognized exception, safest not to ignore it
+                    else:
+                        break
+                # expected: a list of two elements: 1 thread, 1 interp
+                self.assertEqual(len(all_awaited_by), 2)
+                # expected: a tuple with the thread ID and the awaited_by list
+                self.assertEqual(len(all_awaited_by[0]), 2)
+                # expected: no tasks in the fallback per-interp task list
+                self.assertEqual(all_awaited_by[1], (0, []))
+                entries = all_awaited_by[0][1]
+                # expected: at least 1000 pending tasks
+                self.assertGreaterEqual(len(entries), 1000)
+                # the first three tasks stem from the code structure
+                self.assertIn(('Task-1', []), entries)
+                self.assertIn(('server task', [[['main'], 'Task-1', []]]), entries)
+                self.assertIn(('echo client spam', [[['main'], 'Task-1', []]]), entries)
+                # the final task will have some random number, but it should for
+                # sure be one of the echo client spam horde
+                self.assertEqual([[['echo_client_spam'], 'echo client spam', [[['main'], 'Task-1', []]]]], entries[-1][1])
+            except PermissionError:
+                self.skipTest(
+                    "Insufficient permissions to read the stack trace")
+            finally:
+                os.remove(fifo)
+                p.kill()
+                p.terminate()
+                p.wait(timeout=SHORT_TIMEOUT)
+
     @unittest.skipIf(sys.platform != "darwin" and sys.platform != "linux",
                      "Test only runs on Linux and MacOS")
     @unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED,
diff --git a/Misc/NEWS.d/next/Tests/2025-04-23-12-40-27.gh-issue-91048.WJQCdV.rst b/Misc/NEWS.d/next/Tests/2025-04-23-12-40-27.gh-issue-91048.WJQCdV.rst
new file mode 100644
index 00000000000000..11171739a6c646
--- /dev/null
+++ b/Misc/NEWS.d/next/Tests/2025-04-23-12-40-27.gh-issue-91048.WJQCdV.rst
@@ -0,0 +1,2 @@
+Add ability to externally inspect all pending asyncio tasks, even if no task
+is currently entered on the event loop.
diff --git a/Modules/_asynciomodule.c b/Modules/_asynciomodule.c
index 134cfd8540557d..27e6e67e3c9386 100644
--- a/Modules/_asynciomodule.c
+++ b/Modules/_asynciomodule.c
@@ -105,11 +105,17 @@ typedef struct _Py_AsyncioModuleDebugOffsets {
         uint64_t task_is_task;
         uint64_t task_awaited_by_is_set;
         uint64_t task_coro;
+        uint64_t task_node;
     } asyncio_task_object;
+    struct _asyncio_interpreter_state {
+        uint64_t size;
+        uint64_t asyncio_tasks_head;
+    } asyncio_interpreter_state;
     struct _asyncio_thread_state {
         uint64_t size;
         uint64_t asyncio_running_loop;
         uint64_t asyncio_running_task;
+        uint64_t asyncio_tasks_head;
     } asyncio_thread_state;
 } Py_AsyncioModuleDebugOffsets;
 
@@ -121,11 +127,17 @@ GENERATE_DEBUG_SECTION(AsyncioDebug, Py_AsyncioModuleDebugOffsets _AsyncioDebug)
            .task_is_task = offsetof(TaskObj, task_is_task),
            .task_awaited_by_is_set = offsetof(TaskObj, task_awaited_by_is_set),
            .task_coro = offsetof(TaskObj, task_coro),
+           .task_node = offsetof(TaskObj, task_node),
+       },
+       .asyncio_interpreter_state = {
+            .size = sizeof(PyInterpreterState),
+            .asyncio_tasks_head = offsetof(PyInterpreterState, asyncio_tasks_head),
        },
        .asyncio_thread_state = {
            .size = sizeof(_PyThreadStateImpl),
            .asyncio_running_loop = offsetof(_PyThreadStateImpl, asyncio_running_loop),
            .asyncio_running_task = offsetof(_PyThreadStateImpl, asyncio_running_task),
+           .asyncio_tasks_head = offsetof(_PyThreadStateImpl, asyncio_tasks_head),
        }};
 
 /* State of the _asyncio module */
diff --git a/Modules/_testexternalinspection.c b/Modules/_testexternalinspection.c
index 01fe700fa490f1..73d63df63261b3 100644
--- a/Modules/_testexternalinspection.c
+++ b/Modules/_testexternalinspection.c
@@ -54,6 +54,7 @@
 #include <internal/pycore_debug_offsets.h>  // _Py_DebugOffsets
 #include <internal/pycore_frame.h>          // FRAME_SUSPENDED_YIELD_FROM
 #include <internal/pycore_interpframe.h>    // FRAME_OWNED_BY_CSTACK
+#include <internal/pycore_llist.h>          // struct llist_node
 #include <internal/pycore_stackref.h>       // Py_TAG_BITS
 
 #ifndef HAVE_PROCESS_VM_READV
@@ -68,11 +69,17 @@ struct _Py_AsyncioModuleDebugOffsets {
         uint64_t task_is_task;
         uint64_t task_awaited_by_is_set;
         uint64_t task_coro;
+        uint64_t task_node;
     } asyncio_task_object;
+    struct _asyncio_interpreter_state {
+        uint64_t size;
+        uint64_t asyncio_tasks_head;
+    } asyncio_interpreter_state;
     struct _asyncio_thread_state {
         uint64_t size;
         uint64_t asyncio_running_loop;
         uint64_t asyncio_running_task;
+        uint64_t asyncio_tasks_head;
     } asyncio_thread_state;
 };
 
@@ -1464,6 +1471,259 @@ find_running_task(
     return 0;
 }
 
+static int
+append_awaited_by_for_thread(
+    int pid,
+    uintptr_t head_addr,
+    struct _Py_DebugOffsets *debug_offsets,
+    struct _Py_AsyncioModuleDebugOffsets *async_offsets,
+    PyObject *result
+) {
+    struct llist_node task_node;
+
+    if (0 > read_memory(
+                pid,
+                head_addr,
+                sizeof(task_node),
+                &task_node))
+    {
+        return -1;
+    }
+
+    size_t iteration_count = 0;
+    const size_t MAX_ITERATIONS = 2 << 15;  // A reasonable upper bound
+    while ((uintptr_t)task_node.next != head_addr) {
+        if (++iteration_count > MAX_ITERATIONS) {
+            PyErr_SetString(PyExc_RuntimeError, "Task list appears corrupted");
+            return -1;
+        }
+
+        if (task_node.next == NULL) {
+            PyErr_SetString(
+                PyExc_RuntimeError,
+                "Invalid linked list structure reading remote memory");
+            return -1;
+        }
+
+        uintptr_t task_addr = (uintptr_t)task_node.next
+            - async_offsets->asyncio_task_object.task_node;
+
+        PyObject *tn = parse_task_name(
+            pid,
+            debug_offsets,
+            async_offsets,
+            task_addr);
+        if (tn == NULL) {
+            return -1;
+        }
+
+        PyObject *current_awaited_by = PyList_New(0);
+        if (current_awaited_by == NULL) {
+            Py_DECREF(tn);
+            return -1;
+        }
+
+        PyObject *result_item = PyTuple_New(2);
+        if (result_item == NULL) {
+            Py_DECREF(tn);
+            Py_DECREF(current_awaited_by);
+            return -1;
+        }
+
+        PyTuple_SET_ITEM(result_item, 0, tn);  // steals ref
+        PyTuple_SET_ITEM(result_item, 1, current_awaited_by);  // steals ref
+        if (PyList_Append(result, result_item)) {
+            Py_DECREF(result_item);
+            return -1;
+        }
+        Py_DECREF(result_item);
+
+        if (parse_task_awaited_by(pid, debug_offsets, async_offsets,
+                                  task_addr, current_awaited_by))
+        {
+            return -1;
+        }
+
+        // onto the next one...
+        if (0 > read_memory(
+                    pid,
+                    (uintptr_t)task_node.next,
+                    sizeof(task_node),
+                    &task_node))
+        {
+            return -1;
+        }
+    }
+
+    return 0;
+}
+
+static int
+append_awaited_by(
+    int pid,
+    unsigned long tid,
+    uintptr_t head_addr,
+    struct _Py_DebugOffsets *debug_offsets,
+    struct _Py_AsyncioModuleDebugOffsets *async_offsets,
+    PyObject *result)
+{
+    PyObject *tid_py = PyLong_FromUnsignedLong(tid);
+    if (tid_py == NULL) {
+        return -1;
+    }
+
+    PyObject *result_item = PyTuple_New(2);
+    if (result_item == NULL) {
+        Py_DECREF(tid_py);
+        return -1;
+    }
+
+    PyObject* awaited_by_for_thread = PyList_New(0);
+    if (awaited_by_for_thread == NULL) {
+        Py_DECREF(tid_py);
+        Py_DECREF(result_item);
+        return -1;
+    }
+
+    PyTuple_SET_ITEM(result_item, 0, tid_py);  // steals ref
+    PyTuple_SET_ITEM(result_item, 1, awaited_by_for_thread);  // steals ref
+    if (PyList_Append(result, result_item)) {
+        Py_DECREF(result_item);
+        return -1;
+    }
+    Py_DECREF(result_item);
+
+    if (append_awaited_by_for_thread(
+            pid,
+            head_addr,
+            debug_offsets,
+            async_offsets,
+            awaited_by_for_thread))
+    {
+        return -1;
+    }
+
+    return 0;
+}
+
+static PyObject*
+get_all_awaited_by(PyObject* self, PyObject* args)
+{
+#if (!defined(__linux__) && !defined(__APPLE__)) || \
+    (defined(__linux__) && !HAVE_PROCESS_VM_READV)
+    PyErr_SetString(
+        PyExc_RuntimeError,
+        "get_all_awaited_by is not implemented on this platform");
+    return NULL;
+#endif
+
+    int pid;
+
+    if (!PyArg_ParseTuple(args, "i", &pid)) {
+        return NULL;
+    }
+
+    uintptr_t runtime_start_addr = get_py_runtime(pid);
+    if (runtime_start_addr == 0) {
+        if (!PyErr_Occurred()) {
+            PyErr_SetString(
+                PyExc_RuntimeError, "Failed to get .PyRuntime address");
+        }
+        return NULL;
+    }
+    struct _Py_DebugOffsets local_debug_offsets;
+
+    if (read_offsets(pid, &runtime_start_addr, &local_debug_offsets)) {
+        return NULL;
+    }
+
+    struct _Py_AsyncioModuleDebugOffsets local_async_debug;
+    if (read_async_debug(pid, &local_async_debug)) {
+        return NULL;
+    }
+
+    PyObject *result = PyList_New(0);
+    if (result == NULL) {
+        return NULL;
+    }
+
+    off_t interpreter_state_list_head =
+        local_debug_offsets.runtime_state.interpreters_head;
+
+    uintptr_t interpreter_state_addr;
+    if (0 > read_memory(
+                pid,
+                runtime_start_addr + interpreter_state_list_head,
+                sizeof(void*),
+                &interpreter_state_addr))
+    {
+        goto result_err;
+    }
+
+    uintptr_t thread_state_addr;
+    unsigned long tid = 0;
+    if (0 > read_memory(
+                pid,
+                interpreter_state_addr
+                + local_debug_offsets.interpreter_state.threads_head,
+                sizeof(void*),
+                &thread_state_addr))
+    {
+        goto result_err;
+    }
+
+    uintptr_t head_addr;
+    while (thread_state_addr != 0) {
+        if (0 > read_memory(
+                    pid,
+                    thread_state_addr
+                    + local_debug_offsets.thread_state.native_thread_id,
+                    sizeof(tid),
+                    &tid))
+        {
+            goto result_err;
+        }
+
+        head_addr = thread_state_addr
+            + local_async_debug.asyncio_thread_state.asyncio_tasks_head;
+
+        if (append_awaited_by(pid, tid, head_addr, &local_debug_offsets,
+                              &local_async_debug, result))
+        {
+            goto result_err;
+        }
+
+        if (0 > read_memory(
+                    pid,
+                    thread_state_addr + local_debug_offsets.thread_state.next,
+                    sizeof(void*),
+                    &thread_state_addr))
+        {
+            goto result_err;
+        }
+    }
+
+    head_addr = interpreter_state_addr
+        + local_async_debug.asyncio_interpreter_state.asyncio_tasks_head;
+
+    // On top of a per-thread task lists used by default by asyncio to avoid
+    // contention, there is also a fallback per-interpreter list of tasks;
+    // any tasks still pending when a thread is destroyed will be moved to the
+    // per-interpreter task list.  It's unlikely we'll find anything here, but
+    // interesting for debugging.
+    if (append_awaited_by(pid, 0, head_addr, &local_debug_offsets,
+                        &local_async_debug, result))
+    {
+        goto result_err;
+    }
+
+    return result;
+
+result_err:
+    Py_DECREF(result);
+    return NULL;
+}
+
 static PyObject*
 get_stack_trace(PyObject* self, PyObject* args)
 {
@@ -1686,6 +1946,8 @@ static PyMethodDef methods[] = {
         "Get the Python stack from a given PID"},
     {"get_async_stack_trace", get_async_stack_trace, METH_VARARGS,
         "Get the asyncio stack from a given PID"},
+    {"get_all_awaited_by", get_all_awaited_by, METH_VARARGS,
+        "Get all tasks and their awaited_by from a given PID"},
     {NULL, NULL, 0, NULL},
 };
 

_______________________________________________
Python-checkins mailing list -- python-checkins@python.org
To unsubscribe send an email to python-checkins-le...@python.org
https://mail.python.org/mailman3/lists/python-checkins.python.org/
Member address: arch...@mail-archive.com

Reply via email to