2014-01-24 Victor Stinner <[email protected]>:
> I will try to hack asyncio to see how StreamWriter can be implemented for 
> stdin.

I created a subproces-stream branch (oops, typo!) in Tulip repository.
It's a work in progress, but it works :-) Attached patch are the
differences with the default branch.

I tried to limit changes on existing code:

- add SubprocessProtocol.pip_connection_made(): needed to retrieve the
writer for stdin
- add two attributes to SubprocessProtocol: read_pipe_protocol,
write_pipe_protocol => factories to build protocols for stdout/stderr
and stdin

I added new classes in asyncio/subprocess_stream.py.

The syntax is more what I expect from asyncio: asynchronous read and
write (instead of callbacks). You can also wait for the completion of
the write (drain: wait until the write buffer is flushed) and wait
until the process exited (wait).

Example:
---
@asyncio.coroutine
def cat(loop):
    transport, protocol = yield from
loop.subprocess_shell(asyncio.SubprocessStreamProtocol, "cat")

    protocol.stdin.write(b"Hello World!")
    yield from protocol.stdin.drain()
    protocol.stdin.close()

    print((yield from protocol.stdout.read()).decode('ascii'))

    returncode = yield from proc.wait()
    print("exit code: %s" % returncode)
---

See examples/subprocess_stream.py for a more complete example.

I didn't test on Windows yet.

I prefer to add attributes to SubprocessProtocol instead of passing
new (keyword) parameters to subprocess_exec/subprocess_shell. The pipe
protocols are related the subprocess protocol: see
SubprocessStreamProtocol.pipe_connection_madeSubprocess().

Victor
diff -r 30b9a06f4c0a -r ea0b8f14c719 asyncio/__init__.py
--- a/asyncio/__init__.py	Sun Jan 26 00:00:59 2014 +0100
+++ b/asyncio/__init__.py	Sun Jan 26 04:18:32 2014 +0100
@@ -24,6 +24,7 @@ from .locks import *
 from .protocols import *
 from .queues import *
 from .streams import *
+from .subprocess_stream import *
 from .tasks import *
 from .transports import *
 
@@ -39,5 +40,6 @@ else:
            protocols.__all__ +
            queues.__all__ +
            streams.__all__ +
+           subprocess_stream.__all__ +
            tasks.__all__ +
            transports.__all__)
diff -r 30b9a06f4c0a -r ea0b8f14c719 asyncio/base_subprocess.py
--- a/asyncio/base_subprocess.py	Sun Jan 26 00:00:59 2014 +0100
+++ b/asyncio/base_subprocess.py	Sun Jan 26 04:18:32 2014 +0100
@@ -30,6 +30,14 @@ class BaseSubprocessTransport(transports
         self._pending_calls = collections.deque()
         self._finished = False
         self._returncode = None
+        if protocol.read_pipe_protocol is not None:
+            self._read_pipe_protocol = protocol.read_pipe_protocol
+        else:
+            self._read_pipe_protocol = ReadSubprocessPipeProto
+        if protocol.write_pipe_protocol is not None:
+            self._write_pipe_protocol = protocol.write_pipe_protocol
+        else:
+            self._write_pipe_protocol = WriteSubprocessPipeProto
         self._start(args=args, shell=shell, stdin=stdin, stdout=stdout,
                     stderr=stderr, bufsize=bufsize, **kwargs)
         self._extra['subprocess'] = self._proc
@@ -75,16 +83,16 @@ class BaseSubprocessTransport(transports
         proc = self._proc
         loop = self._loop
         if proc.stdin is not None:
-            transp, proto = yield from loop.connect_write_pipe(
-                lambda: WriteSubprocessPipeProto(self, STDIN),
+            yield from loop.connect_write_pipe(
+                lambda: self._write_pipe_protocol(self, STDIN),
                 proc.stdin)
         if proc.stdout is not None:
-            transp, proto = yield from loop.connect_read_pipe(
-                lambda: ReadSubprocessPipeProto(self, STDOUT),
+            yield from loop.connect_read_pipe(
+                lambda: self._read_pipe_protocol(self, STDOUT),
                 proc.stdout)
         if proc.stderr is not None:
-            transp, proto = yield from loop.connect_read_pipe(
-                lambda: ReadSubprocessPipeProto(self, STDERR),
+            yield from loop.connect_read_pipe(
+                lambda: self._read_pipe_protocol(self, STDERR),
                 proc.stderr)
         if not self._pipes:
             self._try_connected()
@@ -103,6 +111,9 @@ class BaseSubprocessTransport(transports
                 self._loop.call_soon(callback, *data)
             self._pending_calls = None
 
+    def _pipe_connection_made(self, fd, pipe):
+        self._protocol.pipe_connection_made(fd, pipe)
+
     def _pipe_connection_lost(self, fd, exc):
         self._call(self._protocol.pipe_connection_lost, fd, exc)
         self._try_finish()
@@ -150,6 +161,7 @@ class WriteSubprocessPipeProto(protocols
         self.connected = True
         self.pipe = transport
         self.proc._try_connected()
+        self.proc._pipe_connection_made(self.fd, self)
 
     def connection_lost(self, exc):
         self.disconnected = True
diff -r 30b9a06f4c0a -r ea0b8f14c719 asyncio/protocols.py
--- a/asyncio/protocols.py	Sun Jan 26 00:00:59 2014 +0100
+++ b/asyncio/protocols.py	Sun Jan 26 04:18:32 2014 +0100
@@ -111,6 +111,12 @@ class DatagramProtocol(BaseProtocol):
 class SubprocessProtocol(BaseProtocol):
     """Interface for protocol for subprocess calls."""
 
+    read_pipe_protocol = None
+    write_pipe_protocol = None
+
+    def pipe_connection_made(self, fd, pipe):
+        pass
+
     def pipe_data_received(self, fd, data):
         """Called when the subprocess writes data into stdout/stderr pipe.
 
@@ -127,3 +133,4 @@ class SubprocessProtocol(BaseProtocol):
 
     def process_exited(self):
         """Called when subprocess has exited."""
+
diff -r 30b9a06f4c0a -r ea0b8f14c719 asyncio/subprocess_stream.py
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/asyncio/subprocess_stream.py	Sun Jan 26 04:18:32 2014 +0100
@@ -0,0 +1,190 @@
+__all__ = ['SubprocessStreamProtocol']
+
+from . import base_subprocess
+from . import events
+from . import protocols
+from . import streams
+from . import tasks
+
+class WriteSubprocessPipeStreamProto(base_subprocess.WriteSubprocessPipeProto):
+    def __init__(self, process_transport, fd):
+        base_subprocess.WriteSubprocessPipeProto.__init__(self, process_transport, fd)
+        self._drain_waiter = None
+        self._paused = False
+        self.writer = WritePipeStream(None, self, None)
+
+    def connection_made(self, transport):
+        super().connection_made(transport)
+        self.writer._transport = transport
+        self.writer._loop = transport._loop
+
+    def connection_lost(self, exc):
+        # FIXME: call super().connection_lost(exc)
+        # Also wake up the writing side.
+        if self._paused:
+            waiter = self._drain_waiter
+            if waiter is not None:
+                self._drain_waiter = None
+                if not waiter.done():
+                    if exc is None:
+                        waiter.set_result(None)
+                    else:
+                        waiter.set_exception(exc)
+
+    def pause_writing(self):
+        assert not self._paused
+        self._paused = True
+
+    def resume_writing(self):
+        assert self._paused
+        self._paused = False
+        waiter = self._drain_waiter
+        if waiter is not None:
+            self._drain_waiter = None
+            if not waiter.done():
+                waiter.set_result(None)
+
+
+class WritePipeStream:
+    """Wraps a Transport.
+
+    This exposes write(), writelines(), [can_]write_eof(),
+    get_extra_info() and close().  It adds drain() which returns an
+    optional Future on which you can wait for flow control.  It also
+    adds a transport property which references the Transport
+    directly.
+    """
+
+    def __init__(self, transport, protocol, loop):
+        self._transport = transport
+        self._protocol = protocol
+        self._loop = loop
+
+    @property
+    def transport(self):
+        return self._transport
+
+    def write(self, data):
+        self._transport.write(data)
+
+    def writelines(self, data):
+        self._transport.writelines(data)
+
+    def write_eof(self):
+        return self._transport.write_eof()
+
+    def can_write_eof(self):
+        return self._transport.can_write_eof()
+
+    def close(self):
+        return self._transport.close()
+
+    def get_extra_info(self, name, default=None):
+        return self._transport.get_extra_info(name, default)
+
+    def drain(self):
+        """This method has an unusual return value.
+
+        The intended use is to write
+
+          w.write(data)
+          yield from w.drain()
+
+        When there's nothing to wait for, drain() returns (), and the
+        yield-from continues immediately.  When the transport buffer
+        is full (the protocol is paused), drain() creates and returns
+        a Future and the yield-from will block until that Future is
+        completed, which will happen when the buffer is (partially)
+        drained and the protocol is resumed.
+        """
+        if self._transport._conn_lost:  # Uses private variable.
+            raise ConnectionResetError('Connection lost')
+        if not self._protocol._paused:
+            return ()
+        waiter = self._protocol._drain_waiter
+        assert waiter is None or waiter.cancelled()
+        waiter = futures.Future(loop=self._loop)
+        self._protocol._drain_waiter = waiter
+        return waiter
+
+class ReadSubprocessPipeStreamProto(base_subprocess.ReadSubprocessPipeProto):
+    def __init__(self, proc, fd, limit=streams._DEFAULT_LIMIT):
+        super().__init__(proc, fd)
+        self._stream_reader = streams.StreamReader(limit=limit)
+
+    def connection_made(self, transport):
+        super().connection_made(transport)
+        self._stream_reader.set_transport(transport)
+
+    def connection_lost(self, exc):
+        # FIXME: call super().connection_lost(exc)
+        if exc is None:
+            self._stream_reader.feed_eof()
+        else:
+            self._stream_reader.set_exception(exc)
+
+    def data_received(self, data):
+        self._stream_reader.feed_data(data)
+
+    def eof_received(self):
+        self._stream_reader.feed_eof()
+
+
+class SubprocessStreamProtocol(protocols.SubprocessProtocol):
+    write_pipe_protocol = WriteSubprocessPipeStreamProto
+
+    def __init__(self, limit=streams._DEFAULT_LIMIT):
+        self._pipes = {}
+        self.limit = limit
+        self.stdin = None
+        self.stdout = None
+        self.stderr = None
+        self._waiters = []
+        self._transport = None
+
+    def read_pipe_protocol(self, transport, fd):
+        protocol = ReadSubprocessPipeStreamProto(transport, fd, self.limit)
+        self.pipe_connection_made(fd, protocol)
+        return protocol
+
+    def connection_made(self, transport):
+        self._transport = transport
+
+    def pipe_data_received(self, fd, data):
+        pipe = self._pipes[fd]
+        pipe.data_received(data)
+
+    def pipe_connection_lost(self, fd, exc):
+        pipe = self._pipes[fd]
+        pipe.connection_lost(exc)
+
+    @tasks.coroutine
+    def wait(self):
+        """
+        Wait until the process exit and return the process return code.
+        """
+        returncode = self._transport.get_returncode()
+        if returncode is not None:
+            return returncode
+
+        fut = tasks.Future()
+        self._waiters.append(fut)
+        yield from fut
+        return fut.result()
+
+    def process_exited(self):
+        returncode = self._transport.get_returncode()
+        # FIXME: not thread safe
+        waiters = self._waiters.copy()
+        self._waiters.clear()
+        for waiter in waiters:
+            waiter.set_result(returncode)
+
+    def pipe_connection_made(self, fd, pipe):
+        self._pipes[fd] = pipe
+        if fd == 0:
+            self.stdin = pipe.writer
+        elif fd == 1:
+            self.stdout = pipe._stream_reader
+        elif fd == 2:
+            self.stderr = pipe._stream_reader
diff -r 30b9a06f4c0a -r ea0b8f14c719 asyncio/unix_events.py
--- a/asyncio/unix_events.py	Sun Jan 26 00:00:59 2014 +0100
+++ b/asyncio/unix_events.py	Sun Jan 26 04:18:32 2014 +0100
@@ -284,6 +284,16 @@ class _UnixWritePipeTransport(transports
         # Pipe was closed by peer.
         self._close()
 
+    # FIXME: inline this method?
+    def _pause_protocol(self):
+        # FIXME: try/except as _SelectorTransport._maybe_pause_protocol?
+        self._protocol.pause_writing()
+
+    # FIXME: inline this method?
+    def _resume_protocol(self):
+        # FIXME: try/except as _SelectorTransport._maybe_resume_protocol?
+        self._protocol.resume_writing()
+
     def write(self, data):
         assert isinstance(data, bytes), repr(data)
         if not data:
@@ -313,6 +323,7 @@ class _UnixWritePipeTransport(transports
             self._loop.add_writer(self._fileno, self._write_ready)
 
         self._buffer.append(data)
+        self._pause_protocol()
 
     def _write_ready(self):
         data = b''.join(self._buffer)
@@ -332,6 +343,8 @@ class _UnixWritePipeTransport(transports
         else:
             if n == len(data):
                 self._loop.remove_writer(self._fileno)
+                # FIXME: move resume after the closing block?
+                self._resume_protocol()
                 if self._closing:
                     self._loop.remove_reader(self._fileno)
                     self._call_connection_lost(None)
diff -r 30b9a06f4c0a -r ea0b8f14c719 examples/subprocess_stream.py
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/examples/subprocess_stream.py	Sun Jan 26 04:18:32 2014 +0100
@@ -0,0 +1,33 @@
+import asyncio
+
[email protected]
+def cat(loop):
+    transport, proc = yield from loop.subprocess_shell(asyncio.SubprocessStreamProtocol, "cat")
+    print("pid: %s" % transport.get_pid())
+
+    message = "Hello World!"
+    print("cat write: %r" % message)
+    proc.stdin.write(message.encode('ascii'))
+    yield from proc.stdin.drain()
+
+    proc.stdin.close()
+    read = yield from proc.stdout.read()
+    print("cat read: %r" % read.decode('ascii'))
+
+    returncode = yield from proc.wait()
+    print("exit code: %s" % returncode)
+    transport.close()
+
[email protected]
+def ls(loop):
+    transport, proc = yield from loop.subprocess_exec(asyncio.SubprocessStreamProtocol, "ls", stdin=None)
+    while True:
+        line = yield from proc.stdout.readline()
+        if not line:
+            break
+        print("ls>>", line.decode('ascii').rstrip())
+    transport.close()
+
+loop = asyncio.get_event_loop()
+loop.run_until_complete(cat(loop))
+loop.run_until_complete(ls(loop))

Reply via email to