2014-01-24 Victor Stinner <[email protected]>:
> I will try to hack asyncio to see how StreamWriter can be implemented for
> stdin.
I created a subproces-stream branch (oops, typo!) in Tulip repository.
It's a work in progress, but it works :-) Attached patch are the
differences with the default branch.
I tried to limit changes on existing code:
- add SubprocessProtocol.pip_connection_made(): needed to retrieve the
writer for stdin
- add two attributes to SubprocessProtocol: read_pipe_protocol,
write_pipe_protocol => factories to build protocols for stdout/stderr
and stdin
I added new classes in asyncio/subprocess_stream.py.
The syntax is more what I expect from asyncio: asynchronous read and
write (instead of callbacks). You can also wait for the completion of
the write (drain: wait until the write buffer is flushed) and wait
until the process exited (wait).
Example:
---
@asyncio.coroutine
def cat(loop):
transport, protocol = yield from
loop.subprocess_shell(asyncio.SubprocessStreamProtocol, "cat")
protocol.stdin.write(b"Hello World!")
yield from protocol.stdin.drain()
protocol.stdin.close()
print((yield from protocol.stdout.read()).decode('ascii'))
returncode = yield from proc.wait()
print("exit code: %s" % returncode)
---
See examples/subprocess_stream.py for a more complete example.
I didn't test on Windows yet.
I prefer to add attributes to SubprocessProtocol instead of passing
new (keyword) parameters to subprocess_exec/subprocess_shell. The pipe
protocols are related the subprocess protocol: see
SubprocessStreamProtocol.pipe_connection_madeSubprocess().
Victor
diff -r 30b9a06f4c0a -r ea0b8f14c719 asyncio/__init__.py
--- a/asyncio/__init__.py Sun Jan 26 00:00:59 2014 +0100
+++ b/asyncio/__init__.py Sun Jan 26 04:18:32 2014 +0100
@@ -24,6 +24,7 @@ from .locks import *
from .protocols import *
from .queues import *
from .streams import *
+from .subprocess_stream import *
from .tasks import *
from .transports import *
@@ -39,5 +40,6 @@ else:
protocols.__all__ +
queues.__all__ +
streams.__all__ +
+ subprocess_stream.__all__ +
tasks.__all__ +
transports.__all__)
diff -r 30b9a06f4c0a -r ea0b8f14c719 asyncio/base_subprocess.py
--- a/asyncio/base_subprocess.py Sun Jan 26 00:00:59 2014 +0100
+++ b/asyncio/base_subprocess.py Sun Jan 26 04:18:32 2014 +0100
@@ -30,6 +30,14 @@ class BaseSubprocessTransport(transports
self._pending_calls = collections.deque()
self._finished = False
self._returncode = None
+ if protocol.read_pipe_protocol is not None:
+ self._read_pipe_protocol = protocol.read_pipe_protocol
+ else:
+ self._read_pipe_protocol = ReadSubprocessPipeProto
+ if protocol.write_pipe_protocol is not None:
+ self._write_pipe_protocol = protocol.write_pipe_protocol
+ else:
+ self._write_pipe_protocol = WriteSubprocessPipeProto
self._start(args=args, shell=shell, stdin=stdin, stdout=stdout,
stderr=stderr, bufsize=bufsize, **kwargs)
self._extra['subprocess'] = self._proc
@@ -75,16 +83,16 @@ class BaseSubprocessTransport(transports
proc = self._proc
loop = self._loop
if proc.stdin is not None:
- transp, proto = yield from loop.connect_write_pipe(
- lambda: WriteSubprocessPipeProto(self, STDIN),
+ yield from loop.connect_write_pipe(
+ lambda: self._write_pipe_protocol(self, STDIN),
proc.stdin)
if proc.stdout is not None:
- transp, proto = yield from loop.connect_read_pipe(
- lambda: ReadSubprocessPipeProto(self, STDOUT),
+ yield from loop.connect_read_pipe(
+ lambda: self._read_pipe_protocol(self, STDOUT),
proc.stdout)
if proc.stderr is not None:
- transp, proto = yield from loop.connect_read_pipe(
- lambda: ReadSubprocessPipeProto(self, STDERR),
+ yield from loop.connect_read_pipe(
+ lambda: self._read_pipe_protocol(self, STDERR),
proc.stderr)
if not self._pipes:
self._try_connected()
@@ -103,6 +111,9 @@ class BaseSubprocessTransport(transports
self._loop.call_soon(callback, *data)
self._pending_calls = None
+ def _pipe_connection_made(self, fd, pipe):
+ self._protocol.pipe_connection_made(fd, pipe)
+
def _pipe_connection_lost(self, fd, exc):
self._call(self._protocol.pipe_connection_lost, fd, exc)
self._try_finish()
@@ -150,6 +161,7 @@ class WriteSubprocessPipeProto(protocols
self.connected = True
self.pipe = transport
self.proc._try_connected()
+ self.proc._pipe_connection_made(self.fd, self)
def connection_lost(self, exc):
self.disconnected = True
diff -r 30b9a06f4c0a -r ea0b8f14c719 asyncio/protocols.py
--- a/asyncio/protocols.py Sun Jan 26 00:00:59 2014 +0100
+++ b/asyncio/protocols.py Sun Jan 26 04:18:32 2014 +0100
@@ -111,6 +111,12 @@ class DatagramProtocol(BaseProtocol):
class SubprocessProtocol(BaseProtocol):
"""Interface for protocol for subprocess calls."""
+ read_pipe_protocol = None
+ write_pipe_protocol = None
+
+ def pipe_connection_made(self, fd, pipe):
+ pass
+
def pipe_data_received(self, fd, data):
"""Called when the subprocess writes data into stdout/stderr pipe.
@@ -127,3 +133,4 @@ class SubprocessProtocol(BaseProtocol):
def process_exited(self):
"""Called when subprocess has exited."""
+
diff -r 30b9a06f4c0a -r ea0b8f14c719 asyncio/subprocess_stream.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/asyncio/subprocess_stream.py Sun Jan 26 04:18:32 2014 +0100
@@ -0,0 +1,190 @@
+__all__ = ['SubprocessStreamProtocol']
+
+from . import base_subprocess
+from . import events
+from . import protocols
+from . import streams
+from . import tasks
+
+class WriteSubprocessPipeStreamProto(base_subprocess.WriteSubprocessPipeProto):
+ def __init__(self, process_transport, fd):
+ base_subprocess.WriteSubprocessPipeProto.__init__(self, process_transport, fd)
+ self._drain_waiter = None
+ self._paused = False
+ self.writer = WritePipeStream(None, self, None)
+
+ def connection_made(self, transport):
+ super().connection_made(transport)
+ self.writer._transport = transport
+ self.writer._loop = transport._loop
+
+ def connection_lost(self, exc):
+ # FIXME: call super().connection_lost(exc)
+ # Also wake up the writing side.
+ if self._paused:
+ waiter = self._drain_waiter
+ if waiter is not None:
+ self._drain_waiter = None
+ if not waiter.done():
+ if exc is None:
+ waiter.set_result(None)
+ else:
+ waiter.set_exception(exc)
+
+ def pause_writing(self):
+ assert not self._paused
+ self._paused = True
+
+ def resume_writing(self):
+ assert self._paused
+ self._paused = False
+ waiter = self._drain_waiter
+ if waiter is not None:
+ self._drain_waiter = None
+ if not waiter.done():
+ waiter.set_result(None)
+
+
+class WritePipeStream:
+ """Wraps a Transport.
+
+ This exposes write(), writelines(), [can_]write_eof(),
+ get_extra_info() and close(). It adds drain() which returns an
+ optional Future on which you can wait for flow control. It also
+ adds a transport property which references the Transport
+ directly.
+ """
+
+ def __init__(self, transport, protocol, loop):
+ self._transport = transport
+ self._protocol = protocol
+ self._loop = loop
+
+ @property
+ def transport(self):
+ return self._transport
+
+ def write(self, data):
+ self._transport.write(data)
+
+ def writelines(self, data):
+ self._transport.writelines(data)
+
+ def write_eof(self):
+ return self._transport.write_eof()
+
+ def can_write_eof(self):
+ return self._transport.can_write_eof()
+
+ def close(self):
+ return self._transport.close()
+
+ def get_extra_info(self, name, default=None):
+ return self._transport.get_extra_info(name, default)
+
+ def drain(self):
+ """This method has an unusual return value.
+
+ The intended use is to write
+
+ w.write(data)
+ yield from w.drain()
+
+ When there's nothing to wait for, drain() returns (), and the
+ yield-from continues immediately. When the transport buffer
+ is full (the protocol is paused), drain() creates and returns
+ a Future and the yield-from will block until that Future is
+ completed, which will happen when the buffer is (partially)
+ drained and the protocol is resumed.
+ """
+ if self._transport._conn_lost: # Uses private variable.
+ raise ConnectionResetError('Connection lost')
+ if not self._protocol._paused:
+ return ()
+ waiter = self._protocol._drain_waiter
+ assert waiter is None or waiter.cancelled()
+ waiter = futures.Future(loop=self._loop)
+ self._protocol._drain_waiter = waiter
+ return waiter
+
+class ReadSubprocessPipeStreamProto(base_subprocess.ReadSubprocessPipeProto):
+ def __init__(self, proc, fd, limit=streams._DEFAULT_LIMIT):
+ super().__init__(proc, fd)
+ self._stream_reader = streams.StreamReader(limit=limit)
+
+ def connection_made(self, transport):
+ super().connection_made(transport)
+ self._stream_reader.set_transport(transport)
+
+ def connection_lost(self, exc):
+ # FIXME: call super().connection_lost(exc)
+ if exc is None:
+ self._stream_reader.feed_eof()
+ else:
+ self._stream_reader.set_exception(exc)
+
+ def data_received(self, data):
+ self._stream_reader.feed_data(data)
+
+ def eof_received(self):
+ self._stream_reader.feed_eof()
+
+
+class SubprocessStreamProtocol(protocols.SubprocessProtocol):
+ write_pipe_protocol = WriteSubprocessPipeStreamProto
+
+ def __init__(self, limit=streams._DEFAULT_LIMIT):
+ self._pipes = {}
+ self.limit = limit
+ self.stdin = None
+ self.stdout = None
+ self.stderr = None
+ self._waiters = []
+ self._transport = None
+
+ def read_pipe_protocol(self, transport, fd):
+ protocol = ReadSubprocessPipeStreamProto(transport, fd, self.limit)
+ self.pipe_connection_made(fd, protocol)
+ return protocol
+
+ def connection_made(self, transport):
+ self._transport = transport
+
+ def pipe_data_received(self, fd, data):
+ pipe = self._pipes[fd]
+ pipe.data_received(data)
+
+ def pipe_connection_lost(self, fd, exc):
+ pipe = self._pipes[fd]
+ pipe.connection_lost(exc)
+
+ @tasks.coroutine
+ def wait(self):
+ """
+ Wait until the process exit and return the process return code.
+ """
+ returncode = self._transport.get_returncode()
+ if returncode is not None:
+ return returncode
+
+ fut = tasks.Future()
+ self._waiters.append(fut)
+ yield from fut
+ return fut.result()
+
+ def process_exited(self):
+ returncode = self._transport.get_returncode()
+ # FIXME: not thread safe
+ waiters = self._waiters.copy()
+ self._waiters.clear()
+ for waiter in waiters:
+ waiter.set_result(returncode)
+
+ def pipe_connection_made(self, fd, pipe):
+ self._pipes[fd] = pipe
+ if fd == 0:
+ self.stdin = pipe.writer
+ elif fd == 1:
+ self.stdout = pipe._stream_reader
+ elif fd == 2:
+ self.stderr = pipe._stream_reader
diff -r 30b9a06f4c0a -r ea0b8f14c719 asyncio/unix_events.py
--- a/asyncio/unix_events.py Sun Jan 26 00:00:59 2014 +0100
+++ b/asyncio/unix_events.py Sun Jan 26 04:18:32 2014 +0100
@@ -284,6 +284,16 @@ class _UnixWritePipeTransport(transports
# Pipe was closed by peer.
self._close()
+ # FIXME: inline this method?
+ def _pause_protocol(self):
+ # FIXME: try/except as _SelectorTransport._maybe_pause_protocol?
+ self._protocol.pause_writing()
+
+ # FIXME: inline this method?
+ def _resume_protocol(self):
+ # FIXME: try/except as _SelectorTransport._maybe_resume_protocol?
+ self._protocol.resume_writing()
+
def write(self, data):
assert isinstance(data, bytes), repr(data)
if not data:
@@ -313,6 +323,7 @@ class _UnixWritePipeTransport(transports
self._loop.add_writer(self._fileno, self._write_ready)
self._buffer.append(data)
+ self._pause_protocol()
def _write_ready(self):
data = b''.join(self._buffer)
@@ -332,6 +343,8 @@ class _UnixWritePipeTransport(transports
else:
if n == len(data):
self._loop.remove_writer(self._fileno)
+ # FIXME: move resume after the closing block?
+ self._resume_protocol()
if self._closing:
self._loop.remove_reader(self._fileno)
self._call_connection_lost(None)
diff -r 30b9a06f4c0a -r ea0b8f14c719 examples/subprocess_stream.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/examples/subprocess_stream.py Sun Jan 26 04:18:32 2014 +0100
@@ -0,0 +1,33 @@
+import asyncio
+
[email protected]
+def cat(loop):
+ transport, proc = yield from loop.subprocess_shell(asyncio.SubprocessStreamProtocol, "cat")
+ print("pid: %s" % transport.get_pid())
+
+ message = "Hello World!"
+ print("cat write: %r" % message)
+ proc.stdin.write(message.encode('ascii'))
+ yield from proc.stdin.drain()
+
+ proc.stdin.close()
+ read = yield from proc.stdout.read()
+ print("cat read: %r" % read.decode('ascii'))
+
+ returncode = yield from proc.wait()
+ print("exit code: %s" % returncode)
+ transport.close()
+
[email protected]
+def ls(loop):
+ transport, proc = yield from loop.subprocess_exec(asyncio.SubprocessStreamProtocol, "ls", stdin=None)
+ while True:
+ line = yield from proc.stdout.readline()
+ if not line:
+ break
+ print("ls>>", line.decode('ascii').rstrip())
+ transport.close()
+
+loop = asyncio.get_event_loop()
+loop.run_until_complete(cat(loop))
+loop.run_until_complete(ls(loop))