Author: Ronan Lamy <[email protected]>
Branch: unicode-utf8
Changeset: r93177:a40f7eee2bcf
Date: 2017-11-26 01:27 +0000
http://bitbucket.org/pypy/pypy/changeset/a40f7eee2bcf/
Log: hg merge default
diff --git a/extra_tests/test_textio.py b/extra_tests/test_textio.py
--- a/extra_tests/test_textio.py
+++ b/extra_tests/test_textio.py
@@ -14,7 +14,8 @@
mode=st.sampled_from(['\r', '\n', '\r\n', '']),
limit=st.integers(min_value=-1))
def test_readline(txt, mode, limit):
- textio = TextIOWrapper(BytesIO(txt.encode('utf-8')), newline=mode)
+ textio = TextIOWrapper(
+ BytesIO(txt.encode('utf-8')), encoding='utf-8', newline=mode)
lines = []
while True:
line = textio.readline(limit)
diff --git a/pypy/module/_io/interp_stringio.py b/pypy/module/_io/interp_stringio.py
--- a/pypy/module/_io/interp_stringio.py
+++ b/pypy/module/_io/interp_stringio.py
@@ -2,21 +2,115 @@
from pypy.interpreter.typedef import (
TypeDef, generic_new_descr, GetSetProperty)
from pypy.interpreter.gateway import interp2app, unwrap_spec, WrappedDefault
-from pypy.module._io.interp_textio import W_TextIOBase, W_IncrementalNewlineDecoder
+from pypy.module._io.interp_textio import (
+ W_TextIOBase, W_IncrementalNewlineDecoder)
from pypy.module._io.interp_iobase import convert_size
+class UnicodeIO(object):
+ def __init__(self, data=None, pos=0):
+ if data is None:
+ data = []
+ self.data = data
+ self.pos = pos
+
+ def resize(self, newlength):
+ if len(self.data) > newlength:
+ self.data = self.data[:newlength]
+ if len(self.data) < newlength:
+ self.data.extend([u'\0'] * (newlength - len(self.data)))
+
+ def read(self, size):
+ start = self.pos
+ available = len(self.data) - start
+ if available <= 0:
+ return u''
+ if size >= 0 and size <= available:
+ end = start + size
+ else:
+ end = len(self.data)
+ assert 0 <= start <= end
+ self.pos = end
+ return u''.join(self.data[start:end])
+
+ def _convert_limit(self, limit):
+ if limit < 0 or limit > len(self.data) - self.pos:
+ limit = len(self.data) - self.pos
+ assert limit >= 0
+ return limit
+
+ def readline_universal(self, limit):
+ # Universal newline search. Find any of \r, \r\n, \n
+ limit = self._convert_limit(limit)
+ start = self.pos
+ end = start + limit
+ pos = start
+ while pos < end:
+ ch = self.data[pos]
+ pos += 1
+ if ch == '\n':
+ break
+ if ch == '\r':
+ if pos >= end:
+ break
+ if self.data[pos] == '\n':
+ pos += 1
+ break
+ else:
+ break
+ self.pos = pos
+ result = u''.join(self.data[start:pos])
+ return result
+
+ def readline(self, marker, limit):
+ start = self.pos
+ limit = self._convert_limit(limit)
+ end = start + limit
+ found = False
+ for pos in range(start, end - len(marker) + 1):
+ ch = self.data[pos]
+ if ch == marker[0]:
+ for j in range(1, len(marker)):
+ if self.data[pos + j] != marker[j]:
+ break # from inner loop
+ else:
+ pos += len(marker)
+ found = True
+ break
+ if not found:
+ pos = end
+ self.pos = pos
+ result = u''.join(self.data[start:pos])
+ return result
+
+ def write(self, string):
+ length = len(string)
+ if self.pos + length > len(self.data):
+ self.resize(self.pos + length)
+
+ for i in range(length):
+ self.data[self.pos + i] = string[i]
+ self.pos += length
+
+ def seek(self, pos):
+ self.pos = pos
+
+ def truncate(self, size):
+ if size < len(self.data):
+ self.resize(size)
+
+ def getvalue(self):
+ return u''.join(self.data)
+
class W_StringIO(W_TextIOBase):
def __init__(self, space):
W_TextIOBase.__init__(self, space)
- self.buf = []
- self.pos = 0
+ self.buf = UnicodeIO()
- @unwrap_spec(w_newline = WrappedDefault("\n"))
+ @unwrap_spec(w_newline=WrappedDefault("\n"))
def descr_init(self, space, w_initvalue=None, w_newline=None):
# In case __init__ is called multiple times
- self.buf = []
- self.pos = 0
+ self.buf = UnicodeIO()
self.w_decoder = None
self.readnl = None
self.writenl = None
@@ -27,7 +121,7 @@
newline = space.unicode_w(w_newline)
if (newline is not None and newline != u"" and newline != u"\n" and
- newline != u"\r" and newline != u"\r\n"):
+ newline != u"\r" and newline != u"\r\n"):
# Not using oefmt() because I don't know how to use it
# with unicode
raise OperationError(space.w_ValueError,
@@ -50,7 +144,7 @@
if not space.is_none(w_initvalue):
self.write_w(space, w_initvalue)
- self.pos = 0
+ self.buf.pos = 0
def descr_getstate(self, space):
w_initialval = self.getvalue_w(space)
@@ -58,9 +152,9 @@
if self.readnl is None:
w_readnl = space.w_None
else:
- w_readnl = space.str(space.newunicode(self.readnl)) # YYY
+ w_readnl = space.str(space.newunicode(self.readnl)) # YYY
return space.newtuple([
- w_initialval, w_readnl, space.newint(self.pos), w_dict
+ w_initialval, w_readnl, space.newint(self.buf.pos), w_dict
])
def descr_setstate(self, space, w_state):
@@ -69,34 +163,33 @@
# We allow the state tuple to be longer than 4, because we may need
# someday to extend the object's state without breaking
# backwards-compatibility
- if not space.isinstance_w(w_state, space.w_tuple) or space.len_w(w_state) < 4:
+ if (not space.isinstance_w(w_state, space.w_tuple)
+ or space.len_w(w_state) < 4):
raise oefmt(space.w_TypeError,
"%T.__setstate__ argument should be a 4-tuple, got %T",
self, w_state)
w_initval, w_readnl, w_pos, w_dict = space.unpackiterable(w_state, 4)
+ if not space.isinstance_w(w_initval, space.w_unicode):
+ raise oefmt(space.w_TypeError,
+ "unicode argument expected, got '%T'", w_initval)
# Initialize state
- self.descr_init(space, w_initval, w_readnl)
+ self.descr_init(space, None, w_readnl)
- # Restore the buffer state. Even if __init__ did initialize the buffer,
- # we have to initialize it again since __init__ may translates the
- # newlines in the inital_value string. We clearly do not want that
+ # Restore the buffer state. We're not doing it via __init__
# because the string value in the state tuple has already been
# translated once by __init__. So we do not take any chance and replace
# object's buffer completely
initval = space.unicode_w(w_initval)
- size = len(initval)
- self.resize_buffer(size)
- self.buf = list(initval)
pos = space.getindex_w(w_pos, space.w_TypeError)
if pos < 0:
raise oefmt(space.w_ValueError,
"position value cannot be negative")
- self.pos = pos
+ self.buf = UnicodeIO(list(initval), pos)
if not space.is_w(w_dict, space.w_None):
if not space.isinstance_w(w_dict, space.w_dict):
- raise oefmt(space.w_TypeError,
- "fourth item of state should be a dict, got a %T",
- w_dict)
+ raise oefmt(
+ space.w_TypeError,
+ "fourth item of state should be a dict, got a %T", w_dict)
# Alternatively, we could replace the internal dictionary
# completely. However, it seems more practical to just update it.
space.call_method(self.w_dict, "update", w_dict)
@@ -107,86 +200,47 @@
message = "I/O operation on closed file"
raise OperationError(space.w_ValueError, space.newtext(message))
- def resize_buffer(self, newlength):
- if len(self.buf) > newlength:
- self.buf = self.buf[:newlength]
- if len(self.buf) < newlength:
- self.buf.extend([u'\0'] * (newlength - len(self.buf)))
-
- def write(self, string):
- length = len(string)
- if self.pos + length > len(self.buf):
- self.resize_buffer(self.pos + length)
-
- for i in range(length):
- self.buf[self.pos + i] = string[i]
- self.pos += length
-
def write_w(self, space, w_obj):
if not space.isinstance_w(w_obj, space.w_unicode):
raise oefmt(space.w_TypeError,
"unicode argument expected, got '%T'", w_obj)
self._check_closed(space)
-
orig_size = space.len_w(w_obj)
if self.w_decoder is not None:
w_decoded = space.call_method(
- self.w_decoder, "decode", w_obj, space.w_True
- )
+ self.w_decoder, "decode", w_obj, space.w_True)
else:
w_decoded = w_obj
-
if self.writenl:
w_decoded = space.call_method(
- w_decoded, "replace", space.newtext("\n"), space.newunicode(self.writenl)
- )
+ w_decoded, "replace",
+ space.newtext("\n"), space.newunicode(self.writenl))
+ string = space.unicode_w(w_decoded)
+ if string:
+ self.buf.write(string)
- string = space.unicode_w(w_decoded)
- size = len(string)
-
- if size:
- self.write(string)
return space.newint(orig_size)
def read_w(self, space, w_size=None):
self._check_closed(space)
size = convert_size(space, w_size)
- start = self.pos
- available = len(self.buf) - start
- if available <= 0:
- return space.newunicode(u"")
- if size >= 0 and size <= available:
- end = start + size
- else:
- end = len(self.buf)
- assert 0 <= start <= end
- self.pos = end
- return space.newunicode(u''.join(self.buf[start:end]))
+ return space.newunicode(self.buf.read(size))
def readline_w(self, space, w_limit=None):
self._check_closed(space)
limit = convert_size(space, w_limit)
+ if self.readuniversal:
+ result = self.buf.readline_universal(limit)
+ else:
+ if self.readtranslate:
+ # Newlines are already translated, only search for \n
+ newline = u'\n'
+ else:
+ newline = self.readnl
+ result = self.buf.readline(newline, limit)
+ return space.newunicode(result)
- if self.pos >= len(self.buf):
- return space.newunicode(u"")
-
- start = self.pos
- if limit < 0 or limit > len(self.buf) - self.pos:
- limit = len(self.buf) - self.pos
- assert limit >= 0
-
- endpos, found = self._find_line_ending(
- # XXX: super inefficient, makes a copy of the entire contents.
- u"".join(self.buf),
- start,
- limit
- )
- if not found:
- endpos = start + limit
- assert endpos >= 0
- self.pos = endpos
- return space.newunicode(u"".join(self.buf[start:endpos]))
@unwrap_spec(pos=int, mode=int)
def seek_w(self, space, pos, mode=0):
@@ -202,32 +256,27 @@
# XXX: this makes almost no sense, but its how CPython does it.
if mode == 1:
- pos = self.pos
+ pos = self.buf.pos
elif mode == 2:
- pos = len(self.buf)
-
+ pos = len(self.buf.data)
assert pos >= 0
- self.pos = pos
+ self.buf.seek(pos)
return space.newint(pos)
def truncate_w(self, space, w_size=None):
self._check_closed(space)
if space.is_none(w_size):
- size = self.pos
+ size = self.buf.pos
else:
size = space.int_w(w_size)
-
if size < 0:
raise oefmt(space.w_ValueError, "Negative size value %d", size)
-
- if size < len(self.buf):
- self.resize_buffer(size)
-
+ self.buf.truncate(size)
return space.newint(size)
def getvalue_w(self, space):
self._check_closed(space)
- return space.newunicode(u''.join(self.buf))
+ return space.newunicode(self.buf.getvalue())
def readable_w(self, space):
self._check_closed(space)
diff --git a/pypy/module/_io/interp_textio.py b/pypy/module/_io/interp_textio.py
--- a/pypy/module/_io/interp_textio.py
+++ b/pypy/module/_io/interp_textio.py
@@ -221,50 +221,6 @@
def newlines_get_w(self, space):
return space.w_None
- def _find_newline_universal(self, line, start, limit):
- # Universal newline search. Find any of \r, \r\n, \n
- # The decoder ensures that \r\n are not split in two pieces
- limit = min(limit, len(line) - start)
- end = start + limit
- i = start
- while i < end:
- ch = line[i]
- i += 1
- if ch == '\n':
- return i, True
- if ch == '\r':
- if i >= end:
- break
- if line[i] == '\n':
- return i + 1, True
- else:
- return i, True
- return end, False
-
- def _find_marker(self, marker, line, start, limit):
- limit = min(limit, len(line) - start)
- end = start + limit
- for i in range(start, end - len(marker) + 1):
- ch = line[i]
- if ch == marker[0]:
- for j in range(1, len(marker)):
- if line[i + j] != marker[j]:
- break # from inner loop
- else:
- return i + len(marker), True
- return end - len(marker) + 1, False
-
- def _find_line_ending(self, line, start, limit):
- if self.readuniversal:
- return self._find_newline_universal(line, start, limit)
- if self.readtranslate:
- # Newlines are already translated, only search for \n
- newline = '\n'
- else:
- # Non-universal mode.
- newline = self.readnl
- return self._find_marker(newline, line, start, limit)
-
W_TextIOBase.typedef = TypeDef(
'_io._TextIOBase', W_IOBase.typedef,
__new__ = generic_new_descr(W_TextIOBase),
@@ -340,6 +296,126 @@
self.input = input
+class DecodeBuffer(object):
+ def __init__(self, text=None):
+ self.text = text
+ self.pos = 0
+
+ def set(self, space, w_decoded):
+ check_decoded(space, w_decoded)
+ self.text = space.unicode_w(w_decoded)
+ self.pos = 0
+
+ def reset(self):
+ self.text = None
+ self.pos = 0
+
+ def get_chars(self, size):
+ if self.text is None:
+ return u""
+
+ available = len(self.text) - self.pos
+ if size < 0 or size > available:
+ size = available
+ assert size >= 0
+
+ if self.pos > 0 or size < available:
+ start = self.pos
+ end = self.pos + size
+ assert start >= 0
+ assert end >= 0
+ chars = self.text[start:end]
+ else:
+ chars = self.text
+
+ self.pos += size
+ return chars
+
+ def has_data(self):
+ return (self.text is not None and not self.exhausted())
+
+ def exhausted(self):
+ return self.pos >= len(self.text)
+
+ def next_char(self):
+ if self.exhausted():
+ raise StopIteration
+ ch = self.text[self.pos]
+ self.pos += 1
+ return ch
+
+ def peek_char(self):
+ # like next_char, but doesn't advance pos
+ if self.exhausted():
+ raise StopIteration
+ ch = self.text[self.pos]
+ return ch
+
+ def find_newline_universal(self, limit):
+ # Universal newline search. Find any of \r, \r\n, \n
+ # The decoder ensures that \r\n are not split in two pieces
+ if limit < 0:
+ limit = sys.maxint
+ scanned = 0
+ while scanned < limit:
+ try:
+ ch = self.next_char()
+ except StopIteration:
+ return False
+ if ch == u'\n':
+ return True
+ if ch == u'\r':
+ if scanned >= limit:
+ return False
+ try:
+ ch = self.peek_char()
+ except StopIteration:
+ return False
+ if ch == u'\n':
+ self.next_char()
+ return True
+ else:
+ return True
+ return False
+
+ def find_crlf(self, limit):
+ if limit < 0:
+ limit = sys.maxint
+ scanned = 0
+ while scanned < limit:
+ try:
+ ch = self.next_char()
+ except StopIteration:
+ return False
+ scanned += 1
+ if ch == u'\r':
+ if scanned >= limit:
+ return False
+ try:
+ if self.peek_char() == u'\n':
+ self.next_char()
+ return True
+ except StopIteration:
+ # This is the tricky case: we found a \r right at the end
+ self.pos -= 1
+ return False
+ return False
+
+ def find_char(self, marker, limit):
+ if limit < 0:
+ limit = sys.maxint
+ scanned = 0
+ while scanned < limit:
+ try:
+ ch = self.next_char()
+ except StopIteration:
+ return False
+ if ch == marker:
+ return True
+ scanned += 1
+ return False
+
+
def check_decoded(space, w_decoded):
if not space.isinstance_w(w_decoded, space.w_unicode):
msg = "decoder should return a string result, not '%T'"
@@ -353,8 +429,7 @@
self.w_encoder = None
self.w_decoder = None
- self.decoded_chars = None # buffer for text returned from decoder
- self.decoded_chars_used = 0 # offset into _decoded_chars for read()
+ self.decoded = DecodeBuffer()
self.pending_bytes = None # list of bytes objects waiting to be
# written, or NULL
self.chunk_size = 8192
@@ -522,44 +597,10 @@
# _____________________________________________________________
# read methods
- def _unset_decoded(self):
- self.decoded_chars = None
- self.decoded_chars_used = 0
-
- def _set_decoded(self, space, w_decoded):
- check_decoded(space, w_decoded)
- self.decoded_chars = space.utf8_w(w_decoded)
- self.decoded_chars_used = 0
-
- def _get_decoded_chars(self, size):
- if self.decoded_chars is None:
- return ""
-
- available = len(self.decoded_chars) - self.decoded_chars_used
- if size < 0 or size > available:
- size = available
- assert size >= 0
-
- if self.decoded_chars_used > 0 or size < available:
- start = self.decoded_chars_used
- end = self.decoded_chars_used + size
- assert start >= 0
- assert end >= 0
- chars = self.decoded_chars[start:end]
- else:
- chars = self.decoded_chars
-
- self.decoded_chars_used += size
- return chars
-
- def _has_data(self):
- return (self.decoded_chars is not None and
- self.decoded_chars_used < len(self.decoded_chars))
-
def _read_chunk(self, space):
"""Read and decode the next chunk of data from the BufferedReader.
The return value is True unless EOF was reached. The decoded string
- is placed in self._decoded_chars (replacing its previous value).
+ is placed in self.decoded (replacing its previous value).
The entire input chunk is sent to the decoder, though some of it may
remain buffered in the decoder, yet to be converted."""
@@ -579,7 +620,7 @@
dec_buffer = None
dec_flags = 0
- # Read a chunk, decode it, and put the result in self._decoded_chars
+ # Read a chunk, decode it, and put the result in self.decoded
w_input = space.call_method(self.w_buffer, "read1",
space.newint(self.chunk_size))
@@ -591,7 +632,7 @@
eof = space.len_w(w_input) == 0
w_decoded = space.call_method(self.w_decoder, "decode",
w_input, space.newbool(eof))
- self._set_decoded(space, w_decoded)
+ self.decoded.set(space, w_decoded)
if space.len_w(w_decoded) > 0:
eof = False
@@ -604,10 +645,10 @@
return not eof
def _ensure_data(self, space):
- while not self._has_data():
+ while not self.decoded.has_data():
try:
if not self._read_chunk(space):
- self._unset_decoded()
+ self.decoded.reset()
self.snapshot = None
return False
except OperationError as e:
@@ -640,7 +681,7 @@
w_bytes = space.call_method(self.w_buffer, "read")
w_decoded = space.call_method(self.w_decoder, "decode", w_bytes,
space.w_True)
check_decoded(space, w_decoded)
- w_result = space.new_from_utf8(self._get_decoded_chars(-1))
+ w_result = space.new_from_utf8(self.decoded.get_chars(-1))
w_final = space.add(w_result, w_decoded)
self.snapshot = None
return w_final
@@ -652,82 +693,79 @@
while remaining > 0:
if not self._ensure_data(space):
break
- data = self._get_decoded_chars(remaining)
+ data = self.decoded.get_chars(remaining)
builder.append(data)
remaining -= len(data)
return space.new_from_utf8(builder.build())
+ def _scan_line_ending(self, limit):
+ if self.readuniversal:
+ return self.decoded.find_newline_universal(limit)
+ else:
+ if self.readtranslate:
+ # Newlines are already translated, only search for \n
+ newline = u'\n'
+ else:
+ # Non-universal mode.
+ newline = self.readnl
+ if newline == u'\r\n':
+ return self.decoded.find_crlf(limit)
+ else:
+ return self.decoded.find_char(newline[0], limit)
+
def readline_w(self, space, w_limit=None):
self._check_attached(space)
self._check_closed(space)
self._writeflush(space)
limit = convert_size(space, w_limit)
-
- line = None
remnant = None
builder = StringBuilder()
-
while True:
# First, get some data if necessary
has_data = self._ensure_data(space)
if not has_data:
# end of file
- start = end_scan = 0
+ if remnant:
+ builder.append(remnant)
break
if remnant:
assert not self.readtranslate and self.readnl == '\r\n'
- assert self.decoded_chars_used == 0
- if remnant == '\r' and self.decoded_chars[0] == '\n':
+ assert self.decoded.pos == 0
+ if remnant == '\r' and self.decoded.text[0] == '\n':
builder.append('\r\n')
- self.decoded_chars_used = 1
- line = remnant = None
- start = end_scan = 0
+ self.decoded.pos = 1
+ remnant = None
break
else:
builder.append(remnant)
remnant = None
continue
- line = self.decoded_chars
- start = self.decoded_chars_used
if limit > 0:
remaining = limit - builder.getlength()
assert remaining >= 0
else:
- remaining = sys.maxint
- end_scan, found = self._find_line_ending(line, start, remaining)
- assert end_scan >= 0
- if found:
+ remaining = -1
+ start = self.decoded.pos
+ assert start >= 0
+ found = self._scan_line_ending(remaining)
+ end_scan = self.decoded.pos
+ if end_scan > start:
+ s = self.decoded.text[start:end_scan]
+ builder.append(s)
+
+ if found or (limit >= 0 and builder.getlength() >= limit):
break
- if limit >= 0 and end_scan - start >= remaining:
- # Didn't find line ending, but reached length limit
- break
-
- # No line ending seen yet - put aside current data
- if end_scan > start:
- s = line[start:end_scan]
- builder.append(s)
-
# There may be some remaining chars we'll have to prepend to the
# next chunk of data
- if end_scan < len(line):
- remnant = line[end_scan:]
- line = None
+ if not self.decoded.exhausted():
+ remnant = self.decoded.get_chars(-1)
# We have consumed the buffer
- self._unset_decoded()
-
- if line:
- # Our line ends in the current buffer
- self.decoded_chars_used = end_scan
- if start > 0 or end_scan < len(line):
- line = line[start:end_scan]
- builder.append(line)
- elif remnant:
- builder.append(remnant)
+ self.decoded.reset()
result = builder.build()
return space.new_from_utf8(result)
@@ -861,7 +899,7 @@
raise oefmt(space.w_IOError,
"can't do nonzero end-relative seeks")
space.call_method(self, "flush")
- self._unset_decoded()
+ self.decoded.reset()
self.snapshot = None
if self.w_decoder:
space.call_method(self.w_decoder, "reset")
@@ -886,7 +924,7 @@
# Seek back to the safe start point
space.call_method(self.w_buffer, "seek",
space.newint(cookie.start_pos))
- self._unset_decoded()
+ self.decoded.reset()
self.snapshot = None
# Restore the decoder to its state from the safe start point.
@@ -907,13 +945,13 @@
w_decoded = space.call_method(self.w_decoder, "decode",
w_chunk,
space.newbool(bool(cookie.need_eof)))
- self._set_decoded(space, w_decoded)
+ self.decoded.set(space, w_decoded)
# Skip chars_to_skip of the decoded characters
- if len(self.decoded_chars) < cookie.chars_to_skip:
+ if len(self.decoded.text) < cookie.chars_to_skip:
raise oefmt(space.w_IOError,
"can't restore logical file position")
- self.decoded_chars_used = cookie.chars_to_skip
+ self.decoded.pos = cookie.chars_to_skip
else:
self.snapshot = PositionSnapshot(cookie.dec_flags, "")
@@ -939,7 +977,7 @@
w_pos = space.call_method(self.w_buffer, "tell")
if self.w_decoder is None or self.snapshot is None:
- assert not self.decoded_chars
+ assert not self.decoded.text
return w_pos
cookie = PositionCookie(space.bigint_w(w_pos))
@@ -950,11 +988,11 @@
cookie.start_pos -= len(input)
# How many decoded characters have been used up since the snapshot?
- if not self.decoded_chars_used:
+ if not self.decoded.pos:
# We haven't moved from the snapshot point.
return space.newlong_from_rbigint(cookie.pack())
- chars_to_skip = self.decoded_chars_used
+ chars_to_skip = self.decoded.pos
# Starting from the snapshot position, we will walk the decoder
# forward until it gives us enough decoded characters.
diff --git a/pypy/module/_io/test/test_interp_textio.py b/pypy/module/_io/test/test_interp_textio.py
--- a/pypy/module/_io/test/test_interp_textio.py
+++ b/pypy/module/_io/test/test_interp_textio.py
@@ -1,6 +1,10 @@
-from hypothesis import given, strategies as st, assume
+import pytest
+try:
+ from hypothesis import given, strategies as st, assume
+except ImportError:
+ pytest.skip("hypothesis required")
from pypy.module._io.interp_bytesio import W_BytesIO
-from pypy.module._io.interp_textio import W_TextIOWrapper
+from pypy.module._io.interp_textio import W_TextIOWrapper, DecodeBuffer
LINESEP = ['', '\r', '\n', '\r\n']
@@ -31,3 +35,34 @@
else:
break
assert u''.join(lines) == txt
+
+@given(st.text())
+def test_read_buffer(text):
+ buf = DecodeBuffer(text)
+ assert buf.get_chars(-1) == text
+ assert buf.exhausted()
+
+@given(st.text(), st.lists(st.integers(min_value=0)))
+def test_readn_buffer(text, sizes):
+ buf = DecodeBuffer(text)
+ strings = []
+ for n in sizes:
+ s = buf.get_chars(n)
+ if not buf.exhausted():
+ assert len(s) == n
+ else:
+ assert len(s) <= n
+ strings.append(s)
+ assert ''.join(strings) == text[:sum(sizes)]
+
+@given(st.text())
+def test_next_char(text):
+ buf = DecodeBuffer(text)
+ chars = []
+ try:
+ while True:
+ chars.append(buf.next_char())
+ except StopIteration:
+ pass
+ assert buf.exhausted()
+ assert u''.join(chars) == text
_______________________________________________
pypy-commit mailing list
[email protected]
https://mail.python.org/mailman/listinfo/pypy-commit