commit:     72914c0dbae1dfab7565a627451b616e330b8889
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Sun Apr 15 10:11:21 2018 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Tue Apr 17 02:19:57 2018 +0000
URL:        https://gitweb.gentoo.org/proj/portage.git/commit/?id=72914c0d

Implement AbstractEventLoop.connect_write_pipe (bug 649588)

In python versions that support asyncio, this allows API consumers
to use subprocess.PIPE for asyncio.create_subprocess_exec() stdin
parameters.

Bug: https://bugs.gentoo.org/649588

 .../util/futures/asyncio/test_subprocess_exec.py   |  34 +++
 pym/portage/util/futures/transports.py             |  90 +++++++
 pym/portage/util/futures/unix_events.py            | 259 ++++++++++++++++++++-
 3 files changed, 373 insertions(+), 10 deletions(-)

diff --git a/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py
index 94984fc93..8c8c395ca 100644
--- a/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py
+++ b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py
@@ -191,3 +191,37 @@ class SubprocessExecTestCase(TestCase):
                        self.assertEqual(loop.run_until_complete(proc.wait()), os.EX_OK)
 
                self._run_test(test)
+
+       def testWriteTransport(self):
+               """
+               Test asyncio.create_subprocess_exec(stdin=subprocess.PIPE) which
+               requires an AbstractEventLoop.connect_write_pipe implementation
+               (and a WriteTransport implementation for it to return).
+               """
+               if not hasattr(asyncio, 'create_subprocess_exec'):
+                       self.skipTest('create_subprocess_exec not implemented for python2')
+
+               stdin_data = b'hello world'
+               cat_binary = find_binary("cat")
+               self.assertNotEqual(cat_binary, None)
+               cat_binary = cat_binary.encode()
+
+               def test(loop):
+                       proc = loop.run_until_complete(
+                               asyncio.create_subprocess_exec(
+                               cat_binary,
+                               stdin=subprocess.PIPE,
+                               stdout=subprocess.PIPE, stderr=subprocess.STDOUT))
+
+                       # This buffers data when necessary to avoid blocking.
+                       proc.stdin.write(stdin_data)
+                       # Any buffered data is written asynchronously after the
+                       # close method is called.
+                       proc.stdin.close()
+
+                       self.assertEqual(
+                               loop.run_until_complete(proc.stdout.read()),
+                               stdin_data)
+                       self.assertEqual(loop.run_until_complete(proc.wait()), os.EX_OK)
+
+               self._run_test(test)

diff --git a/pym/portage/util/futures/transports.py b/pym/portage/util/futures/transports.py
new file mode 100644
index 000000000..60ea93073
--- /dev/null
+++ b/pym/portage/util/futures/transports.py
@@ -0,0 +1,90 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+try:
+       from asyncio.transports import Transport as _Transport
+except ImportError:
+       _Transport = object
+
+
+class _FlowControlMixin(_Transport):
+       """
+       This is identical to the standard library's private
+       asyncio.transports._FlowControlMixin class.
+
+       All the logic for (write) flow control in a mix-in base class.
+
+       The subclass must implement get_write_buffer_size().  It must call
+       _maybe_pause_protocol() whenever the write buffer size increases,
+       and _maybe_resume_protocol() whenever it decreases.  It may also
+       override set_write_buffer_limits() (e.g. to specify different
+       defaults).
+
+       The subclass constructor must call super().__init__(extra).  This
+       will call set_write_buffer_limits().
+
+       The user may call set_write_buffer_limits() and
+       get_write_buffer_size(), and their protocol's pause_writing() and
+       resume_writing() may be called.
+       """
+
+       def __init__(self, extra=None, loop=None):
+               super().__init__(extra)
+               assert loop is not None
+               self._loop = loop
+               self._protocol_paused = False
+               self._set_write_buffer_limits()
+
+       def _maybe_pause_protocol(self):
+               size = self.get_write_buffer_size()
+               if size <= self._high_water:
+                       return
+               if not self._protocol_paused:
+                       self._protocol_paused = True
+                       try:
+                               self._protocol.pause_writing()
+                       except Exception as exc:
+                               self._loop.call_exception_handler({
+                                       'message': 'protocol.pause_writing() failed',
+                                       'exception': exc,
+                                       'transport': self,
+                                       'protocol': self._protocol,
+                               })
+
+       def _maybe_resume_protocol(self):
+               if (self._protocol_paused and
+                       self.get_write_buffer_size() <= self._low_water):
+                       self._protocol_paused = False
+                       try:
+                               self._protocol.resume_writing()
+                       except Exception as exc:
+                               self._loop.call_exception_handler({
+                                       'message': 'protocol.resume_writing() failed',
+                                       'exception': exc,
+                                       'transport': self,
+                                       'protocol': self._protocol,
+                               })
+
+       def get_write_buffer_limits(self):
+               return (self._low_water, self._high_water)
+
+       def _set_write_buffer_limits(self, high=None, low=None):
+               if high is None:
+                       if low is None:
+                               high = 64*1024
+                       else:
+                               high = 4*low
+               if low is None:
+                       low = high // 4
+               if not high >= low >= 0:
+                       raise ValueError('high (%r) must be >= low (%r) must be >= 0' %
+                                                        (high, low))
+               self._high_water = high
+               self._low_water = low
+
+       def set_write_buffer_limits(self, high=None, low=None):
+               self._set_write_buffer_limits(high=high, low=low)
+               self._maybe_pause_protocol()
+
+       def get_write_buffer_size(self):
+               raise NotImplementedError

diff --git a/pym/portage/util/futures/unix_events.py b/pym/portage/util/futures/unix_events.py
index d69b13718..1a86ed439 100644
--- a/pym/portage/util/futures/unix_events.py
+++ b/pym/portage/util/futures/unix_events.py
@@ -9,19 +9,25 @@ __all__ = (
 try:
        from asyncio.base_subprocess import BaseSubprocessTransport as _BaseSubprocessTransport
        from asyncio.unix_events import AbstractChildWatcher as _AbstractChildWatcher
-       from asyncio.transports import ReadTransport as _ReadTransport
+       from asyncio.transports import (
+               ReadTransport as _ReadTransport,
+               WriteTransport as _WriteTransport,
+       )
 except ImportError:
        _AbstractChildWatcher = object
        _BaseSubprocessTransport = object
        _ReadTransport = object
+       _WriteTransport = object
 
 import errno
 import fcntl
 import functools
 import logging
 import os
+import socket
 import stat
 import subprocess
+import sys
 
 from portage.util._eventloop.global_event_loop import (
        global_event_loop as _global_event_loop,
@@ -31,6 +37,8 @@ from portage.util.futures import (
        events,
 )
 
+from portage.util.futures.transports import _FlowControlMixin
+
 
 class _PortageEventLoop(events.AbstractEventLoop):
        """
@@ -104,6 +112,35 @@ class _PortageEventLoop(events.AbstractEventLoop):
                waiter.add_done_callback(waiter_callback)
                return result
 
+       def connect_write_pipe(self, protocol_factory, pipe):
+               """
+               Register write pipe in event loop. Set the pipe to non-blocking mode.
+
+               @type protocol_factory: callable
+               @param protocol_factory: must instantiate object with Protocol interface
+               @type pipe: file
+               @param pipe: a pipe to write to
+               @rtype: asyncio.Future
+               @return: Return pair (transport, protocol), where transport supports the
+                       WriteTransport interface.
+               """
+               protocol = protocol_factory()
+               result = self.create_future()
+               waiter = self.create_future()
+               transport = self._make_write_pipe_transport(pipe, protocol, waiter)
+
+               def waiter_callback(waiter):
+                       try:
+                               waiter.result()
+                       except Exception as e:
+                               transport.close()
+                               result.set_exception(e)
+                       else:
+                               result.set_result((transport, protocol))
+
+               waiter.add_done_callback(waiter_callback)
+               return result
+
        def subprocess_exec(self, protocol_factory, program, *args, **kwargs):
                """
                Run subprocesses asynchronously using the subprocess module.
@@ -127,11 +164,6 @@ class _PortageEventLoop(events.AbstractEventLoop):
                stdout = kwargs.pop('stdout', subprocess.PIPE)
                stderr = kwargs.pop('stderr', subprocess.PIPE)
 
-               if stdin == subprocess.PIPE:
-                       # Requires connect_write_pipe implementation, for example
-                       # see asyncio.unix_events._UnixWritePipeTransport.
-                       raise NotImplementedError()
-
                universal_newlines = kwargs.pop('universal_newlines', False)
                shell = kwargs.pop('shell', False)
                bufsize = kwargs.pop('bufsize', 0)
@@ -158,6 +190,10 @@ class _PortageEventLoop(events.AbstractEventLoop):
                                                                  extra=None):
                return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)
 
+       def _make_write_pipe_transport(self, pipe, protocol, waiter=None,
+                                                                  extra=None):
+               return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)
+
        def _make_subprocess_transport(self, result, protocol, args, shell,
                stdin, stdout, stderr, bufsize, extra=None, **kwargs):
                waiter = self.create_future()
@@ -301,18 +337,221 @@ class _UnixReadPipeTransport(_ReadTransport):
                        self._loop = None
 
 
+class _UnixWritePipeTransport(_FlowControlMixin, _WriteTransport):
+       """
+       This is identical to the standard library's private
+       asyncio.unix_events._UnixWritePipeTransport class, except that it
+       only calls public AbstractEventLoop methods.
+       """
+
+       def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
+               super().__init__(extra, loop)
+               self._extra['pipe'] = pipe
+               self._pipe = pipe
+               self._fileno = pipe.fileno()
+               self._protocol = protocol
+               self._buffer = bytearray()
+               self._conn_lost = 0
+               self._closing = False  # Set when close() or write_eof() called.
+
+               mode = os.fstat(self._fileno).st_mode
+               is_char = stat.S_ISCHR(mode)
+               is_fifo = stat.S_ISFIFO(mode)
+               is_socket = stat.S_ISSOCK(mode)
+               if not (is_char or is_fifo or is_socket):
+                       self._pipe = None
+                       self._fileno = None
+                       self._protocol = None
+                       raise ValueError("Pipe transport is only for "
+                                                        "pipes, sockets and character devices")
+
+               _set_nonblocking(self._fileno)
+               self._loop.call_soon(self._protocol.connection_made, self)
+
+               # On AIX, the reader trick (to be notified when the read end of the
+               # socket is closed) only works for sockets. On other platforms it
+               # works for pipes and sockets. (Exception: OS X 10.4?  Issue #19294.)
+               if is_socket or (is_fifo and not sys.platform.startswith("aix")):
+                       # only start reading when connection_made() has been called
+                       self._loop.call_soon(self._loop.add_reader,
+                                                                self._fileno, self._read_ready)
+
+               if waiter is not None:
+                       # only wake up the waiter when connection_made() has been called
+                       self._loop.call_soon(
+                               lambda: None if waiter.cancelled() else waiter.set_result(None))
+
+       def get_write_buffer_size(self):
+               return len(self._buffer)
+
+       def _read_ready(self):
+               # Pipe was closed by peer.
+               if self._loop.get_debug():
+                       logging.info("%r was closed by peer", self)
+               if self._buffer:
+                       self._close(BrokenPipeError())
+               else:
+                       self._close()
+
+       def write(self, data):
+               assert isinstance(data, (bytes, bytearray, memoryview)), repr(data)
+               if isinstance(data, bytearray):
+                       data = memoryview(data)
+               if not data:
+                       return
+
+               if self._conn_lost or self._closing:
+                       self._conn_lost += 1
+                       return
+
+               if not self._buffer:
+                       # Attempt to send it right away first.
+                       try:
+                               n = os.write(self._fileno, data)
+                       except (BlockingIOError, InterruptedError):
+                               n = 0
+                       except Exception as exc:
+                               self._conn_lost += 1
+                               self._fatal_error(exc, 'Fatal write error on pipe transport')
+                               return
+                       if n == len(data):
+                               return
+                       elif n > 0:
+                               data = memoryview(data)[n:]
+                       self._loop.add_writer(self._fileno, self._write_ready)
+
+               self._buffer += data
+               self._maybe_pause_protocol()
+
+       def _write_ready(self):
+               assert self._buffer, 'Data should not be empty'
+
+               try:
+                       n = os.write(self._fileno, self._buffer)
+               except (BlockingIOError, InterruptedError):
+                       pass
+               except Exception as exc:
+                       self._buffer.clear()
+                       self._conn_lost += 1
+                       # Remove writer here, _fatal_error() doesn't it
+                       # because _buffer is empty.
+                       self._loop.remove_writer(self._fileno)
+                       self._fatal_error(exc, 'Fatal write error on pipe transport')
+               else:
+                       if n == len(self._buffer):
+                               self._buffer.clear()
+                               self._loop.remove_writer(self._fileno)
+                               self._maybe_resume_protocol()  # May append to buffer.
+                               if self._closing:
+                                       self._loop.remove_reader(self._fileno)
+                                       self._call_connection_lost(None)
+                               return
+                       elif n > 0:
+                               del self._buffer[:n]
+
+       def can_write_eof(self):
+               return True
+
+       def write_eof(self):
+               if self._closing:
+                       return
+               assert self._pipe
+               self._closing = True
+               if not self._buffer:
+                       self._loop.remove_reader(self._fileno)
+                       self._loop.call_soon(self._call_connection_lost, None)
+
+       def set_protocol(self, protocol):
+               self._protocol = protocol
+
+       def get_protocol(self):
+               return self._protocol
+
+       def is_closing(self):
+               return self._closing
+
+       def close(self):
+               if self._pipe is not None and not self._closing:
+                       # write_eof is all what we needed to close the write pipe
+                       self.write_eof()
+
+       def abort(self):
+               self._close(None)
+
+       def _fatal_error(self, exc, message='Fatal error on pipe transport'):
+               # should be called by exception handler only
+               if isinstance(exc,
+                       (BrokenPipeError, ConnectionResetError, ConnectionAbortedError)):
+                       if self._loop.get_debug():
+                               logging.debug("%r: %s", self, message, exc_info=True)
+               else:
+                       self._loop.call_exception_handler({
+                               'message': message,
+                               'exception': exc,
+                               'transport': self,
+                               'protocol': self._protocol,
+                       })
+               self._close(exc)
+
+       def _close(self, exc=None):
+               self._closing = True
+               if self._buffer:
+                       self._loop.remove_writer(self._fileno)
+               self._buffer.clear()
+               self._loop.remove_reader(self._fileno)
+               self._loop.call_soon(self._call_connection_lost, exc)
+
+       def _call_connection_lost(self, exc):
+               try:
+                       self._protocol.connection_lost(exc)
+               finally:
+                       self._pipe.close()
+                       self._pipe = None
+                       self._protocol = None
+                       self._loop = None
+
+
+if hasattr(os, 'set_inheritable'):
+       # Python 3.4 and newer
+       _set_inheritable = os.set_inheritable
+else:
+       def _set_inheritable(fd, inheritable):
+               cloexec_flag = getattr(fcntl, 'FD_CLOEXEC', 1)
+
+               old = fcntl.fcntl(fd, fcntl.F_GETFD)
+               if not inheritable:
+                       fcntl.fcntl(fd, fcntl.F_SETFD, old | cloexec_flag)
+               else:
+                       fcntl.fcntl(fd, fcntl.F_SETFD, old & ~cloexec_flag)
+
+
 class _UnixSubprocessTransport(_BaseSubprocessTransport):
        """
        This is identical to the standard library's private
-       asyncio.unix_events._UnixSubprocessTransport class, except that
-       subprocess.PIPE is not implemented for stdin, since that would
-       require connect_write_pipe support in the event loop. For example,
-       see the asyncio.unix_events._UnixWritePipeTransport class.
+       asyncio.unix_events._UnixSubprocessTransport class, except that it
+       only calls public AbstractEventLoop methods.
        """
        def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
+               stdin_w = None
+               if stdin == subprocess.PIPE:
+                       # Use a socket pair for stdin, since not all platforms
+                       # support selecting read events on the write end of a
+                       # socket (which we use in order to detect closing of the
+                       # other end).  Notably this is needed on AIX, and works
+                       # just fine on other platforms.
+                       stdin, stdin_w = socket.socketpair()
+
+                       # Mark the write end of the stdin pipe as non-inheritable,
+                       # needed by close_fds=False on Python 3.3 and older
+                       # (Python 3.4 implements the PEP 446, socketpair returns
+                       # non-inheritable sockets)
+                       _set_inheritable(stdin_w.fileno(), False)
                self._proc = subprocess.Popen(
                        args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr,
                        universal_newlines=False, bufsize=bufsize, **kwargs)
+               if stdin_w is not None:
+                       stdin.close()
+                       self._proc.stdin = os.fdopen(stdin_w.detach(), 'wb', bufsize)
 
 
 class AbstractChildWatcher(_AbstractChildWatcher):

Reply via email to