commit:     3cc986f87ddda86ee93770e03cca06346aee54c5
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Fri Feb 23 06:06:14 2024 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Fri Feb 23 06:48:29 2024 +0000
URL:        https://gitweb.gentoo.org/proj/portage.git/commit/?id=3cc986f8

AsyncioEventLoop: Call process.run_exitfuncs() before close

For the event loop running in the main thread, call
process.run_exitfuncs() before close with the event loop
running so that anything attached can clean itself up (like
the socks5 ProxyManager for bug 925240). This is necessary
because process.spawn uses the event loop to implement the
new returnproc parameter related to bug 916566.

Bug: https://bugs.gentoo.org/916566
Bug: https://bugs.gentoo.org/925240
Signed-off-by: Zac Medico <zmedico <AT> gentoo.org>

 lib/portage/tests/util/test_socks5.py             | 51 ++++++++++++++++++++++-
 lib/portage/util/_eventloop/asyncio_event_loop.py | 44 +++++++++++++++----
 lib/portage/util/socks5.py                        | 16 ++++++-
 3 files changed, 101 insertions(+), 10 deletions(-)

diff --git a/lib/portage/tests/util/test_socks5.py b/lib/portage/tests/util/test_socks5.py
index 7b33cb3f6b..4a6d08169d 100644
--- a/lib/portage/tests/util/test_socks5.py
+++ b/lib/portage/tests/util/test_socks5.py
@@ -12,6 +12,8 @@ import time
 import portage
 from portage.tests import TestCase
 from portage.util import socks5
+from portage.util.futures.executor.fork import ForkExecutor
+from portage.util._eventloop.global_event_loop import global_event_loop
 from portage.const import PORTAGE_BIN_PATH
 
 from http.server import BaseHTTPRequestHandler, HTTPServer
@@ -189,8 +191,10 @@ class Socks5ServerTestCase(TestCase):
         path = "/index.html"
         proxy = None
         tempdir = tempfile.mkdtemp()
+        previous_exithandlers = portage.process._exithandlers
 
         try:
+            portage.process._exithandlers = []
             with AsyncHTTPServer(host, {path: content}, loop) as server:
                 settings = {
                     "PORTAGE_TMPDIR": tempdir,
@@ -211,5 +215,50 @@ class Socks5ServerTestCase(TestCase):
 
                 self.assertEqual(result, content)
         finally:
-            await socks5.proxy.stop()
+            try:
+                # Also run_exitfuncs to test atexit hook cleanup.
+                await socks5.proxy.stop()
+                self.assertNotEqual(portage.process._exithandlers, [])
+                portage.process.run_exitfuncs()
+                self.assertEqual(portage.process._exithandlers, [])
+            finally:
+                portage.process._exithandlers = previous_exithandlers
+                shutil.rmtree(tempdir)
+
+
+class Socks5ServerLoopCloseTestCase(TestCase):
+    """
+    For bug 925240, test that the socks5 proxy is automatically
+    terminated when the main event loop is closed, using a subprocess
+    for isolation.
+    """
+
+    def testSocks5ServerLoopClose(self):
+        asyncio.run(self._testSocks5ServerLoopClose())
+
+    async def _testSocks5ServerLoopClose(self):
+        loop = asyncio.get_running_loop()
+        self.assertEqual(
+            await loop.run_in_executor(
+                ForkExecutor(loop=loop), self._testSocks5ServerLoopCloseSubprocess
+            ),
+            True,
+        )
+
+    @staticmethod
+    def _testSocks5ServerLoopCloseSubprocess():
+        loop = global_event_loop()
+        tempdir = tempfile.mkdtemp()
+        try:
+            settings = {
+                "PORTAGE_TMPDIR": tempdir,
+                "PORTAGE_BIN_PATH": PORTAGE_BIN_PATH,
+            }
+
+            socks5.get_socks5_proxy(settings)
+            loop.run_until_complete(socks5.proxy.ready())
+        finally:
+            loop.close()
             shutil.rmtree(tempdir)
+
+        return not socks5.proxy.is_running()

diff --git a/lib/portage/util/_eventloop/asyncio_event_loop.py b/lib/portage/util/_eventloop/asyncio_event_loop.py
index b9e96bb20e..ee9e4c60ef 100644
--- a/lib/portage/util/_eventloop/asyncio_event_loop.py
+++ b/lib/portage/util/_eventloop/asyncio_event_loop.py
@@ -1,8 +1,9 @@
-# Copyright 2018-2023 Gentoo Authors
+# Copyright 2018-2024 Gentoo Authors
 # Distributed under the terms of the GNU General Public License v2
 
 import os
 import signal
+import threading
 
 import asyncio as _real_asyncio
 from asyncio.events import AbstractEventLoop as _AbstractEventLoop
@@ -14,6 +15,7 @@ except ImportError:
     PidfdChildWatcher = None
 
 import portage
+from portage.util import socks5
 
 
 class AsyncioEventLoop(_AbstractEventLoop):
@@ -25,18 +27,14 @@ class AsyncioEventLoop(_AbstractEventLoop):
     def __init__(self, loop=None):
         loop = loop or _real_asyncio.get_event_loop()
         self._loop = loop
-        self.run_until_complete = (
-            self._run_until_complete
-            if portage._internal_caller
-            else loop.run_until_complete
-        )
+        self.run_until_complete = self._run_until_complete
         self.call_soon = loop.call_soon
         self.call_soon_threadsafe = loop.call_soon_threadsafe
         self.call_later = loop.call_later
         self.call_at = loop.call_at
         self.is_running = loop.is_running
         self.is_closed = loop.is_closed
-        self.close = loop.close
+        self.close = self._close
         self.create_future = (
             loop.create_future
             if hasattr(loop, "create_future")
@@ -55,10 +53,36 @@ class AsyncioEventLoop(_AbstractEventLoop):
         self.get_debug = loop.get_debug
         self._wakeup_fd = -1
         self._child_watcher = None
+        # Used to drop recursive calls to _close.
+        self._closing = False
+        # Initialized in _run_until_complete.
+        self._is_main = None
 
         if portage._internal_caller:
             loop.set_exception_handler(self._internal_caller_exception_handler)
 
+    def _close(self):
+        """
+        Before closing the main loop, run portage.process.run_exitfuncs()
+        with the event loop running so that anything attached can clean
+        itself up (like the socks5 ProxyManager for bug 925240).
+        """
+        if not (self._closing or self.is_closed()):
+            self._closing = True
+            if self._is_main:
+                self.run_until_complete(self._close_main())
+            self._loop.close()
+            self._closing = False
+
+    async def _close_main(self):
+        # Even though this has an exit hook, invoke it here so that
+        # we can properly wait for it and avoid messages like this:
+        # [ERROR] Task was destroyed but it is pending!
+        if socks5.proxy.is_running():
+            await socks5.proxy.stop()
+
+        portage.process.run_exitfuncs()
+
     @staticmethod
     def _internal_caller_exception_handler(loop, context):
         """
@@ -139,6 +163,12 @@ class AsyncioEventLoop(_AbstractEventLoop):
         In order to avoid potential interference with API consumers, this
         implementation is only used when portage._internal_caller is True.
         """
+        if self._is_main is None:
+            self._is_main = threading.current_thread() is threading.main_thread()
+
+        if not portage._internal_caller:
+            return self._loop.run_until_complete(future)
+
         if self._wakeup_fd != -1:
             signal.set_wakeup_fd(self._wakeup_fd)
             self._wakeup_fd = -1

diff --git a/lib/portage/util/socks5.py b/lib/portage/util/socks5.py
index 74592aeefe..f8fcdf9fca 100644
--- a/lib/portage/util/socks5.py
+++ b/lib/portage/util/socks5.py
@@ -8,6 +8,13 @@ import os
 import socket
 from typing import Union
 
+import portage
+
+portage.proxy.lazyimport.lazyimport(
+    globals(),
+    "portage.util._eventloop.global_event_loop:global_event_loop",
+)
+
 import portage.data
 from portage import _python_interpreter
 from portage.data import portage_gid, portage_uid, userpriv_groups
@@ -74,10 +81,15 @@ class ProxyManager:
         if self._proc is not None:
             self._proc.terminate()
             if loop is None:
-                asyncio.run(self._proc.wait())
+                # In this case spawn internals would have used
+                # portage's global loop when attaching a waiter to
+                # self._proc, so we are obligated to use that.
+                global_event_loop().run_until_complete(self._proc.wait())
             else:
                 if self._proc_waiter is None:
-                    self._proc_waiter = asyncio.ensure_future(self._proc.wait(), loop)
+                    self._proc_waiter = asyncio.ensure_future(
+                        self._proc.wait(), loop=loop
+                    )
                 future = asyncio.shield(self._proc_waiter)
 
         if loop is not None and future is None:

Reply via email to