Author: Amaury Forgeot d'Arc <[email protected]>
Branch: py3k
Changeset: r50644:e20a4198d35c
Date: 2011-12-17 21:52 +0100
http://bitbucket.org/pypy/pypy/changeset/e20a4198d35c/
Log: Unicode fixes for the bz2 module
diff --git a/pypy/module/bz2/interp_bz2.py b/pypy/module/bz2/interp_bz2.py
--- a/pypy/module/bz2/interp_bz2.py
+++ b/pypy/module/bz2/interp_bz2.py
@@ -4,7 +4,7 @@
from pypy.rpython.lltypesystem import lltype
from pypy.interpreter.error import OperationError, operationerrfmt
from pypy.interpreter.baseobjspace import Wrappable
-from pypy.interpreter.typedef import TypeDef, interp_attrproperty
+from pypy.interpreter.typedef import TypeDef, interp_attrproperty_bytes
from pypy.interpreter.gateway import interp2app, unwrap_spec
from pypy.rlib.streamio import Stream
from pypy.translator.tool.cbuild import ExternalCompilationInfo
@@ -410,7 +410,7 @@
if self.decompressor.running:
raise OperationError(self.space.w_EOFError,
self.space.wrap("compressed file ended before the logical end-of-the-stream was detected"))
- result = self.space.str_w(w_result)
+ result = self.space.bytes_w(w_result)
self.readlength += len(result)
if len(self.buffer) != self.pos:
pos = self.pos
@@ -438,7 +438,7 @@
self.finished = True
return ""
raise
- self.buffer = self.space.str_w(w_read)
+ self.buffer = self.space.bytes_w(w_read)
self.pos = 0
if len(self.buffer) - self.pos >= n:
pos = self.pos
@@ -476,11 +476,11 @@
self.writtenlength = 0
def close(self):
- self.stream.write(self.space.str_w(self.compressor.flush()))
+ self.stream.write(self.space.bytes_w(self.compressor.flush()))
self.stream.close()
def write(self, data):
- self.stream.write(self.space.str_w(self.compressor.compress(data)))
+ self.stream.write(self.space.bytes_w(self.compressor.compress(data)))
self.writtenlength += len(data)
def tell(self):
@@ -548,7 +548,7 @@
datasize = len(data)
if datasize == 0:
- return self.space.wrap("")
+ return self.space.wrapbytes("")
if not self.running:
raise OperationError(self.space.w_ValueError,
@@ -576,7 +576,7 @@
out.prepare_next_chunk()
res = out.make_result_string()
- return self.space.wrap(res)
+ return self.space.wrapbytes(res)
def flush(self):
if not self.running:
@@ -596,7 +596,7 @@
out.prepare_next_chunk()
res = out.make_result_string()
- return self.space.wrap(res)
+ return self.space.wrapbytes(res)
W_BZ2Compressor.typedef = TypeDef("BZ2Compressor",
__doc__ = W_BZ2Compressor.__doc__,
@@ -650,7 +650,7 @@
unused_data attribute."""
if data == '':
- return self.space.wrap('')
+ return self.space.wrapbytes('')
if not self.running:
raise OperationError(self.space.w_EOFError,
self.space.wrap("end of stream was already found"))
@@ -684,13 +684,13 @@
out.prepare_next_chunk()
res = out.make_result_string()
- return self.space.wrap(res)
+ return self.space.wrapbytes(res)
W_BZ2Decompressor.typedef = TypeDef("BZ2Decompressor",
__doc__ = W_BZ2Decompressor.__doc__,
__new__ = interp2app(descr_decompressor__new__),
- unused_data = interp_attrproperty("unused_data", W_BZ2Decompressor),
+ unused_data = interp_attrproperty_bytes("unused_data", W_BZ2Decompressor),
decompress = interp2app(W_BZ2Decompressor.decompress),
)
@@ -738,7 +738,7 @@
res = out.make_result_string()
BZ2_bzCompressEnd(bzs)
- return space.wrap(res)
+ return space.wrapbytes(res)
@unwrap_spec(data='bufferstr')
def decompress(space, data):
@@ -749,7 +749,7 @@
in_bufsize = len(data)
if in_bufsize == 0:
- return space.wrap("")
+ return space.wrapbytes("")
with lltype.scoped_alloc(bz_stream.TO, zero=True) as bzs:
with lltype.scoped_alloc(rffi.CCHARP.TO, in_bufsize) as in_buf:
@@ -780,4 +780,4 @@
res = out.make_result_string()
BZ2_bzDecompressEnd(bzs)
- return space.wrap(res)
+ return space.wrapbytes(res)
diff --git a/pypy/module/bz2/test/test_bz2_compdecomp.py b/pypy/module/bz2/test/test_bz2_compdecomp.py
--- a/pypy/module/bz2/test/test_bz2_compdecomp.py
+++ b/pypy/module/bz2/test/test_bz2_compdecomp.py
@@ -1,6 +1,7 @@
from pypy.conftest import gettestobjspace
from pypy.module.bz2.test.support import CheckAllocation
from pypy.module.bz2 import interp_bz2
+from pypy.interpreter.gateway import interp2app
import os, py
HUGE_OK = False
@@ -12,7 +13,7 @@
def setup_module(mod):
DATA =
'BZh91AY&SY.\xc8N\x18\x00\x01>_\x80\x00\x10@\x02\xff\xf0\x01\x07n\x00?\xe7\xff\xe00\x01\x99\xaa\x00\xc0\x03F\x86\x8c#&\x83F\x9a\x03\x06\xa6\xd0\xa6\x93M\x0fQ\xa7\xa8\x06\x804hh\x12$\x11\xa4i4\xf14S\xd2<Q\xb5\x0fH\xd3\xd4\xdd\xd5\x87\xbb\xf8\x94\r\x8f\xafI\x12\xe1\xc9\xf8/E\x00pu\x89\x12]\xc9\xbbDL\nQ\x0e\t1\x12\xdf\xa0\xc0\x97\xac2O9\x89\x13\x94\x0e\x1c7\x0ed\x95I\x0c\xaaJ\xa4\x18L\x10\x05#\x9c\xaf\xba\xbc/\x97\x8a#C\xc8\xe1\x8cW\xf9\xe2\xd0\xd6M\xa7\x8bXa<e\x84t\xcbL\xb3\xa7\xd9\xcd\xd1\xcb\x84.\xaf\xb3\xab\xab\xad`n}\xa0lh\tE,\x8eZ\x15\x17VH>\x88\xe5\xcd9gd6\x0b\n\xe9\x9b\xd5\x8a\x99\xf7\x08.K\x8ev\xfb\xf7xw\xbb\xdf\xa1\x92\xf1\xdd|/";\xa2\xba\x9f\xd5\xb1#A\xb6\xf6\xb3o\xc9\xc5y\\\xebO\xe7\x85\x9a\xbc\xb6f8\x952\xd5\xd7"%\x89>V,\xf7\xa6z\xe2\x9f\xa3\xdf\x11\x11"\xd6E)I\xa9\x13^\xca\xf3r\xd0\x03U\x922\xf26\xec\xb6\xed\x8b\xc3U\x13\x9d\xc5\x170\xa4\xfa^\x92\xacDF\x8a\x97\xd6\x19\xfe\xdd\xb8\xbd\x1a\x9a\x19\xa3\x80ankR\x8b\xe5\xd83]\xa9\xc6\x08\x82f\xf6\xb9"6l$\xb8j@\xc0\x
8a\xb0l1..\xbak\x83ls\x15\xbc\xf4\xc1\x13\xbe\xf8E\xb8\x9d\r\xa8\x9dk\x84\xd3n\xfa\xacQ\x07\xb1%y\xaav\xb4\x08\xe0z\x1b\x16\xf5\x04\xe9\xcc\xb9\x08z\x1en7.G\xfc]\xc9\x14\xe1B@\xbb!8`'
- def decompress(self, data):
+ def decompress(data):
import popen2
import bz2
pop = popen2.Popen3("bunzip2", capturestderr=1)
@@ -40,9 +41,11 @@
def setup_class(cls):
space = gettestobjspace(usemodules=('bz2',))
cls.space = space
- cls.w_TEXT = space.wrap(TEXT)
- cls.w_decompress = space.wrap(decompress)
+ cls.w_TEXT = space.wrapbytes(TEXT)
cls.w_HUGE_OK = space.wrap(HUGE_OK)
+ def decompress_w(space, w_data):
+ return space.wrapbytes(decompress(space.bytes_w(w_data)))
+ cls.w_decompress = space.wrap(interp2app(decompress_w))
def test_creation(self):
from bz2 import BZ2Compressor
@@ -59,7 +62,7 @@
bz2c = BZ2Compressor()
raises(TypeError, bz2c.compress)
data = bz2c.compress(self.TEXT)
- data = "%s%s" % (data, bz2c.flush())
+ data += bz2c.flush()
assert self.decompress(data) == self.TEXT
def test_compress_huge_data(self):
@@ -71,7 +74,7 @@
bz2c = BZ2Compressor()
raises(TypeError, bz2c.compress)
data = bz2c.compress(HUGE_DATA)
- data = "%s%s" % (data, bz2c.flush())
+ data += bz2c.flush()
assert self.decompress(data) == HUGE_DATA
def test_compress_chunks_10(self):
@@ -79,30 +82,30 @@
bz2c = BZ2Compressor()
n = 0
- data = ""
+ data = b""
while True:
temp = self.TEXT[n * 10:(n + 1) * 10]
if not temp:
break
- data = "%s%s" % (data, bz2c.compress(temp))
+ data += bz2c.compress(temp)
n += 1
- data = "%s%s" % (data, bz2c.flush())
+ data += bz2c.flush()
assert self.decompress(data) == self.TEXT
def test_buffer(self):
from bz2 import BZ2Compressor
bz2c = BZ2Compressor()
data = bz2c.compress(buffer(self.TEXT))
- data = "%s%s" % (data, bz2c.flush())
+ data += bz2c.flush()
assert self.decompress(data) == self.TEXT
class AppTestBZ2Decompressor(CheckAllocation):
def setup_class(cls):
space = gettestobjspace(usemodules=('bz2',))
cls.space = space
- cls.w_TEXT = space.wrap(TEXT)
- cls.w_DATA = space.wrap(DATA)
- cls.w_BUGGY_DATA = space.wrap(BUGGY_DATA)
+ cls.w_TEXT = space.wrapbytes(TEXT)
+ cls.w_DATA = space.wrapbytes(DATA)
+ cls.w_BUGGY_DATA = space.wrapbytes(BUGGY_DATA)
def test_creation(self):
from bz2 import BZ2Decompressor
@@ -115,7 +118,7 @@
from bz2 import BZ2Decompressor
bz2d = BZ2Decompressor()
- assert bz2d.unused_data == ""
+ assert bz2d.unused_data == b""
def test_decompress(self):
from bz2 import BZ2Decompressor
@@ -129,13 +132,13 @@
from bz2 import BZ2Decompressor
bz2d = BZ2Decompressor()
- decompressed_data = ""
+ decompressed_data = b""
n = 0
while True:
temp = self.DATA[n * 10:(n + 1) * 10]
if not temp:
break
- decompressed_data = "%s%s" % (decompressed_data, bz2d.decompress(temp))
+ decompressed_data += bz2d.decompress(temp)
n += 1
assert decompressed_data == self.TEXT
@@ -145,7 +148,7 @@
from bz2 import BZ2Decompressor
bz2d = BZ2Decompressor()
- unused_data = "this is unused data"
+ unused_data = b"this is unused data"
decompressed_data = bz2d.decompress(self.DATA + unused_data)
assert decompressed_data == self.TEXT
assert bz2d.unused_data == unused_data
@@ -155,7 +158,7 @@
bz2d = BZ2Decompressor()
bz2d.decompress(self.DATA)
- raises(EOFError, bz2d.decompress, "foo")
+ raises(EOFError, bz2d.decompress, b"foo")
def test_buffer(self):
from bz2 import BZ2Decompressor
@@ -167,24 +170,26 @@
from bz2 import BZ2Decompressor
bz2d = BZ2Decompressor()
decompressed_data = bz2d.decompress(self.BUGGY_DATA)
- assert decompressed_data == ''
+ assert decompressed_data == b''
raises(IOError, bz2d.decompress, self.BUGGY_DATA)
class AppTestBZ2ModuleFunctions(CheckAllocation):
def setup_class(cls):
space = gettestobjspace(usemodules=('bz2',))
cls.space = space
- cls.w_TEXT = space.wrap(TEXT)
- cls.w_DATA = space.wrap(DATA)
- cls.w_decompress = space.wrap(decompress)
+ cls.w_TEXT = space.wrapbytes(TEXT)
+ cls.w_DATA = space.wrapbytes(DATA)
cls.w_HUGE_OK = space.wrap(HUGE_OK)
+ def decompress_w(space, w_data):
+ return space.wrapbytes(decompress(space.bytes_w(w_data)))
+ cls.w_decompress = space.wrap(interp2app(decompress_w))
def test_compress_function(self):
from bz2 import compress
raises(TypeError, compress, 123)
- raises(ValueError, compress, "foo", 10)
- raises(TypeError, compress, "foo", "foo")
+ raises(ValueError, compress, b"foo", 10)
+ raises(TypeError, compress, b"foo", b"foo")
data = compress(self.TEXT)
assert self.decompress(data) == self.TEXT
@@ -203,7 +208,7 @@
import bz2
raises(TypeError, bz2.decompress)
- assert bz2.decompress("") == ""
+ assert bz2.decompress(b"") == b""
decompressed_data = bz2.decompress(self.DATA)
assert decompressed_data == self.TEXT
diff --git a/pypy/module/bz2/test/test_bz2_file.py b/pypy/module/bz2/test/test_bz2_file.py
--- a/pypy/module/bz2/test/test_bz2_file.py
+++ b/pypy/module/bz2/test/test_bz2_file.py
@@ -3,6 +3,7 @@
import py
from pypy.conftest import gettestobjspace
from pypy.module.bz2.test.support import CheckAllocation
+from pypy.interpreter.gateway import interp2app
import os
import random
@@ -24,7 +25,7 @@
data = DATA[:100]
f.write(data, 'wb')
- def decompress(self, data):
+ def decompress(data):
import popen2
import bz2
pop = popen2.Popen3("bunzip2", capturestderr=1)
@@ -51,14 +52,16 @@
def setup_class(cls):
space = gettestobjspace(usemodules=('bz2',))
cls.space = space
- cls.w_TEXT = space.wrap(TEXT)
- cls.w_DATA = space.wrap(DATA)
- cls.w_DATA_CRLF = space.wrap(DATA_CRLF)
+ cls.w_TEXT = space.wrapbytes(TEXT)
+ cls.w_DATA = space.wrapbytes(DATA)
+ cls.w_DATA_CRLF = space.wrapbytes(DATA_CRLF)
cls.w_temppath = space.wrap(str(py.test.ensuretemp("bz2").join("foo")))
cls.w_create_temp_file = space.wrap(create_temp_file)
- cls.w_decompress = space.wrap(decompress)
+ def decompress_w(space, w_data):
+ return space.wrapbytes(decompress(space.bytes_w(w_data)))
+ cls.w_decompress = space.wrap(interp2app(decompress_w))
cls.w_create_broken_temp_file = space.wrap(create_broken_temp_file)
- cls.w_random_data = space.wrap(RANDOM_DATA)
+ cls.w_random_data = space.wrapbytes(RANDOM_DATA)
def test_attributes(self):
from bz2 import BZ2File
@@ -224,13 +227,13 @@
def test_readline(self):
from bz2 import BZ2File
- from cStringIO import StringIO
+ from io import BytesIO
self.create_temp_file()
bz2f = BZ2File(self.temppath)
# XXX
#raises(TypeError, bz2f.readline, None)
- sio = StringIO(self.TEXT)
+ sio = BytesIO(self.TEXT)
for line in sio.readlines():
line_read = bz2f.readline()
assert line_read == line
@@ -317,33 +320,33 @@
def test_readlines(self):
from bz2 import BZ2File
- from cStringIO import StringIO
+ from io import BytesIO
self.create_temp_file()
bz2f = BZ2File(self.temppath)
# XXX
#raises(TypeError, bz2f.readlines, None)
- sio = StringIO(self.TEXT)
+ sio = BytesIO(self.TEXT)
assert bz2f.readlines() == sio.readlines()
bz2f.close()
def test_iterator(self):
from bz2 import BZ2File
- from cStringIO import StringIO
+ from io import BytesIO
self.create_temp_file()
bz2f = BZ2File(self.temppath)
- sio = StringIO(self.TEXT)
+ sio = BytesIO(self.TEXT)
assert list(iter(bz2f)) == sio.readlines()
bz2f.close()
def test_xreadlines(self):
from bz2 import BZ2File
- from cStringIO import StringIO
+ from io import BytesIO
self.create_temp_file()
bz2f = BZ2File(self.temppath)
- sio = StringIO(self.TEXT)
+ sio = BytesIO(self.TEXT)
assert list(bz2f.xreadlines()) == sio.readlines()
bz2f.close()
@@ -351,7 +354,7 @@
# readlines()/xreadlines() for files containing no newline
from bz2 import BZ2File
- DATA =
'BZh91AY&SY\xd9b\x89]\x00\x00\x00\x03\x80\x04\x00\x02\x00\x0c\x00
\x00!\x9ah3M\x13<]\xc9\x14\xe1BCe\x8a%t'
+ DATA =
b'BZh91AY&SY\xd9b\x89]\x00\x00\x00\x03\x80\x04\x00\x02\x00\x0c\x00
\x00!\x9ah3M\x13<]\xc9\x14\xe1BCe\x8a%t'
f = open(self.temppath, "wb")
f.write(DATA)
f.close()
@@ -398,11 +401,11 @@
def test_writelines(self):
from bz2 import BZ2File
- from cStringIO import StringIO
+ from io import BytesIO
bz2f = BZ2File(self.temppath, 'w')
raises(TypeError, bz2f.writelines)
- sio = StringIO(self.TEXT)
+ sio = BytesIO(self.TEXT)
bz2f.writelines(sio.readlines())
bz2f.close()
f = open(self.temppath, "rb")
@@ -413,8 +416,8 @@
from bz2 import BZ2File
bz2f = BZ2File(self.temppath, 'r')
- raises(IOError, bz2f.write, "abc")
- raises(IOError, bz2f.writelines, ["abc"])
+ raises(IOError, bz2f.write, b"abc")
+ raises(IOError, bz2f.writelines, [b"abc"])
bz2f.close()
def test_write_bigger_file(self):
@@ -432,11 +435,11 @@
with BZ2File(self.temppath, 'w') as f:
assert not f.closed
- f.write("abc")
+ f.write(b"abc")
assert f.closed
with BZ2File(self.temppath, 'r') as f:
data = f.read()
- assert data == "abc"
+ assert data == b"abc"
assert f.closed
diff --git a/pypy/module/bz2/test/test_large.py b/pypy/module/bz2/test/test_large.py
--- a/pypy/module/bz2/test/test_large.py
+++ b/pypy/module/bz2/test/test_large.py
@@ -8,7 +8,7 @@
py.test.skip("skipping this very slow test; try 'pypy-c -A'")
cls.space = gettestobjspace(usemodules=('bz2',))
largetest_bz2 = py.path.local(__file__).dirpath().join("largetest.bz2")
- cls.w_compressed_data = cls.space.wrap(largetest_bz2.read('rb'))
+ cls.w_compressed_data = cls.space.wrapbytes(largetest_bz2.read('rb'))
def test_decompress(self):
from bz2 import decompress
_______________________________________________
pypy-commit mailing list
[email protected]
http://mail.python.org/mailman/listinfo/pypy-commit