Raghuram Devarakonda added the comment:

I am attaching the patch io.diff, which does the following:

- If a chunk ends in "\r", read the next chunk to check whether it starts
with "\n". This is a deliberately simple solution that can be
optimized later.

- Invoke _replacenl on the complete string read, instead of on the
portion returned by each read call.
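
To illustrate why both changes matter, here is a minimal standalone sketch.
It does not use Lib/io.py; the helper names translate, read_naive and
read_with_lookahead are hypothetical and exist only for this example. It
assumes universal-newline translation, where "\r\n" and a lone "\r" both
become "\n".

```python
def translate(text):
    # Assumed universal-newline translation: "\r\n" and lone "\r" -> "\n".
    return text.replace("\r\n", "\n").replace("\r", "\n")


def read_naive(chunks):
    # Translate each chunk on its own. A "\r\n" pair split across two
    # chunks is seen as a lone "\r" followed by a lone "\n".
    return "".join(translate(chunk) for chunk in chunks)


def read_with_lookahead(chunks):
    # Hold back any buffer that ends in "\r" until the next chunk arrives,
    # then translate the combined text in one pass.
    out = []
    buf = ""
    for chunk in chunks:
        buf += chunk
        if buf.endswith("\r"):
            continue  # cannot decide yet: the next chunk may start with "\n"
        out.append(translate(buf))
        buf = ""
    out.append(translate(buf))  # flush whatever is left at end of input
    return "".join(out)


# "\r\n" straddles the boundary between the two chunks.
chunks = ["A" * 127 + "\r", "\nB"]
naive = read_naive(chunks)
fixed = read_with_lookahead(chunks)
```

With these inputs, read_naive produces two newlines where the input had a
single "\r\n", while read_with_lookahead produces one.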

I also incorporated the test case by Amaury and added one more.

With this patch in place, the following tests failed (on SuSE 10.1):

test_doctest test_mailbox test_nis test_old_mailbox
test_pickletools test_pty test_sys

The failures (other than the known test_mailbox and test_old_mailbox
failures) do not appear to be caused by this fix.

Added file: http://bugs.python.org/file8703/io.diff

__________________________________
Tracker <[EMAIL PROTECTED]>
<http://bugs.python.org/issue1395>
__________________________________
Index: Lib/io.py
===================================================================
--- Lib/io.py	(revision 58880)
+++ Lib/io.py	(working copy)
@@ -1248,13 +1248,19 @@
             self._snapshot = None
             return self._replacenl(res)
         else:
-            while len(res) < n:
+            nl = False
+            while nl or len(res) < n:
                 readahead, pending = self._read_chunk()
+                if pending and pending[-1] == "\r":
+                    nl = True
+                else:
+                    nl = False
                 res += pending
                 if not readahead:
                     break
+            res = self._replacenl(res)
             self._pending = res[n:]
-            return self._replacenl(res[:n])
+            return res[:n]
 
     def __next__(self):
         self._telling = False
Index: Lib/test/test_io.py
===================================================================
--- Lib/test/test_io.py	(revision 58880)
+++ Lib/test/test_io.py	(working copy)
@@ -741,7 +741,28 @@
                 print("Reading using readline(): %6.3f seconds" % (t3-t2))
                 print("Using readline()+tell():  %6.3f seconds" % (t4-t3))
 
+    def testReadOneByOne(self):
+        txt = io.TextIOWrapper(io.BytesIO(b"AA\r\nBB"))
+        reads = ""
+        while True:
+            c = txt.read(1)
+            if not c:
+                break
+            reads += c
+        self.assertEquals(reads, "AA\nBB")
 
+    # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
+    def testReadByChunk(self):
+        # make sure "\r\n" straddles 128 char boundary.
+        txt = io.TextIOWrapper(io.BytesIO(b"A" * 127 + b"\r\nB"))
+        reads = ""
+        while True:
+            c = txt.read(128)
+            if not c:
+                break
+            reads += c
+        self.assertEquals(reads, "A"*127+"\nB")
+
 # XXX Tests for open()
 
 class MiscIOTest(unittest.TestCase):