https://github.com/python/cpython/commit/97804d80b09184fc30faaf171b2ac70d0bcec5d7
commit: 97804d80b09184fc30faaf171b2ac70d0bcec5d7
branch: 3.13
author: Miss Islington (bot) <[email protected]>
committer: emmatyping <[email protected]>
date: 2026-03-10T21:08:06-07:00
summary:

[3.13] gh-145607: Ensure BIG_DATA has two compressed blocks in test_bz2 (GH-145730) (#145734)

gh-145607: Ensure BIG_DATA has two compressed blocks in test_bz2 (GH-145730)
(cherry picked from commit 19676e5fc28bdee8325a062a31ddeee60960cf75)

Co-authored-by: Emma Smith <[email protected]>

files:
M Lib/test/test_bz2.py

diff --git a/Lib/test/test_bz2.py b/Lib/test/test_bz2.py
index 7d786be1d25b1c..b5cd202a613725 100644
--- a/Lib/test/test_bz2.py
+++ b/Lib/test/test_bz2.py
@@ -66,18 +66,28 @@ class BaseTest(unittest.TestCase):
     EMPTY_DATA = b'BZh9\x17rE8P\x90\x00\x00\x00\x00'
     BAD_DATA = b'this is not a valid bzip2 file'
 
-    # Some tests need more than one block of uncompressed data. Since one block
-    # is at least 100,000 bytes, we gather some data dynamically and compress it.
-    # Note that this assumes that compression works correctly, so we cannot
-    # simply use the bigger test data for all tests.
+    # Some tests need more than one block of data. The bz2 module does not
+    # support flushing a block during compression, so we must read in data until
+    # there are at least 2 blocks. Since different orderings of Python files may
+    # be compressed differently, we need to check the compression output for
+    # more than one bzip2 block header magic, a hex encoding of Pi
+    # (0x314159265359)
+    bz2_block_magic = bytes.fromhex('314159265359')
     test_size = 0
-    BIG_TEXT = bytearray(128*1024)
+    BIG_TEXT = b''
+    BIG_DATA = b''
+    compressor = BZ2Compressor(1)
     for fname in glob.glob(os.path.join(glob.escape(os.path.dirname(__file__)), '*.py')):
         with open(fname, 'rb') as fh:
-            test_size += fh.readinto(memoryview(BIG_TEXT)[test_size:])
-        if test_size > 128*1024:
+            data = fh.read()
+            BIG_DATA += compressor.compress(data)
+            BIG_TEXT += data
+        # TODO(emmatyping): if it is impossible for a block header to cross
+        # multiple outputs, we can just search the output of each compress call
+        # which should be more efficient
+        if BIG_DATA.count(bz2_block_magic) > 1:
+            BIG_DATA += compressor.flush()
             break
-    BIG_DATA = bz2.compress(BIG_TEXT, compresslevel=1)
 
     def setUp(self):
         fd, self.filename = tempfile.mkstemp()

_______________________________________________
Python-checkins mailing list -- [email protected]
To unsubscribe send an email to [email protected]
https://mail.python.org/mailman3//lists/python-checkins.python.org
Member address: [email protected]

Reply via email to