Author: Richard Plangger <[email protected]>
Branch: py3.5
Changeset: r87448:865e96c7c129
Date: 2016-09-29 17:43 +0200
http://bitbucket.org/pypy/pypy/changeset/865e96c7c129/
Log: merged py3.5-bz2-lzma
diff --git a/pypy/module/bz2/interp_bz2.py b/pypy/module/bz2/interp_bz2.py
--- a/pypy/module/bz2/interp_bz2.py
+++ b/pypy/module/bz2/interp_bz2.py
@@ -10,6 +10,7 @@
from rpython.translator.tool.cbuild import ExternalCompilationInfo
from rpython.translator.platform import platform as compiler
from rpython.rlib.rarithmetic import intmask, r_longlong
+from rpython.rlib import rgil
import sys
@@ -266,6 +267,7 @@
must be a number between 1 and 9."""
def __init__(self, space, compresslevel):
self.space = space
+ self._lock = space.allocate_lock()
self.bzs = lltype.malloc(bz_stream.TO, flavor='raw', zero=True)
try:
self.running = False
@@ -276,6 +278,15 @@
raise
self.register_finalizer(space)
+ def lock(self):
+ if not self._lock.acquire(False):
+ rgil.release()
+ self._lock.acquire(True)
+ rgil.acquire()
+
+ def unlock(self):
+ self._lock.release()
+
def _init_bz2comp(self, compresslevel):
if compresslevel < 1 or compresslevel > 9:
raise oefmt(self.space.w_ValueError,
@@ -306,35 +317,39 @@
to compress, call the flush() method to finish the compression process,
and return what is left in the internal buffers."""
- datasize = len(data)
+ try:
+ self.lock()
+ datasize = len(data)
- if datasize == 0:
- return self.space.newbytes("")
+ if datasize == 0:
+ return self.space.newbytes("")
- if not self.running:
- raise oefmt(self.space.w_ValueError,
- "this object was already flushed")
+ if not self.running:
+ raise oefmt(self.space.w_ValueError,
+ "this object was already flushed")
- in_bufsize = datasize
+ in_bufsize = datasize
- with OutBuffer(self.bzs) as out:
- with rffi.scoped_nonmovingbuffer(data) as in_buf:
+ with OutBuffer(self.bzs) as out:
+ with rffi.scoped_nonmovingbuffer(data) as in_buf:
- self.bzs.c_next_in = in_buf
- rffi.setintfield(self.bzs, 'c_avail_in', in_bufsize)
+ self.bzs.c_next_in = in_buf
+ rffi.setintfield(self.bzs, 'c_avail_in', in_bufsize)
- while True:
- bzerror = BZ2_bzCompress(self.bzs, BZ_RUN)
- if bzerror != BZ_RUN_OK:
- _catch_bz2_error(self.space, bzerror)
+ while True:
+ bzerror = BZ2_bzCompress(self.bzs, BZ_RUN)
+ if bzerror != BZ_RUN_OK:
+ _catch_bz2_error(self.space, bzerror)
- if rffi.getintfield(self.bzs, 'c_avail_in') == 0:
- break
- elif rffi.getintfield(self.bzs, 'c_avail_out') == 0:
- out.prepare_next_chunk()
+ if rffi.getintfield(self.bzs, 'c_avail_in') == 0:
+ break
+ elif rffi.getintfield(self.bzs, 'c_avail_out') == 0:
+ out.prepare_next_chunk()
- res = out.make_result_string()
- return self.space.newbytes(res)
+ res = out.make_result_string()
+ return self.space.newbytes(res)
+ finally:
+ self.unlock()
def flush(self):
if not self.running:
@@ -342,19 +357,23 @@
"this object was already flushed")
self.running = False
- with OutBuffer(self.bzs) as out:
- while True:
- bzerror = BZ2_bzCompress(self.bzs, BZ_FINISH)
- if bzerror == BZ_STREAM_END:
- break
- elif bzerror != BZ_FINISH_OK:
- _catch_bz2_error(self.space, bzerror)
+ try:
+ self.lock()
+ with OutBuffer(self.bzs) as out:
+ while True:
+ bzerror = BZ2_bzCompress(self.bzs, BZ_FINISH)
+ if bzerror == BZ_STREAM_END:
+ break
+ elif bzerror != BZ_FINISH_OK:
+ _catch_bz2_error(self.space, bzerror)
- if rffi.getintfield(self.bzs, 'c_avail_out') == 0:
- out.prepare_next_chunk()
+ if rffi.getintfield(self.bzs, 'c_avail_out') == 0:
+ out.prepare_next_chunk()
- res = out.make_result_string()
- return self.space.newbytes(res)
+ res = out.make_result_string()
+ return self.space.newbytes(res)
+ finally:
+ self.unlock()
W_BZ2Compressor.typedef = TypeDef("_bz2.BZ2Compressor",
__doc__ = W_BZ2Compressor.__doc__,
@@ -380,6 +399,7 @@
def __init__(self, space):
self.space = space
+ self._lock = space.allocate_lock()
self.bzs = lltype.malloc(bz_stream.TO, flavor='raw', zero=True)
try:
@@ -396,6 +416,15 @@
raise
self.register_finalizer(space)
+ def lock(self):
+ if not self._lock.acquire(False):
+ rgil.release()
+ self._lock.acquire(True)
+ rgil.acquire()
+
+ def unlock(self):
+ self._lock.release()
+
def _init_bz2decomp(self):
bzerror = BZ2_bzDecompressInit(self.bzs, 0, 0)
if bzerror != BZ_OK:
@@ -425,11 +454,14 @@
return space.w_True
def _decompress_buf(self, data, max_length):
- in_bufsize = len(data)
+ total_in = len(data)
+ in_bufsize = min(total_in, UINT_MAX)
+ total_in -= in_bufsize
with rffi.scoped_nonmovingbuffer(data) as in_buf:
# setup the input and the size it can consume
self.bzs.c_next_in = in_buf
rffi.setintfield(self.bzs, 'c_avail_in', in_bufsize)
+ self.left_to_process = in_bufsize
with OutBuffer(self.bzs, max_length=max_length) as out:
while True:
@@ -450,13 +482,7 @@
break
out.prepare_next_chunk()
- if not self.running:
- self.needs_input = False
- if self.left_to_process != 0:
- end = len(data)
- start = end - self.left_to_process
- assert start > 0
- self.unused_data = data[start:]
+ self.left_to_process += total_in
res = out.make_result_string()
return self.space.newbytes(res)
@@ -470,30 +496,38 @@
was found after the end of stream, it'll be ignored and saved in
unused_data attribute."""
- if not self.running:
- raise oefmt(self.space.w_EOFError,
- "end of stream was already found")
- datalen = len(data)
- if len(self.input_buffer) > 0:
- input_buffer_in_use = True
- data = self.input_buffer + data
+ try:
+ self.lock()
+ if not self.running:
+ raise oefmt(self.space.w_EOFError,
+ "end of stream was already found")
datalen = len(data)
- result = self._decompress_buf(data, max_length)
- else:
- input_buffer_in_use = False
+ if len(self.input_buffer) > 0:
+ data = self.input_buffer + data
+ datalen = len(data)
+ self.input_buffer = ""
+
result = self._decompress_buf(data, max_length)
- if self.left_to_process == 0:
- self.input_buffer = ""
- self.needs_input = True
- else:
- self.needs_input = False
- if not input_buffer_in_use:
- start = datalen-self.left_to_process
- assert start > 0
- self.input_buffer = data[start:]
-
- return result
+ if not self.running: # equivalent to eof == True
+ self.needs_input = False
+ if self.left_to_process != 0:
+ start = datalen - self.left_to_process
+ assert start > 0
+ self.unused_data = data[start:]
+ self.left_to_process = 0
+ elif self.left_to_process == 0:
+ self.input_buffer = ""
+ self.needs_input = True
+ else:
+ self.needs_input = False
+ if self.left_to_process > 0:
+ start = datalen-self.left_to_process
+ assert start >= 0
+ self.input_buffer = data[start:]
+ return result
+ finally:
+ self.unlock()
diff --git a/pypy/module/bz2/test/test_bz2_compdecomp.py b/pypy/module/bz2/test/test_bz2_compdecomp.py
--- a/pypy/module/bz2/test/test_bz2_compdecomp.py
+++ b/pypy/module/bz2/test/test_bz2_compdecomp.py
@@ -213,8 +213,8 @@
assert len(decomp[-1]) == 100
while not bz2d.eof:
- decomp.append(bz2d.decompress(b"", max_length=50))
- assert len(decomp[-1]) <= 50
+ decomp.append(bz2d.decompress(b"", max_length=200))
+ assert len(decomp[-1]) <= 200
assert b''.join(decomp) == self.TEXT
_______________________________________________
pypy-commit mailing list
[email protected]
https://mail.python.org/mailman/listinfo/pypy-commit