Author: Richard Plangger <[email protected]>
Branch: py3.5
Changeset: r87448:865e96c7c129
Date: 2016-09-29 17:43 +0200
http://bitbucket.org/pypy/pypy/changeset/865e96c7c129/

Log:    merged py3.5-bz2-lzma

diff --git a/pypy/module/bz2/interp_bz2.py b/pypy/module/bz2/interp_bz2.py
--- a/pypy/module/bz2/interp_bz2.py
+++ b/pypy/module/bz2/interp_bz2.py
@@ -10,6 +10,7 @@
 from rpython.translator.tool.cbuild import ExternalCompilationInfo
 from rpython.translator.platform import platform as compiler
 from rpython.rlib.rarithmetic import intmask, r_longlong
+from rpython.rlib import rgil
 import sys
 
 
@@ -266,6 +267,7 @@
     must be a number between 1 and 9."""
     def __init__(self, space, compresslevel):
         self.space = space
+        self._lock = space.allocate_lock()
         self.bzs = lltype.malloc(bz_stream.TO, flavor='raw', zero=True)
         try:
             self.running = False
@@ -276,6 +278,15 @@
             raise
         self.register_finalizer(space)
 
+    def lock(self):
+        if not self._lock.acquire(False):
+            rgil.release()
+            self._lock.acquire(True)
+            rgil.acquire()
+
+    def unlock(self):
+        self._lock.release()
+
     def _init_bz2comp(self, compresslevel):
         if compresslevel < 1 or compresslevel > 9:
             raise oefmt(self.space.w_ValueError,
@@ -306,35 +317,39 @@
         to compress, call the flush() method to finish the compression process,
         and return what is left in the internal buffers."""
 
-        datasize = len(data)
+        try:
+            self.lock()
+            datasize = len(data)
 
-        if datasize == 0:
-            return self.space.newbytes("")
+            if datasize == 0:
+                return self.space.newbytes("")
 
-        if not self.running:
-            raise oefmt(self.space.w_ValueError,
-                        "this object was already flushed")
+            if not self.running:
+                raise oefmt(self.space.w_ValueError,
+                            "this object was already flushed")
 
-        in_bufsize = datasize
+            in_bufsize = datasize
 
-        with OutBuffer(self.bzs) as out:
-            with rffi.scoped_nonmovingbuffer(data) as in_buf:
+            with OutBuffer(self.bzs) as out:
+                with rffi.scoped_nonmovingbuffer(data) as in_buf:
 
-                self.bzs.c_next_in = in_buf
-                rffi.setintfield(self.bzs, 'c_avail_in', in_bufsize)
+                    self.bzs.c_next_in = in_buf
+                    rffi.setintfield(self.bzs, 'c_avail_in', in_bufsize)
 
-                while True:
-                    bzerror = BZ2_bzCompress(self.bzs, BZ_RUN)
-                    if bzerror != BZ_RUN_OK:
-                        _catch_bz2_error(self.space, bzerror)
+                    while True:
+                        bzerror = BZ2_bzCompress(self.bzs, BZ_RUN)
+                        if bzerror != BZ_RUN_OK:
+                            _catch_bz2_error(self.space, bzerror)
 
-                    if rffi.getintfield(self.bzs, 'c_avail_in') == 0:
-                        break
-                    elif rffi.getintfield(self.bzs, 'c_avail_out') == 0:
-                        out.prepare_next_chunk()
+                        if rffi.getintfield(self.bzs, 'c_avail_in') == 0:
+                            break
+                        elif rffi.getintfield(self.bzs, 'c_avail_out') == 0:
+                            out.prepare_next_chunk()
 
-                res = out.make_result_string()
-                return self.space.newbytes(res)
+                    res = out.make_result_string()
+                    return self.space.newbytes(res)
+        finally:
+            self.unlock()
 
     def flush(self):
         if not self.running:
@@ -342,19 +357,23 @@
                         "this object was already flushed")
         self.running = False
 
-        with OutBuffer(self.bzs) as out:
-            while True:
-                bzerror = BZ2_bzCompress(self.bzs, BZ_FINISH)
-                if bzerror == BZ_STREAM_END:
-                    break
-                elif bzerror != BZ_FINISH_OK:
-                    _catch_bz2_error(self.space, bzerror)
+        try:
+            self.lock()
+            with OutBuffer(self.bzs) as out:
+                while True:
+                    bzerror = BZ2_bzCompress(self.bzs, BZ_FINISH)
+                    if bzerror == BZ_STREAM_END:
+                        break
+                    elif bzerror != BZ_FINISH_OK:
+                        _catch_bz2_error(self.space, bzerror)
 
-                if rffi.getintfield(self.bzs, 'c_avail_out') == 0:
-                    out.prepare_next_chunk()
+                    if rffi.getintfield(self.bzs, 'c_avail_out') == 0:
+                        out.prepare_next_chunk()
 
-            res = out.make_result_string()
-            return self.space.newbytes(res)
+                res = out.make_result_string()
+                return self.space.newbytes(res)
+        finally:
+            self.unlock()
 
 W_BZ2Compressor.typedef = TypeDef("_bz2.BZ2Compressor",
     __doc__ = W_BZ2Compressor.__doc__,
@@ -380,6 +399,7 @@
 
     def __init__(self, space):
         self.space = space
+        self._lock = space.allocate_lock()
 
         self.bzs = lltype.malloc(bz_stream.TO, flavor='raw', zero=True)
         try:
@@ -396,6 +416,15 @@
             raise
         self.register_finalizer(space)
 
+    def lock(self):
+        if not self._lock.acquire(False):
+            rgil.release()
+            self._lock.acquire(True)
+            rgil.acquire()
+
+    def unlock(self):
+        self._lock.release()
+
     def _init_bz2decomp(self):
         bzerror = BZ2_bzDecompressInit(self.bzs, 0, 0)
         if bzerror != BZ_OK:
@@ -425,11 +454,14 @@
             return space.w_True
 
     def _decompress_buf(self, data, max_length):
-        in_bufsize = len(data)
+        total_in = len(data)
+        in_bufsize = min(total_in, UINT_MAX)
+        total_in -= in_bufsize
         with rffi.scoped_nonmovingbuffer(data) as in_buf:
             # setup the input and the size it can consume
             self.bzs.c_next_in = in_buf
             rffi.setintfield(self.bzs, 'c_avail_in', in_bufsize)
+            self.left_to_process = in_bufsize
 
             with OutBuffer(self.bzs, max_length=max_length) as out:
                 while True:
@@ -450,13 +482,7 @@
                             break
                         out.prepare_next_chunk()
 
-                if not self.running:
-                    self.needs_input = False
-                    if self.left_to_process != 0:
-                        end = len(data)
-                        start = end - self.left_to_process
-                        assert start > 0
-                        self.unused_data = data[start:]
+                self.left_to_process += total_in
                 res = out.make_result_string()
                 return self.space.newbytes(res)
 
@@ -470,30 +496,38 @@
         was found after the end of stream, it'll be ignored and saved in
         unused_data attribute."""
 
-        if not self.running:
-            raise oefmt(self.space.w_EOFError,
-                        "end of stream was already found")
-        datalen = len(data)
-        if len(self.input_buffer) > 0:
-            input_buffer_in_use = True
-            data = self.input_buffer + data
+        try:
+            self.lock()
+            if not self.running:
+                raise oefmt(self.space.w_EOFError,
+                            "end of stream was already found")
             datalen = len(data)
-            result = self._decompress_buf(data, max_length)
-        else:
-            input_buffer_in_use = False
+            if len(self.input_buffer) > 0:
+                data = self.input_buffer + data
+                datalen = len(data)
+                self.input_buffer = ""
+
             result = self._decompress_buf(data, max_length)
 
-        if self.left_to_process == 0:
-            self.input_buffer = ""
-            self.needs_input = True
-        else:
-            self.needs_input = False
-            if not input_buffer_in_use:
-                start = datalen-self.left_to_process
-                assert start > 0
-                self.input_buffer = data[start:]
-
-        return result
+            if not self.running: # eq. with eof == Ture
+                self.needs_input = False
+                if self.left_to_process != 0:
+                    start = datalen - self.left_to_process
+                    assert start > 0
+                    self.unused_data = data[start:]
+                    self.left_to_process = 0
+            elif self.left_to_process == 0:
+                self.input_buffer = ""
+                self.needs_input = True
+            else:
+                self.needs_input = False
+                if self.left_to_process > 0:
+                    start = datalen-self.left_to_process
+                    assert start >= 0
+                    self.input_buffer = data[start:]
+            return result
+        finally:
+            self.unlock()
 
 
 
diff --git a/pypy/module/bz2/test/test_bz2_compdecomp.py 
b/pypy/module/bz2/test/test_bz2_compdecomp.py
--- a/pypy/module/bz2/test/test_bz2_compdecomp.py
+++ b/pypy/module/bz2/test/test_bz2_compdecomp.py
@@ -213,8 +213,8 @@
         assert len(decomp[-1]) == 100
 
         while not bz2d.eof:
-            decomp.append(bz2d.decompress(b"", max_length=50))
-            assert len(decomp[-1]) <= 50
+            decomp.append(bz2d.decompress(b"", max_length=200))
+            assert len(decomp[-1]) <= 200
 
         assert b''.join(decomp) == self.TEXT
 
_______________________________________________
pypy-commit mailing list
[email protected]
https://mail.python.org/mailman/listinfo/pypy-commit

Reply via email to