Hi,
stream.read() blocks forever when reading from a pipe with data that is
larger than 2 * limit.
stream.read(n) for non-negative n doesn't deadlock in this case.
For example, if asyncio.StreamReader(loop=loop) is replaced with
asyncio.StreamReader(loop=loop, limit=(len(b'data') // 2 - 1))
in examples/subprocess_attach_read_pipe.py then
yield from reader.read() never returns.
https://code.google.com/p/tulip/source/browse/examples/subprocess_attach_read_pipe.py
It is reproducible for larger limit values if the data to be sent is larger
than 2*limit.
I've attached the test file that demonstrates it without an external
process.
Is this the correct way to connect a read pipe to the event loop?
Related discussion:
https://code.google.com/p/tulip/source/detail?r=0a716436176993a12cf861b6cafffe8a31bc1127
"""Demonstrate that stream.read(-1) blocks forever when reading from a
pipe with data that is larger than 2*limit.
- a read with any non-negative parameter succeeds, e.g.,
stream.read(1<<20) doesn't block in this case
"""
import gc
import os
import unittest
import asyncio
from asyncio import test_utils
# from tests/tests_streams.py
class StreamReaderTests(unittest.TestCase):
    """Exercise StreamReader.read() against payloads around 2 * limit.

    The in-memory test feeds the reader directly; the pipe tests connect
    the read end of an os.pipe() to the event loop and read through a
    StreamReaderProtocol.
    """

    def setUp(self):
        """Create a private event loop and a small reader limit."""
        self.limit = 10
        self.loop = asyncio.new_event_loop()
        # Unset the global loop so every call below has to receive the
        # loop explicitly.
        asyncio.set_event_loop(None)

    def tearDown(self):
        """Drain pending callbacks, then close the loop."""
        # just in case if we have transport close callbacks
        test_utils.run_briefly(self.loop)
        self.loop.close()
        gc.collect()

    def test_read_more_than_limit_with_eof(self):
        """read(-1) returns more than 2*limit bytes when EOF is fed."""
        # test whether we can read more than 2*limit in a single
        # read(-1) call
        data_size = 2 * self.limit + 1
        stream = asyncio.StreamReader(loop=self.loop, limit=self.limit)

        def cb(data=b'x' * data_size):
            stream.feed_data(data)
            stream.feed_eof()  # <-- works with it

        self.loop.call_soon(cb)
        data = self.loop.run_until_complete(stream.read(-1))  # yes, we can!
        self.assertEqual(len(data), data_size)
        self.assertEqual(b'', stream._buffer)

    def _test_read_from_pipe(self, limit, data_size, n=-1):
        """Write data_size bytes into a pipe and read them with read(n).

        limit is passed to the StreamReader, data_size is the number of
        b'x' bytes written to the pipe, and n is the argument given to
        reader.read().  The whole operation is bounded by a 3 second
        timeout, so a read that never completes surfaces as
        asyncio.TimeoutError.
        """
        # test whether we can read more than 2*limit in a single
        # read(n) call from a pipe
        @asyncio.coroutine
        def read_pipe(loop):
            def write_and_close(fd, data):
                # Close the write end even if the write fails; otherwise
                # the descriptor leaks and the reader never sees EOF.
                try:
                    return os.write(fd, data)
                finally:
                    os.close(fd)

            rfd, wfd = os.pipe()
            fut = loop.run_in_executor(None, write_and_close,
                                       wfd, b'x' * data_size)  # write data
            with open(rfd, 'rb', 0) as pipe:
                reader = asyncio.StreamReader(loop=loop, limit=limit)
                protocol = asyncio.StreamReaderProtocol(reader, loop=loop)
                transport, _ = yield from loop.connect_read_pipe(
                    lambda: protocol, pipe)
                try:
                    return (yield from asyncio.gather(
                        reader.read(n), fut, loop=loop))
                finally:
                    # Detach the pipe from the loop before the `with`
                    # block closes the underlying file object, including
                    # when the read is cancelled by the timeout.
                    transport.close()

        coro = asyncio.wait_for(read_pipe(loop=self.loop),
                                timeout=3, loop=self.loop)
        data, written = self.loop.run_until_complete(coro)
        self.assertEqual(len(data), data_size)
        self.assertTrue(0 < written <= data_size or data_size < 1)

    def test_read_from_pipe_n(self):
        """A bounded read of fewer than limit bytes succeeds."""
        self._test_read_from_pipe(self.limit, self.limit - 1, n=1 << 20)

    def test_read_more_than_limit_from_pipe_n(self):
        """A bounded read of more than 2*limit bytes succeeds."""
        self._test_read_from_pipe(self.limit, 2 * self.limit + 1, n=1 << 20)

    def test_read_from_pipe(self):
        """read(-1) of fewer than limit bytes succeeds."""
        self._test_read_from_pipe(self.limit, self.limit - 1, n=-1)  # works

    def test_read_more_than_limit_from_pipe(self):
        """read(-1) of more than 2*limit bytes from a pipe times out.

        This pins the reported misbehaviour rather than the desired one.
        """
        with self.assertRaises(asyncio.TimeoutError):  # we can't read!
            self._test_read_from_pipe(self.limit, 2 * self.limit + 1, n=-1)
if __name__ == "__main__":
    # Executed as a script: hand control to the unittest runner.
    unittest.main()