Well, if you need help, check if this patch helps.  Maybe you will have
already arrived at the same result, but... I just tweaked StreamWriter very
slightly to be able to work without a reader.

Then we only need to add an API to produce a future that completes when a
subprocess exits, then it would be perfect ;-)

Regards,
Gustavo.


On 25 January 2014 22:42, Victor Stinner <[email protected]> wrote:

> (Guido hijicked my "Round timeout in BaseEventLoop._run_once()" thread
> with the subprocess discussion, I prefer to reply here)
>
> >> Hum, yes, something should be done for subprocess. IMO the API of
> >> subprocess should be changed. The best would be to provide something
> >> like StreamReader and StreamWriter for subprocess. If it's not
> >> possible to develop it before Python 3.4 final, a compromise is to
> >> ensure that the API allows to develop it later. According to my
> >> analysis (see the other dedicated thread on Tulip mailing list), it's
> >> not the case with the current API.
> >
> > It's clear that not enough people have actually tried to *use* the
> > subprocess API. :-(
>
> I take a look at examples/child_process.py, but I don't understand
> this example. It doesn't use EventLoop.subprocess_exec(). It doesn't
> use SubprocessTransport, whereas I would like to use it to benefit of
> the great FastChildWatcher.
>
> Having to write a connect_read_pipe() function to get a StreamReader
> is too complex for me. I don't want to duplicate this function in each
> script that want to run a command and use asyncio.
>
> The current API looks very low-level. I don't want to use it in my
> application.
>
> > My suggestion: first try to write code using the existing public API to
> hook
> > up the existing StreamReader / StreamWriter classes to stdin, stdout,
> stderr
> > of a subprocess.
>
> I did and I failed, especially for StreamWriter.
>
> > StreamReader and StreamWriter are explicitly intended to be hooked up in
> > other ways than the code in open_connection(), start_server() and
> > StreamReaderProtocol), and those three are intended to be simple enough
> to
> > be able to be copied into user code and then modified for the user's
> > purpose. If changes to StreamReader/Writer are necessary we should
> > prioritize those -- their implementation is *not* meant to be copied.
>
> StreamReaderProtocol and StreamWriter cannot be used for write pipes
> (stdin) because these classes expect that a connected reader, which is
> not the case. I don't care of having to use a completly different
> class if it gives the same nice API (write, writeline and drain).
>
> I will play with the API to see how I can implement something like a
> StreamWriter for write pipes.
>
> Victor
>



-- 
Gustavo J. A. M. Carneiro
Gambit Research LLC
"The universe is always one step beyond logic." -- Frank Herbert
diff -r a9270ac18ecc asyncio/streams.py
--- a/asyncio/streams.py        Sat Jan 25 22:02:14 2014 +0100
+++ b/asyncio/streams.py        Sat Jan 25 22:59:14 2014 +0000
@@ -2,6 +2,7 @@
 
 __all__ = ['StreamReader', 'StreamWriter', 'StreamReaderProtocol',
            'open_connection', 'start_server', 'IncompleteReadError',
+           'connect_read_pipe', 'connect_write_pipe',
            ]
 
 import collections
@@ -157,6 +158,33 @@
             if not waiter.done():
                 waiter.set_result(None)
 
+#
+# Return a write-only transport wrapping a writable pipe
+#
+
[email protected]
+def connect_write_pipe(file, loop=None):
+    if loop is None:
+        loop = events.get_event_loop()
+    protocol = protocols.Protocol()
+    transport, _ =  yield from loop.connect_write_pipe(protocols.Protocol, 
file)
+    writer = StreamWriter(transport, protocol, None, loop)
+    return writer
+
+#
+# Wrap a readable pipe in a stream
+#
+
[email protected]
+def connect_read_pipe(file, loop=None):
+    if loop is None:
+        loop = events.get_event_loop()
+    stream_reader = StreamReader(loop=loop)
+    def factory():
+        return StreamReaderProtocol(stream_reader)
+    _transport, _ = yield from loop.connect_read_pipe(factory, file)
+    return stream_reader
+
 
 class StreamWriter:
     """Wraps a Transport.
@@ -211,7 +239,7 @@
         completed, which will happen when the buffer is (partially)
         drained and the protocol is resumed.
         """
-        if self._reader._exception is not None:
+        if self._reader is not None and self._reader._exception is not None:
             raise self._reader._exception
         if self._transport._conn_lost:  # Uses private variable.
             raise ConnectionResetError('Connection lost')
@@ -422,3 +450,6 @@
             n -= len(block)
 
         return b''.join(blocks)
+
+    def close(self):
+        return self._transport.close()
diff -r a9270ac18ecc examples/child_process.py
--- a/examples/child_process.py Sat Jan 25 22:02:14 2014 +0100
+++ b/examples/child_process.py Sat Jan 25 22:59:14 2014 +0000
@@ -23,29 +23,6 @@
 else:
     from subprocess import Popen, PIPE
 
-#
-# Return a write-only transport wrapping a writable pipe
-#
-
[email protected]
-def connect_write_pipe(file):
-    loop = asyncio.get_event_loop()
-    protocol = protocols.Protocol()
-    transport, _ =  yield from loop.connect_write_pipe(asyncio.Protocol, file)
-    return transport
-
-#
-# Wrap a readable pipe in a stream
-#
-
[email protected]
-def connect_read_pipe(file):
-    loop = asyncio.get_event_loop()
-    stream_reader = streams.StreamReader(loop=loop)
-    def factory():
-        return streams.StreamReaderProtocol(stream_reader)
-    transport, _ = yield from loop.connect_read_pipe(factory, file)
-    return stream_reader, transport
 
 
 #
@@ -82,9 +59,9 @@
     p = Popen([sys.executable, '-c', code],
               stdin=PIPE, stdout=PIPE, stderr=PIPE)
 
-    stdin = yield from connect_write_pipe(p.stdin)
-    stdout, stdout_transport = yield from connect_read_pipe(p.stdout)
-    stderr, stderr_transport = yield from connect_read_pipe(p.stderr)
+    stdin = yield from asyncio.connect_write_pipe(p.stdin)
+    stdout = yield from asyncio.connect_read_pipe(p.stdout)
+    stderr = yield from asyncio.connect_read_pipe(p.stderr)
 
     # interact with subprocess
     name = {stdout:'OUT', stderr:'ERR'}
@@ -115,8 +92,8 @@
                     registered[asyncio.Task(stream.readline())] = stream
             timeout = 0.0
 
-    stdout_transport.close()
-    stderr_transport.close()
+    stdout.close()
+    stderr.close()
 
 if __name__ == '__main__':
     if sys.platform == 'win32':

Reply via email to