Hi,

stream.read() blocks forever when reading from a pipe with data that is 
larger than 2 * limit.
stream.read(n) for non-negative n doesn't deadlock in this case.

For example, if asyncio.StreamReader(loop=loop) is replaced with
asyncio.StreamReader(loop=loop, limit=(len(b'data') // 2 - 1))
in examples/subprocess_attach_read_pipe.py then
yield from reader.read() never returns.

https://code.google.com/p/tulip/source/browse/examples/subprocess_attach_read_pipe.py

It is also reproducible with larger limit values, as long as the data to be
sent is larger than 2 * limit.

I've attached the test file that demonstrates it without an external 
process.

Is the approach used in the test file the correct way to connect a read pipe to the event loop?

Related discussion: 
https://code.google.com/p/tulip/source/detail?r=0a716436176993a12cf861b6cafffe8a31bc1127
"""Demonstrate that stream.read(-1) blocks forever when reading from a
pipe with data that is larger than 2*limit.

- a read with any non-negative parameter succeeds e.g.,
  stream.read(1<<20) doesn't block in this case
"""
import gc
import os
import unittest

import asyncio
from asyncio import test_utils


# from tests/tests_streams.py
class StreamReaderTests(unittest.TestCase):

    def setUp(self):
        self.limit = 10
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(None)

    def tearDown(self):
        # just in case if we have transport close callbacks
        test_utils.run_briefly(self.loop)

        self.loop.close()
        gc.collect()

    def test_read_more_than_limit_with_eof(self):
        # test whether we can read more than 2*limit in a single
        # read(-1) call
        data_size = 2 * self.limit + 1
        stream = asyncio.StreamReader(loop=self.loop, limit=self.limit)

        def cb(data=b'x' * data_size):
            stream.feed_data(data)
            stream.feed_eof() # <-- works with it
        self.loop.call_soon(cb)

        data = self.loop.run_until_complete(stream.read(-1)) # yes, we can!
        self.assertEqual(len(data), data_size)
        self.assertEqual(b'', stream._buffer)

    def _test_read_from_pipe(self, limit, data_size, n=-1):
        # test whether we can read more than 2*limit in a single
        # read(n) call from a pipe

        @asyncio.coroutine
        def read_pipe(loop):
            def write_and_close(fd, data):
                written = os.write(fd, data)
                os.close(fd)
                return written

            rfd, wfd = os.pipe()
            fut = loop.run_in_executor(None, write_and_close,
                                       wfd, b'x' * data_size) # write data

            with open(rfd, 'rb', 0) as pipe:
                reader = asyncio.StreamReader(loop=loop, limit=limit)
                protocol = asyncio.StreamReaderProtocol(reader, loop=loop)
                yield from loop.connect_read_pipe(lambda: protocol, pipe)
                return (yield from asyncio.gather(
                    reader.read(n), fut, loop=loop))

        coro = asyncio.wait_for(read_pipe(loop=self.loop),
                                timeout=3, loop=self.loop)
        data, written = self.loop.run_until_complete(coro)
        self.assertEqual(len(data), data_size)
        self.assertTrue(0 < written <= data_size or data_size < 1)

    def test_read_from_pipe_n(self):
        self._test_read_from_pipe(self.limit, self.limit - 1, n=1<<20)

    def test_read_more_than_limit_from_pipe_n(self):
        self._test_read_from_pipe(self.limit, 2 * self.limit + 1, n=1<<20)

    def test_read_from_pipe(self):
        self._test_read_from_pipe(self.limit, self.limit - 1, n=-1) # works

    def test_read_more_than_limit_from_pipe(self):
        with self.assertRaises(asyncio.TimeoutError): # we can't read!
            self._test_read_from_pipe(self.limit, 2 * self.limit + 1, n=-1)


# Run the test cases defined in this file when it is executed as a script.
if __name__ == "__main__":
    unittest.main()

Reply via email to