https://github.com/python/cpython/commit/c9bc458d307f9b8a97ffb3e19f630b23744fa96c commit: c9bc458d307f9b8a97ffb3e19f630b23744fa96c branch: main author: Łukasz Langa <luk...@langa.pl> committer: pablogsal <pablog...@gmail.com> date: 2025-04-23T18:22:29+01:00 summary:
gh-91048: Add ability to list all pending asyncio tasks in a process remotely (#132807) files: A Misc/NEWS.d/next/Tests/2025-04-23-12-40-27.gh-issue-91048.WJQCdV.rst M Lib/test/test_external_inspection.py M Modules/_asynciomodule.c M Modules/_testexternalinspection.c diff --git a/Lib/test/test_external_inspection.py b/Lib/test/test_external_inspection.py index 2ab48a4778be4d..9c0ee0248d20aa 100644 --- a/Lib/test/test_external_inspection.py +++ b/Lib/test/test_external_inspection.py @@ -3,7 +3,7 @@ import textwrap import importlib import sys -from test.support import os_helper, SHORT_TIMEOUT +from test.support import os_helper, SHORT_TIMEOUT, busy_retry from test.support.script_helper import make_script import subprocess @@ -14,6 +14,7 @@ from _testexternalinspection import PROCESS_VM_READV_SUPPORTED from _testexternalinspection import get_stack_trace from _testexternalinspection import get_async_stack_trace + from _testexternalinspection import get_all_awaited_by except ImportError: raise unittest.SkipTest( "Test only runs when _testexternalinspection is available") @@ -349,6 +350,126 @@ async def main(): ] self.assertEqual(stack_trace, expected_stack_trace) + @unittest.skipIf(sys.platform != "darwin" and sys.platform != "linux", + "Test only runs on Linux and MacOS") + @unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED, + "Test only runs on Linux with process_vm_readv support") + def test_async_global_awaited_by(self): + script = textwrap.dedent("""\ + import asyncio + import os + import random + import sys + from string import ascii_lowercase, digits + from test.support import socket_helper, SHORT_TIMEOUT + + HOST = '127.0.0.1' + PORT = socket_helper.find_unused_port() + connections = 0 + + class EchoServerProtocol(asyncio.Protocol): + def connection_made(self, transport): + global connections + connections += 1 + self.transport = transport + + def data_received(self, data): + self.transport.write(data) + self.transport.close() + + async 
def echo_client(message): + reader, writer = await asyncio.open_connection(HOST, PORT) + writer.write(message.encode()) + await writer.drain() + + data = await reader.read(100) + assert message == data.decode() + writer.close() + await writer.wait_closed() + await asyncio.sleep(SHORT_TIMEOUT) + + async def echo_client_spam(server): + async with asyncio.TaskGroup() as tg: + while connections < 1000: + msg = list(ascii_lowercase + digits) + random.shuffle(msg) + tg.create_task(echo_client("".join(msg))) + await asyncio.sleep(0) + # at least a 1000 tasks created + fifo_path = sys.argv[1] + with open(fifo_path, "w") as fifo: + fifo.write("ready") + # at this point all client tasks completed without assertion errors + # let's wrap up the test + server.close() + await server.wait_closed() + + async def main(): + loop = asyncio.get_running_loop() + server = await loop.create_server(EchoServerProtocol, HOST, PORT) + async with server: + async with asyncio.TaskGroup() as tg: + tg.create_task(server.serve_forever(), name="server task") + tg.create_task(echo_client_spam(server), name="echo client spam") + + asyncio.run(main()) + """) + stack_trace = None + with os_helper.temp_dir() as work_dir: + script_dir = os.path.join(work_dir, "script_pkg") + os.mkdir(script_dir) + fifo = f"{work_dir}/the_fifo" + os.mkfifo(fifo) + script_name = _make_test_script(script_dir, 'script', script) + try: + p = subprocess.Popen([sys.executable, script_name, str(fifo)]) + with open(fifo, "r") as fifo_file: + response = fifo_file.read() + self.assertEqual(response, "ready") + for _ in busy_retry(SHORT_TIMEOUT): + try: + all_awaited_by = get_all_awaited_by(p.pid) + except RuntimeError as re: + # This call reads a linked list in another process with + # no synchronization. That occasionally leads to invalid + # reads. Here we avoid making the test flaky. 
+ msg = str(re) + if msg.startswith("Task list appears corrupted"): + continue + elif msg.startswith("Invalid linked list structure reading remote memory"): + continue + elif msg.startswith("Unknown error reading memory"): + continue + elif msg.startswith("Unhandled frame owner"): + continue + raise # Unrecognized exception, safest not to ignore it + else: + break + # expected: a list of two elements: 1 thread, 1 interp + self.assertEqual(len(all_awaited_by), 2) + # expected: a tuple with the thread ID and the awaited_by list + self.assertEqual(len(all_awaited_by[0]), 2) + # expected: no tasks in the fallback per-interp task list + self.assertEqual(all_awaited_by[1], (0, [])) + entries = all_awaited_by[0][1] + # expected: at least 1000 pending tasks + self.assertGreaterEqual(len(entries), 1000) + # the first three tasks stem from the code structure + self.assertIn(('Task-1', []), entries) + self.assertIn(('server task', [[['main'], 'Task-1', []]]), entries) + self.assertIn(('echo client spam', [[['main'], 'Task-1', []]]), entries) + # the final task will have some random number, but it should for + # sure be one of the echo client spam horde + self.assertEqual([[['echo_client_spam'], 'echo client spam', [[['main'], 'Task-1', []]]]], entries[-1][1]) + except PermissionError: + self.skipTest( + "Insufficient permissions to read the stack trace") + finally: + os.remove(fifo) + p.kill() + p.terminate() + p.wait(timeout=SHORT_TIMEOUT) + @unittest.skipIf(sys.platform != "darwin" and sys.platform != "linux", "Test only runs on Linux and MacOS") @unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED, diff --git a/Misc/NEWS.d/next/Tests/2025-04-23-12-40-27.gh-issue-91048.WJQCdV.rst b/Misc/NEWS.d/next/Tests/2025-04-23-12-40-27.gh-issue-91048.WJQCdV.rst new file mode 100644 index 00000000000000..11171739a6c646 --- /dev/null +++ b/Misc/NEWS.d/next/Tests/2025-04-23-12-40-27.gh-issue-91048.WJQCdV.rst @@ -0,0 +1,2 @@ +Add ability to externally inspect all 
pending asyncio tasks, even if no task +is currently entered on the event loop. diff --git a/Modules/_asynciomodule.c b/Modules/_asynciomodule.c index 134cfd8540557d..27e6e67e3c9386 100644 --- a/Modules/_asynciomodule.c +++ b/Modules/_asynciomodule.c @@ -105,11 +105,17 @@ typedef struct _Py_AsyncioModuleDebugOffsets { uint64_t task_is_task; uint64_t task_awaited_by_is_set; uint64_t task_coro; + uint64_t task_node; } asyncio_task_object; + struct _asyncio_interpreter_state { + uint64_t size; + uint64_t asyncio_tasks_head; + } asyncio_interpreter_state; struct _asyncio_thread_state { uint64_t size; uint64_t asyncio_running_loop; uint64_t asyncio_running_task; + uint64_t asyncio_tasks_head; } asyncio_thread_state; } Py_AsyncioModuleDebugOffsets; @@ -121,11 +127,17 @@ GENERATE_DEBUG_SECTION(AsyncioDebug, Py_AsyncioModuleDebugOffsets _AsyncioDebug) .task_is_task = offsetof(TaskObj, task_is_task), .task_awaited_by_is_set = offsetof(TaskObj, task_awaited_by_is_set), .task_coro = offsetof(TaskObj, task_coro), + .task_node = offsetof(TaskObj, task_node), + }, + .asyncio_interpreter_state = { + .size = sizeof(PyInterpreterState), + .asyncio_tasks_head = offsetof(PyInterpreterState, asyncio_tasks_head), }, .asyncio_thread_state = { .size = sizeof(_PyThreadStateImpl), .asyncio_running_loop = offsetof(_PyThreadStateImpl, asyncio_running_loop), .asyncio_running_task = offsetof(_PyThreadStateImpl, asyncio_running_task), + .asyncio_tasks_head = offsetof(_PyThreadStateImpl, asyncio_tasks_head), }}; /* State of the _asyncio module */ diff --git a/Modules/_testexternalinspection.c b/Modules/_testexternalinspection.c index 01fe700fa490f1..73d63df63261b3 100644 --- a/Modules/_testexternalinspection.c +++ b/Modules/_testexternalinspection.c @@ -54,6 +54,7 @@ #include <internal/pycore_debug_offsets.h> // _Py_DebugOffsets #include <internal/pycore_frame.h> // FRAME_SUSPENDED_YIELD_FROM #include <internal/pycore_interpframe.h> // FRAME_OWNED_BY_CSTACK +#include <internal/pycore_llist.h> // 
struct llist_node #include <internal/pycore_stackref.h> // Py_TAG_BITS #ifndef HAVE_PROCESS_VM_READV @@ -68,11 +69,17 @@ struct _Py_AsyncioModuleDebugOffsets { uint64_t task_is_task; uint64_t task_awaited_by_is_set; uint64_t task_coro; + uint64_t task_node; } asyncio_task_object; + struct _asyncio_interpreter_state { + uint64_t size; + uint64_t asyncio_tasks_head; + } asyncio_interpreter_state; struct _asyncio_thread_state { uint64_t size; uint64_t asyncio_running_loop; uint64_t asyncio_running_task; + uint64_t asyncio_tasks_head; } asyncio_thread_state; }; @@ -1464,6 +1471,259 @@ find_running_task( return 0; } +static int +append_awaited_by_for_thread( + int pid, + uintptr_t head_addr, + struct _Py_DebugOffsets *debug_offsets, + struct _Py_AsyncioModuleDebugOffsets *async_offsets, + PyObject *result +) { + struct llist_node task_node; + + if (0 > read_memory( + pid, + head_addr, + sizeof(task_node), + &task_node)) + { + return -1; + } + + size_t iteration_count = 0; + const size_t MAX_ITERATIONS = 2 << 15; // A reasonable upper bound + while ((uintptr_t)task_node.next != head_addr) { + if (++iteration_count > MAX_ITERATIONS) { + PyErr_SetString(PyExc_RuntimeError, "Task list appears corrupted"); + return -1; + } + + if (task_node.next == NULL) { + PyErr_SetString( + PyExc_RuntimeError, + "Invalid linked list structure reading remote memory"); + return -1; + } + + uintptr_t task_addr = (uintptr_t)task_node.next + - async_offsets->asyncio_task_object.task_node; + + PyObject *tn = parse_task_name( + pid, + debug_offsets, + async_offsets, + task_addr); + if (tn == NULL) { + return -1; + } + + PyObject *current_awaited_by = PyList_New(0); + if (current_awaited_by == NULL) { + Py_DECREF(tn); + return -1; + } + + PyObject *result_item = PyTuple_New(2); + if (result_item == NULL) { + Py_DECREF(tn); + Py_DECREF(current_awaited_by); + return -1; + } + + PyTuple_SET_ITEM(result_item, 0, tn); // steals ref + PyTuple_SET_ITEM(result_item, 1, current_awaited_by); // steals ref 
+ if (PyList_Append(result, result_item)) { + Py_DECREF(result_item); + return -1; + } + Py_DECREF(result_item); + + if (parse_task_awaited_by(pid, debug_offsets, async_offsets, + task_addr, current_awaited_by)) + { + return -1; + } + + // onto the next one... + if (0 > read_memory( + pid, + (uintptr_t)task_node.next, + sizeof(task_node), + &task_node)) + { + return -1; + } + } + + return 0; +} + +static int +append_awaited_by( + int pid, + unsigned long tid, + uintptr_t head_addr, + struct _Py_DebugOffsets *debug_offsets, + struct _Py_AsyncioModuleDebugOffsets *async_offsets, + PyObject *result) +{ + PyObject *tid_py = PyLong_FromUnsignedLong(tid); + if (tid_py == NULL) { + return -1; + } + + PyObject *result_item = PyTuple_New(2); + if (result_item == NULL) { + Py_DECREF(tid_py); + return -1; + } + + PyObject* awaited_by_for_thread = PyList_New(0); + if (awaited_by_for_thread == NULL) { + Py_DECREF(tid_py); + Py_DECREF(result_item); + return -1; + } + + PyTuple_SET_ITEM(result_item, 0, tid_py); // steals ref + PyTuple_SET_ITEM(result_item, 1, awaited_by_for_thread); // steals ref + if (PyList_Append(result, result_item)) { + Py_DECREF(result_item); + return -1; + } + Py_DECREF(result_item); + + if (append_awaited_by_for_thread( + pid, + head_addr, + debug_offsets, + async_offsets, + awaited_by_for_thread)) + { + return -1; + } + + return 0; +} + +static PyObject* +get_all_awaited_by(PyObject* self, PyObject* args) +{ +#if (!defined(__linux__) && !defined(__APPLE__)) || \ + (defined(__linux__) && !HAVE_PROCESS_VM_READV) + PyErr_SetString( + PyExc_RuntimeError, + "get_all_awaited_by is not implemented on this platform"); + return NULL; +#endif + + int pid; + + if (!PyArg_ParseTuple(args, "i", &pid)) { + return NULL; + } + + uintptr_t runtime_start_addr = get_py_runtime(pid); + if (runtime_start_addr == 0) { + if (!PyErr_Occurred()) { + PyErr_SetString( + PyExc_RuntimeError, "Failed to get .PyRuntime address"); + } + return NULL; + } + struct _Py_DebugOffsets 
local_debug_offsets; + + if (read_offsets(pid, &runtime_start_addr, &local_debug_offsets)) { + return NULL; + } + + struct _Py_AsyncioModuleDebugOffsets local_async_debug; + if (read_async_debug(pid, &local_async_debug)) { + return NULL; + } + + PyObject *result = PyList_New(0); + if (result == NULL) { + return NULL; + } + + off_t interpreter_state_list_head = + local_debug_offsets.runtime_state.interpreters_head; + + uintptr_t interpreter_state_addr; + if (0 > read_memory( + pid, + runtime_start_addr + interpreter_state_list_head, + sizeof(void*), + &interpreter_state_addr)) + { + goto result_err; + } + + uintptr_t thread_state_addr; + unsigned long tid = 0; + if (0 > read_memory( + pid, + interpreter_state_addr + + local_debug_offsets.interpreter_state.threads_head, + sizeof(void*), + &thread_state_addr)) + { + goto result_err; + } + + uintptr_t head_addr; + while (thread_state_addr != 0) { + if (0 > read_memory( + pid, + thread_state_addr + + local_debug_offsets.thread_state.native_thread_id, + sizeof(tid), + &tid)) + { + goto result_err; + } + + head_addr = thread_state_addr + + local_async_debug.asyncio_thread_state.asyncio_tasks_head; + + if (append_awaited_by(pid, tid, head_addr, &local_debug_offsets, + &local_async_debug, result)) + { + goto result_err; + } + + if (0 > read_memory( + pid, + thread_state_addr + local_debug_offsets.thread_state.next, + sizeof(void*), + &thread_state_addr)) + { + goto result_err; + } + } + + head_addr = interpreter_state_addr + + local_async_debug.asyncio_interpreter_state.asyncio_tasks_head; + + // On top of a per-thread task lists used by default by asyncio to avoid + // contention, there is also a fallback per-interpreter list of tasks; + // any tasks still pending when a thread is destroyed will be moved to the + // per-interpreter task list. It's unlikely we'll find anything here, but + // interesting for debugging. 
+ if (append_awaited_by(pid, 0, head_addr, &local_debug_offsets, + &local_async_debug, result)) + { + goto result_err; + } + + return result; + +result_err: + Py_DECREF(result); + return NULL; +} + static PyObject* get_stack_trace(PyObject* self, PyObject* args) { @@ -1686,6 +1946,8 @@ static PyMethodDef methods[] = { "Get the Python stack from a given PID"}, {"get_async_stack_trace", get_async_stack_trace, METH_VARARGS, "Get the asyncio stack from a given PID"}, + {"get_all_awaited_by", get_all_awaited_by, METH_VARARGS, + "Get all tasks and their awaited_by from a given PID"}, {NULL, NULL, 0, NULL}, }; _______________________________________________ Python-checkins mailing list -- python-checkins@python.org To unsubscribe send an email to python-checkins-le...@python.org https://mail.python.org/mailman3/lists/python-checkins.python.org/ Member address: arch...@mail-archive.com