https://github.com/python/cpython/commit/dd94457893a1dd2c99c2405e197f54a7692cbe09
commit: dd94457893a1dd2c99c2405e197f54a7692cbe09
branch: main
author: Ruben Vorderman <[email protected]>
committer: serhiy-storchaka <[email protected]>
date: 2026-05-13T10:20:33Z
summary:

bpo-45509: Check gzip headers for corrupted fields (GH-29028)

Check the header checksum it the HCRC field is present.

files:
A Misc/NEWS.d/next/Library/2021-10-18-13-46-55.bpo-45509.Upwb60.rst
M Lib/gzip.py
M Lib/test/test_gzip.py

diff --git a/Lib/gzip.py b/Lib/gzip.py
index 971063aa24f871..a89ebf806c8572 100644
--- a/Lib/gzip.py
+++ b/Lib/gzip.py
@@ -484,40 +484,63 @@ def _read_exact(fp, n):
     return data
 
 
+def _read_until_null(fp, append_to):
+    '''Read until the first encountered null byte in fp.
+       Append to given byte array object'''
+    while True:
+        s = fp.read(1)
+        append_to += s
+        if not s or s == b'\000':
+            break
+
+
 def _read_gzip_header(fp):
     '''Read a gzip header from `fp` and progress to the end of the header.
 
     Returns last mtime if header was present or None otherwise.
     '''
     magic = fp.read(2)
-    if magic == b'':
+    if not magic:
         return None
 
     if magic != b'\037\213':
         raise BadGzipFile('Not a gzipped file (%r)' % magic)
-
-    (method, flag, last_mtime) = struct.unpack("<BBIxx", _read_exact(fp, 8))
+    base_header = _read_exact(fp, 8)
+    (method, flag, last_mtime) = struct.unpack("<BBIxx", base_header)
     if method != 8:
         raise BadGzipFile('Unknown compression method')
 
-    if flag & FEXTRA:
-        # Read & discard the extra field, if present
-        extra_len, = struct.unpack("<H", _read_exact(fp, 2))
-        _read_exact(fp, extra_len)
-    if flag & FNAME:
+    # Most common cases are no flags (gzip.compress, zlib.compress) or only
+    # FNAME set (GzipFile, gzip command line application). Exit early
+    # in those cases.
+    if not flag:
+        return last_mtime
+    if flag == FNAME:
         # Read and discard a null-terminated string containing the filename
         while True:
             s = fp.read(1)
             if not s or s==b'\000':
                 break
+        return last_mtime
+
+    # Processing for more complex flags. Save header parts for FHCRC checking.
+    header = bytearray(magic + base_header)
+    if flag & FEXTRA:
+        extra_len_bytes = _read_exact(fp, 2)
+        extra_len, = struct.unpack("<H", extra_len_bytes)
+        header += extra_len_bytes
+        header += _read_exact(fp, extra_len)
+    if flag & FNAME:
+        _read_until_null(fp, append_to=header)
     if flag & FCOMMENT:
-        # Read and discard a null-terminated string containing a comment
-        while True:
-            s = fp.read(1)
-            if not s or s==b'\000':
-                break
+        _read_until_null(fp, append_to=header)
     if flag & FHCRC:
-        _read_exact(fp, 2)     # Read & discard the 16-bit header CRC
+        # Header CRC is the last 16 bits of a crc32.
+        header_crc, = struct.unpack("<H", _read_exact(fp, 2))
+        true_crc = zlib.crc32(header) & 0xFFFF
+        if header_crc != true_crc:
+            raise BadGzipFile(f"Corrupted gzip header. Checksums do not "
+                               f"match: {true_crc:04x} != {header_crc:04x}")
     return last_mtime
 
 
diff --git a/Lib/test/test_gzip.py b/Lib/test/test_gzip.py
index 442d30fc970fa9..b3b7c8f87e4f9f 100644
--- a/Lib/test/test_gzip.py
+++ b/Lib/test/test_gzip.py
@@ -795,6 +795,35 @@ def test_decompress_missing_trailer(self):
         compressed_data = gzip.compress(data1)
         self.assertRaises(EOFError, gzip.decompress, compressed_data[:-8])
 
+    def test_truncated_header(self):
+        truncated_headers = [
+            b"\x1f\x8b\x08\x00\x00\x00\x00\x00\x00",             # Missing OS 
byte
+            b"\x1f\x8b\x08\x02\x00\x00\x00\x00\x00\xff",         # FHRC, but 
no checksum
+            b"\x1f\x8b\x08\x04\x00\x00\x00\x00\x00\xff",         # FEXTRA, but 
no xlen
+            b"\x1f\x8b\x08\x04\x00\x00\x00\x00\x00\xff\xaa\x00", # FEXTRA, 
xlen, but no data
+            b"\x1f\x8b\x08\x08\x00\x00\x00\x00\x00\xff",         # FNAME but 
no fname
+            b"\x1f\x8b\x08\x10\x00\x00\x00\x00\x00\xff",         # FCOMMENT, 
but no fcomment
+        ]
+        for header in truncated_headers:
+            with self.subTest(header=header):
+                with self.assertRaises(EOFError):
+                    gzip.decompress(header)
+
+    def test_corrupted_gzip_header(self):
+        header = (b"\x1f\x8b\x08\x1f\x00\x00\x00\x00\x00\xff"  # All flags set
+                  b"\x05\x00"  # Xlen = 5
+                  b"extra"
+                  b"name\x00"
+                  b"comment\x00")
+        true_crc = zlib.crc32(header) & 0xFFFF
+        corrupted_crc = true_crc ^ 0xFFFF
+        corrupted_header = header + corrupted_crc.to_bytes(2, "little")
+        with self.assertRaises(gzip.BadGzipFile) as err:
+            gzip.decompress(corrupted_header)
+        self.assertEqual(str(err.exception),
+                         f"Corrupted gzip header. Checksums do not "
+                         f"match: {true_crc:04x} != {corrupted_crc:04x}")
+
     def test_read_truncated(self):
         data = data1*50
         # Drop the CRC (4 bytes) and file size (4 bytes).
diff --git a/Misc/NEWS.d/next/Library/2021-10-18-13-46-55.bpo-45509.Upwb60.rst 
b/Misc/NEWS.d/next/Library/2021-10-18-13-46-55.bpo-45509.Upwb60.rst
new file mode 100644
index 00000000000000..80c38c03f8fe78
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2021-10-18-13-46-55.bpo-45509.Upwb60.rst
@@ -0,0 +1 @@
+Gzip headers are now checked for corrupted NAME, COMMENT and HCRC fields.

_______________________________________________
Python-checkins mailing list -- [email protected]
To unsubscribe send an email to [email protected]
https://mail.python.org/mailman3//lists/python-checkins.python.org
Member address: [email protected]

Reply via email to