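In Python versions that support asyncio, this patch adds an AbstractEventLoop.connect_read_pipe implementation (and a ReadTransport implementation for it to return), which allows API consumers to pass subprocess.PIPE as the stdout and stderr parameters of asyncio.create_subprocess_exec(). Passing subprocess.PIPE for stdin still raises NotImplementedError, since that requires a connect_write_pipe implementation.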
Bug: https://bugs.gentoo.org/649588 --- .../util/futures/asyncio/test_subprocess_exec.py | 30 ++++ pym/portage/util/futures/unix_events.py | 157 ++++++++++++++++++++- 2 files changed, 184 insertions(+), 3 deletions(-) diff --git a/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py index d30f48c43..94984fc93 100644 --- a/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py +++ b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py @@ -2,6 +2,7 @@ # Distributed under the terms of the GNU General Public License v2 import os +import subprocess from portage.process import find_binary from portage.tests import TestCase @@ -161,3 +162,32 @@ class SubprocessExecTestCase(TestCase): f.close() self._run_test(test) + + def testReadTransport(self): + """ + Test asyncio.create_subprocess_exec(stdout=subprocess.PIPE) which + requires an AbstractEventLoop.connect_read_pipe implementation + (and a ReadTransport implementation for it to return). 
+ """ + if not hasattr(asyncio, 'create_subprocess_exec'): + self.skipTest('create_subprocess_exec not implemented for python2') + + args_tuple = (b'hello', b'world') + echo_binary = find_binary("echo") + self.assertNotEqual(echo_binary, None) + echo_binary = echo_binary.encode() + + def test(loop): + with open(os.devnull, 'rb', 0) as devnull: + proc = loop.run_until_complete( + asyncio.create_subprocess_exec( + echo_binary, *args_tuple, + stdin=devnull, + stdout=subprocess.PIPE, stderr=subprocess.STDOUT)) + + self.assertEqual( + tuple(loop.run_until_complete(proc.stdout.read()).split()), + args_tuple) + self.assertEqual(loop.run_until_complete(proc.wait()), os.EX_OK) + + self._run_test(test) diff --git a/pym/portage/util/futures/unix_events.py b/pym/portage/util/futures/unix_events.py index 1abc420e1..6ba0adff6 100644 --- a/pym/portage/util/futures/unix_events.py +++ b/pym/portage/util/futures/unix_events.py @@ -9,12 +9,18 @@ __all__ = ( try: from asyncio.base_subprocess import BaseSubprocessTransport as _BaseSubprocessTransport from asyncio.unix_events import AbstractChildWatcher as _AbstractChildWatcher + from asyncio.transports import ReadTransport as _ReadTransport except ImportError: _AbstractChildWatcher = object _BaseSubprocessTransport = object + _ReadTransport = object +import errno +import fcntl import functools +import logging import os +import stat import subprocess from portage.util._eventloop.global_event_loop import ( @@ -81,6 +87,35 @@ class _PortageEventLoop(events.AbstractEventLoop): """ return asyncio.Task(coro, loop=self) + def connect_read_pipe(self, protocol_factory, pipe): + """ + Register read pipe in event loop. Set the pipe to non-blocking mode. + + @type protocol_factory: callable + @param protocol_factory: must instantiate object with Protocol interface + @type pipe: file + @param pipe: a pipe to read from + @rtype: asyncio.Future + @return: Return pair (transport, protocol), where transport supports the + ReadTransport interface. 
+ """ + protocol = protocol_factory() + result = self.create_future() + waiter = self.create_future() + transport = self._make_read_pipe_transport(pipe, protocol, waiter=waiter) + + def waiter_callback(waiter): + try: + waiter.result() + except Exception as e: + transport.close() + result.set_exception(e) + else: + result.set_result((transport, protocol)) + + waiter.add_done_callback(waiter_callback) + return result + def subprocess_exec(self, protocol_factory, program, *args, **kwargs): """ Run subprocesses asynchronously using the subprocess module. @@ -104,9 +139,9 @@ class _PortageEventLoop(events.AbstractEventLoop): stdout = kwargs.pop('stdout', subprocess.PIPE) stderr = kwargs.pop('stderr', subprocess.PIPE) - if subprocess.PIPE in (stdin, stdout, stderr): - # Requires connect_read/write_pipe implementation, for example - # see asyncio.unix_events._UnixReadPipeTransport. + if stdin == subprocess.PIPE: + # Requires connect_write_pipe implementation, for example + # see asyncio.unix_events._UnixWritePipeTransport. 
raise NotImplementedError() universal_newlines = kwargs.pop('universal_newlines', False) @@ -131,6 +166,10 @@ class _PortageEventLoop(events.AbstractEventLoop): bufsize, **kwargs) return result + def _make_read_pipe_transport(self, pipe, protocol, waiter=None, + extra=None): + return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra) + def _make_subprocess_transport(self, result, protocol, args, shell, stdin, stdout, stderr, bufsize, extra=None, **kwargs): waiter = self.create_future() @@ -162,6 +201,118 @@ class _PortageEventLoop(events.AbstractEventLoop): result.set_exception(wait_transp.exception() or exception) +if hasattr(os, 'set_blocking'): + def _set_nonblocking(fd): + os.set_blocking(fd, False) +else: + def _set_nonblocking(fd): + flags = fcntl.fcntl(fd, fcntl.F_GETFL) + flags = flags | os.O_NONBLOCK + fcntl.fcntl(fd, fcntl.F_SETFL, flags) + + +class _UnixReadPipeTransport(_ReadTransport): + """ + This is identical to the standard library's private + asyncio.unix_events._UnixReadPipeTransport class, except that it + only calls public AbstractEventLoop methods. 
+ """ + + max_size = 256 * 1024 # max bytes we read in one event loop iteration + + def __init__(self, loop, pipe, protocol, waiter=None, extra=None): + super().__init__(extra) + self._extra['pipe'] = pipe + self._loop = loop + self._pipe = pipe + self._fileno = pipe.fileno() + self._protocol = protocol + self._closing = False + + mode = os.fstat(self._fileno).st_mode + if not (stat.S_ISFIFO(mode) or + stat.S_ISSOCK(mode) or + stat.S_ISCHR(mode)): + self._pipe = None + self._fileno = None + self._protocol = None + raise ValueError("Pipe transport is for pipes/sockets only.") + + _set_nonblocking(self._fileno) + + self._loop.call_soon(self._protocol.connection_made, self) + # only start reading when connection_made() has been called + self._loop.call_soon(self._loop.add_reader, + self._fileno, self._read_ready) + if waiter is not None: + # only wake up the waiter when connection_made() has been called + self._loop.call_soon( + lambda: None if waiter.cancelled() else waiter.set_result(None)) + + def _read_ready(self): + try: + data = os.read(self._fileno, self.max_size) + except (BlockingIOError, InterruptedError): + pass + except OSError as exc: + self._fatal_error(exc, 'Fatal read error on pipe transport') + else: + if data: + self._protocol.data_received(data) + else: + self._closing = True + self._loop.remove_reader(self._fileno) + self._loop.call_soon(self._protocol.eof_received) + self._loop.call_soon(self._call_connection_lost, None) + + def pause_reading(self): + self._loop.remove_reader(self._fileno) + + def resume_reading(self): + self._loop.add_reader(self._fileno, self._read_ready) + + def set_protocol(self, protocol): + self._protocol = protocol + + def get_protocol(self): + return self._protocol + + def is_closing(self): + return self._closing + + def close(self): + if not self._closing: + self._close(None) + + def _fatal_error(self, exc, message='Fatal error on pipe transport'): + # should be called by exception handler only + if (isinstance(exc, 
OSError) and exc.errno == errno.EIO): + if self._loop.get_debug(): + logging.debug("%r: %s", self, message, exc_info=True) + else: + self._loop.call_exception_handler({ + 'message': message, + 'exception': exc, + 'transport': self, + 'protocol': self._protocol, + }) + self._close(exc) + + def _close(self, exc): + self._closing = True + self._loop.remove_reader(self._fileno) + self._loop.call_soon(self._call_connection_lost, exc) + + def _call_connection_lost(self, exc): + try: + self._protocol.connection_lost(exc) + finally: + self._pipe.close() + self._pipe = None + self._protocol = None + self._loop = None + + class _UnixSubprocessTransport(_BaseSubprocessTransport): """ This is identical to the standard library's private -- 2.13.6