commit: 3cc986f87ddda86ee93770e03cca06346aee54c5 Author: Zac Medico <zmedico <AT> gentoo <DOT> org> AuthorDate: Fri Feb 23 06:06:14 2024 +0000 Commit: Zac Medico <zmedico <AT> gentoo <DOT> org> CommitDate: Fri Feb 23 06:48:29 2024 +0000 URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=3cc986f8
AsyncioEventLoop: Call process.run_exitfuncs() before close For the event loop running in the main thread, call process.run_exitfuncs() before close with the event loop running so that anything attached can clean itself up (like the socks5 ProxyManager for bug 925240). This is necessary because process.spawn uses the event loop to implement the new returnproc parameter related to bug 916566. Bug: https://bugs.gentoo.org/916566 Bug: https://bugs.gentoo.org/925240 Signed-off-by: Zac Medico <zmedico <AT> gentoo.org> lib/portage/tests/util/test_socks5.py | 51 ++++++++++++++++++++++- lib/portage/util/_eventloop/asyncio_event_loop.py | 44 +++++++++++++++---- lib/portage/util/socks5.py | 16 ++++++- 3 files changed, 101 insertions(+), 10 deletions(-) diff --git a/lib/portage/tests/util/test_socks5.py b/lib/portage/tests/util/test_socks5.py index 7b33cb3f6b..4a6d08169d 100644 --- a/lib/portage/tests/util/test_socks5.py +++ b/lib/portage/tests/util/test_socks5.py @@ -12,6 +12,8 @@ import time import portage from portage.tests import TestCase from portage.util import socks5 +from portage.util.futures.executor.fork import ForkExecutor +from portage.util._eventloop.global_event_loop import global_event_loop from portage.const import PORTAGE_BIN_PATH from http.server import BaseHTTPRequestHandler, HTTPServer @@ -189,8 +191,10 @@ class Socks5ServerTestCase(TestCase): path = "/index.html" proxy = None tempdir = tempfile.mkdtemp() + previous_exithandlers = portage.process._exithandlers try: + portage.process._exithandlers = [] with AsyncHTTPServer(host, {path: content}, loop) as server: settings = { "PORTAGE_TMPDIR": tempdir, @@ -211,5 +215,50 @@ class Socks5ServerTestCase(TestCase): self.assertEqual(result, content) finally: - await socks5.proxy.stop() + try: + # Also run_exitfuncs to test atexit hook cleanup. + await socks5.proxy.stop() + self.assertNotEqual(portage.process._exithandlers, []) + portage.process.run_exitfuncs() + self.assertEqual(portage.process._exithandlers, []) + finally: + portage.process._exithandlers = previous_exithandlers + shutil.rmtree(tempdir) + + +class Socks5ServerLoopCloseTestCase(TestCase): + """ + For bug 925240, test that the socks5 proxy is automatically + terminated when the main event loop is closed, using a subprocess + for isolation. + """ + + def testSocks5ServerLoopClose(self): + asyncio.run(self._testSocks5ServerLoopClose()) + + async def _testSocks5ServerLoopClose(self): + loop = asyncio.get_running_loop() + self.assertEqual( + await loop.run_in_executor( + ForkExecutor(loop=loop), self._testSocks5ServerLoopCloseSubprocess + ), + True, + ) + + @staticmethod + def _testSocks5ServerLoopCloseSubprocess(): + loop = global_event_loop() + tempdir = tempfile.mkdtemp() + try: + settings = { + "PORTAGE_TMPDIR": tempdir, + "PORTAGE_BIN_PATH": PORTAGE_BIN_PATH, + } + + socks5.get_socks5_proxy(settings) + loop.run_until_complete(socks5.proxy.ready()) + finally: + loop.close() shutil.rmtree(tempdir) + + return not socks5.proxy.is_running() diff --git a/lib/portage/util/_eventloop/asyncio_event_loop.py b/lib/portage/util/_eventloop/asyncio_event_loop.py index b9e96bb20e..ee9e4c60ef 100644 --- a/lib/portage/util/_eventloop/asyncio_event_loop.py +++ b/lib/portage/util/_eventloop/asyncio_event_loop.py @@ -1,8 +1,9 @@ -# Copyright 2018-2023 Gentoo Authors +# Copyright 2018-2024 Gentoo Authors # Distributed under the terms of the GNU General Public License v2 import os import signal +import threading import asyncio as _real_asyncio from asyncio.events import AbstractEventLoop as _AbstractEventLoop @@ -14,6 +15,7 @@ except ImportError: PidfdChildWatcher = None import portage +from portage.util import socks5 class AsyncioEventLoop(_AbstractEventLoop): @@ -25,18 +27,14 @@ class AsyncioEventLoop(_AbstractEventLoop): def __init__(self, loop=None): loop = loop or _real_asyncio.get_event_loop() self._loop = loop - self.run_until_complete = ( - self._run_until_complete - if portage._internal_caller - else loop.run_until_complete - ) + self.run_until_complete = self._run_until_complete self.call_soon = loop.call_soon self.call_soon_threadsafe = loop.call_soon_threadsafe self.call_later = loop.call_later self.call_at = loop.call_at self.is_running = loop.is_running self.is_closed = loop.is_closed - self.close = loop.close + self.close = self._close self.create_future = ( loop.create_future if hasattr(loop, "create_future") @@ -55,10 +53,36 @@ class AsyncioEventLoop(_AbstractEventLoop): self.get_debug = loop.get_debug self._wakeup_fd = -1 self._child_watcher = None + # Used to drop recursive calls to _close. + self._closing = False + # Initialized in _run_until_complete. + self._is_main = None if portage._internal_caller: loop.set_exception_handler(self._internal_caller_exception_handler) + def _close(self): + """ + Before closing the main loop, run portage.process.run_exitfuncs() + with the event loop running so that anything attached can clean + itself up (like the socks5 ProxyManager for bug 925240). + """ + if not (self._closing or self.is_closed()): + self._closing = True + if self._is_main: + self.run_until_complete(self._close_main()) + self._loop.close() + self._closing = False + + async def _close_main(self): + # Even though this has an exit hook, invoke it here so that + # we can properly wait for it and avoid messages like this: + # [ERROR] Task was destroyed but it is pending! + if socks5.proxy.is_running(): + await socks5.proxy.stop() + + portage.process.run_exitfuncs() + @staticmethod def _internal_caller_exception_handler(loop, context): """ @@ -139,6 +163,12 @@ class AsyncioEventLoop(_AbstractEventLoop): In order to avoid potential interference with API consumers, this implementation is only used when portage._internal_caller is True. """ + if self._is_main is None: + self._is_main = threading.current_thread() is threading.main_thread() + + if not portage._internal_caller: + return self._loop.run_until_complete(future) + if self._wakeup_fd != -1: signal.set_wakeup_fd(self._wakeup_fd) self._wakeup_fd = -1 diff --git a/lib/portage/util/socks5.py b/lib/portage/util/socks5.py index 74592aeefe..f8fcdf9fca 100644 --- a/lib/portage/util/socks5.py +++ b/lib/portage/util/socks5.py @@ -8,6 +8,13 @@ import os import socket from typing import Union +import portage + +portage.proxy.lazyimport.lazyimport( + globals(), + "portage.util._eventloop.global_event_loop:global_event_loop", +) + import portage.data from portage import _python_interpreter from portage.data import portage_gid, portage_uid, userpriv_groups @@ -74,10 +81,15 @@ class ProxyManager: if self._proc is not None: self._proc.terminate() if loop is None: - asyncio.run(self._proc.wait()) + # In this case spawn internals would have used + # portage's global loop when attaching a waiter to + # self._proc, so we are obligated to use that. + global_event_loop().run_until_complete(self._proc.wait()) else: if self._proc_waiter is None: - self._proc_waiter = asyncio.ensure_future(self._proc.wait(), loop) + self._proc_waiter = asyncio.ensure_future( + self._proc.wait(), loop=loop + ) future = asyncio.shield(self._proc_waiter) if loop is not None and future is None: