Author: Andrews Medina <andrewsmed...@gmail.com>
Branch: stdlib-2.7.4-fixed-io
Changeset: r65804:43221a2e8365
Date: 2013-07-28 18:46 -0300
http://bitbucket.org/pypy/pypy/changeset/43221a2e8365/
Log: fixed support for io in stdlib 2.7.4 diff --git a/lib-python/2.7/test/test_io.py b/lib-python/2.7/test/test_io.py --- a/lib-python/2.7/test/test_io.py +++ b/lib-python/2.7/test/test_io.py @@ -1004,6 +1004,7 @@ support.gc_collect() self.assertTrue(wr() is None, wr) + @support.impl_detail(cpython=True) def test_args_error(self): # Issue #17275 with self.assertRaisesRegexp(TypeError, "BufferedReader"): @@ -1302,6 +1303,7 @@ with self.open(support.TESTFN, "rb") as f: self.assertEqual(f.read(), b"123xxx") + @support.impl_detail(cpython=True) def test_args_error(self): # Issue #17275 with self.assertRaisesRegexp(TypeError, "BufferedWriter"): @@ -1676,6 +1678,7 @@ CBufferedReaderTest.test_garbage_collection(self) CBufferedWriterTest.test_garbage_collection(self) + @support.impl_detail(cpython=True) def test_args_error(self): # Issue #17275 with self.assertRaisesRegexp(TypeError, "BufferedRandom"): diff --git a/pypy/module/_io/interp_textio.py b/pypy/module/_io/interp_textio.py --- a/pypy/module/_io/interp_textio.py +++ b/pypy/module/_io/interp_textio.py @@ -1,7 +1,7 @@ import sys from pypy.interpreter.baseobjspace import W_Root -from pypy.interpreter.error import OperationError +from pypy.interpreter.error import OperationError, operationerrfmt from pypy.interpreter.gateway import WrappedDefault, interp2app, unwrap_spec from pypy.interpreter.typedef import ( GetSetProperty, TypeDef, generic_new_descr, interp_attrproperty, @@ -327,6 +327,13 @@ self.flags = flags self.input = input + +def check_decoded(space, w_decoded): + if not space.isinstance_w(w_decoded, space.w_unicode): + msg = "decoder should return a string result, not '%T'" + raise operationerrfmt(space.w_TypeError, msg, w_decoded) + + class W_TextIOWrapper(W_TextIOBase): def __init__(self, space): W_TextIOBase.__init__(self, space) @@ -546,9 +553,15 @@ # Read a chunk, decode it, and put the result in self._decoded_chars w_input = space.call_method(self.w_buffer, "read1", space.wrap(self.chunk_size)) + + if 
not space.isinstance_w(w_input, space.w_str): + msg = "decoder getstate() should have returned a bytes object not '%T'" + raise operationerrfmt(space.w_TypeError, msg, w_input) + eof = space.len_w(w_input) == 0 w_decoded = space.call_method(self.w_decoder, "decode", w_input, space.wrap(eof)) + check_decoded(space, w_decoded) self._set_decoded_chars(space.unicode_w(w_decoded)) if space.len_w(w_decoded) > 0: eof = False @@ -577,10 +590,12 @@ size = convert_size(space, w_size) self._writeflush(space) + if size < 0: # Read everything w_bytes = space.call_method(self.w_buffer, "read") w_decoded = space.call_method(self.w_decoder, "decode", w_bytes, space.w_True) + check_decoded(space, w_decoded) w_result = space.wrap(self._get_decoded_chars(-1)) w_final = space.add(w_result, w_decoded) self.snapshot = None @@ -701,6 +716,10 @@ if not self.w_encoder: raise OperationError(space.w_IOError, space.wrap("not writable")) + if not space.isinstance_w(w_text, space.w_unicode): + msg = "unicode argument expected, got '%T'" + raise operationerrfmt(space.w_TypeError, msg, w_text) + text = space.unicode_w(w_text) textlen = len(text) @@ -845,11 +864,16 @@ # Just like _read_chunk, feed the decoder and save a snapshot. 
w_chunk = space.call_method(self.w_buffer, "read", space.wrap(cookie.bytes_to_feed)) + if not space.isinstance_w(w_chunk, space.w_str): + msg = "underlying read() should have returned a bytes object, not '%T'" + raise operationerrfmt(space.w_TypeError, msg, w_chunk) + self.snapshot = PositionSnapshot(cookie.dec_flags, space.str_w(w_chunk)) w_decoded = space.call_method(self.w_decoder, "decode", w_chunk, space.wrap(cookie.need_eof)) + check_decoded(space, w_decoded) self._set_decoded_chars(space.unicode_w(w_decoded)) # Skip chars_to_skip of the decoded characters @@ -918,6 +942,7 @@ while i < len(input): w_decoded = space.call_method(self.w_decoder, "decode", space.wrap(input[i])) + check_decoded(space, w_decoded) chars_decoded += len(space.unicode_w(w_decoded)) cookie.bytes_to_feed += 1 @@ -942,6 +967,7 @@ w_decoded = space.call_method(self.w_decoder, "decode", space.wrap(""), space.wrap(1)) # final=1 + check_decoded(space, w_decoded) chars_decoded += len(space.unicode_w(w_decoded)) cookie.need_eof = 1 diff --git a/pypy/module/_io/test/test_fileio.py b/pypy/module/_io/test/test_fileio.py --- a/pypy/module/_io/test/test_fileio.py +++ b/pypy/module/_io/test/test_fileio.py @@ -98,6 +98,13 @@ f.close() f2.close() + def test_writelines_error(self): + import _io + txt = _io.TextIOWrapper(_io.BytesIO()) + raises(TypeError, txt.writelines, [1, 2, 3]) + raises(TypeError, txt.writelines, None) + raises(TypeError, txt.writelines, b'abc') + def test_seek(self): import _io f = _io.FileIO(self.tmpfile, 'rb') @@ -195,7 +202,7 @@ space.appexec([space.wrap(str(tmpfile))], """(tmpfile): import io f = io.open(tmpfile, 'w', encoding='ascii') - f.write('42') + f.write(u'42') # no flush() and no close() import sys; sys._keepalivesomewhereobscure = f """) diff --git a/pypy/module/_io/test/test_textio.py b/pypy/module/_io/test/test_textio.py --- a/pypy/module/_io/test/test_textio.py +++ b/pypy/module/_io/test/test_textio.py @@ -216,6 +216,29 @@ raises(IOError, txt.close) # exception not 
swallowed assert txt.closed + def test_illegal_decoder(self): + import _io + t = _io.TextIOWrapper(_io.BytesIO(b'aaaaaa'), newline='\n', + encoding='quopri_codec') + raises(TypeError, t.read, 1) + t = _io.TextIOWrapper(_io.BytesIO(b'aaaaaa'), newline='\n', + encoding='quopri_codec') + raises(TypeError, t.readline) + t = _io.TextIOWrapper(_io.BytesIO(b'aaaaaa'), newline='\n', + encoding='quopri_codec') + raises(TypeError, t.read) + + def test_read_nonbytes(self): + import _io + class NonbytesStream(_io.StringIO): + read1 = _io.StringIO.read + t = _io.TextIOWrapper(NonbytesStream(u'a')) + raises(TypeError, t.read, 1) + t = _io.TextIOWrapper(NonbytesStream(u'a')) + raises(TypeError, t.readline) + t = _io.TextIOWrapper(NonbytesStream(u'a')) + t.read() == u'a' + class AppTestIncrementalNewlineDecoder: def test_newline_decoder(self): _______________________________________________ pypy-commit mailing list pypy-commit@python.org http://mail.python.org/mailman/listinfo/pypy-commit