I want to share a custom aync Stream class which I
frequently use. I would like to hear your experiences
regarding async stream usage patterns. And I'll be
glad if a similar approach is included in standard library.
(There is also a related custom subprocess class
which will be send seperately)
Features of this custom Stream class are:
1) A wrapper around read, readexactly and readline
functions with "optional" timeout. The class has
default timeout used in all read operations. If it is
None, no timeout is used. Default behaviour can
be overriden with timeout parameter of each read call.
2) readinto an other stream. (i.e. piping)
3) Provides a ContextManager which allows
chunked transfer to underlying writer.
It is primary designed for http1.1. However,
it can be used in any form of chunked
communication (socket, subprocess, ..)
Within the context, operator() is used to
fill the chunk buffer and flushchunk is called
to close, send current chunk and open a fresh
chunk.
As a shortcut, optional parameter "b" of
flushchunk combines operator() and flushing.
from asyncio import wait_for
from lib.setup import bsize
class Stream:
def __init__(self, r, w, tout=None):
self.r = r
self.w = w
if w:
w.transport.set_write_buffer_limits(bsize, 0)
self.tout = tout
# Chunked transfer buffer
self.buf = None
# Reader Functions
async def read(self, l=-1, tout=None): # -1 Read all
return await self.readtimeout(self.r.read(l), tout)
async def readexactly(self, l, tout=None):
return await self.readtimeout(self.r.readexactly(l), tout)
async def readline(self, tout=None):
return await self.readtimeout(self.r.readline(), tout)
async def readtimeout(self, co, tout):
if not tout:
tout = self.tout
if tout:
try:
return await wait_for(co, tout)
except:
self.w.close()
else:
return await co
def write(self, b):
self.w.write(b)
def close(self):
self.w.close()
async def readinto(self, outstream, tout=None):
while 1:
rd = await self.read(bsize, tout)
if not rd:
return
if outstream.buf is None:
outstream.write(rd)
else: # We are in chunked transfer
await outstream.flushchunk(rd)
# Chunked Operations
async def flushchunk(self, b=None, drain=False, check=True):
if b:
self(b)
l = len(self.buf) - 8
if check and l < bsize:
return
lc = hex(l)[2:].encode()
start = 6 - len(lc)
self.buf[start:6] = lc # Chunk Length
self(b'\r\n') # End of Chunk
self.w.write(self.buf[start:])
self.buf = bytearray(b' \r\n')
if drain:
await self.w.drain()
return l
async def __aenter__(self):
self.buf = bytearray(b' \r\n')
return self
async def __aexit__(self, *args):
l = await self.flushchunk(check=False)
# Send empty closing chunk if current one is non-empty
if l:
await self.flushchunk(check=False)
self.buf = None
def __call__(self, b):
self.buf.extend(b)