Below you can find async SubProcess class which
derives from Stream Class send previously.

Features:
1) Process Pipes (See Example)
2) ContextManager style usage (See Example)

I would like to hear your experiences regarding
SubProcess usage patterns. I'll be glad if a similar
approach is included in standard library.

Example:
This is a http handler which dumps Html formatted
man page into a chunked stream.
** Note the piping among processes.

async def manpage(stream, f):
    async with await create_proc('groff -mandoc -Thtml -P -l -P -D -P
/tmp/', 'rw') as groff:
        async with await create_proc('zcat ' + f, outstream=groff):
            while 1:
                r = await groff.readline()
                if not r:
                    break
                # Format links to other man pages
                r = sub(b'<b>(.+?)</b>\\((.+?)\\)',
                        b'<b><a href="./\\1.\\2"
target=_blank>\\1</a>(\\2)</b>', r)
                await stream.flushchunk(r)


Source:
from asyncio import create_subprocess_exec, get_event_loop
from asyncio.subprocess import PIPE

from lib.setup import bsize
from lib.stream import Stream

async def create_proc(cmd, mode='r', outstream=None):
    stdin = PIPE if 'w' in mode else None
    stdout = PIPE if outstream or ('r' in mode) else None
    if type(cmd) is str:
        clist = cmd.split()
    else:
        clist = cmd
    aproc = await create_subprocess_exec(*clist, stdin=stdin,
stdout=stdout, limit=bsize)
    return Proc(aproc, outstream)


class Proc(Stream):

    def __init__(self, aproc, outstream):
        Stream.__init__(self, aproc.stdout, aproc.stdin)
        self.aproc = aproc
        self.outstream = outstream

    async def readwrite(self):
        while 1:
            r = await self.read(bsize)
            if r:
                self.outstream.write(r)
            else:
                break
        self.outstream.close()

    async def __aenter__(self):
        if self.outstream:
            get_event_loop().create_task(self.readwrite())
        return self

    async def __aexit__(self, *args):
        if self.aproc.stdout:
            await self.read()  # ??
        await self.aproc.wait()

Reply via email to