https://github.com/python/cpython/commit/dd94457893a1dd2c99c2405e197f54a7692cbe09
commit: dd94457893a1dd2c99c2405e197f54a7692cbe09
branch: main
author: Ruben Vorderman <[email protected]>
committer: serhiy-storchaka <[email protected]>
date: 2026-05-13T10:20:33Z
summary:
bpo-45509: Check gzip headers for corrupted fields (GH-29028)
Check the header checksum if the HCRC field is present.
files:
A Misc/NEWS.d/next/Library/2021-10-18-13-46-55.bpo-45509.Upwb60.rst
M Lib/gzip.py
M Lib/test/test_gzip.py
diff --git a/Lib/gzip.py b/Lib/gzip.py
index 971063aa24f871..a89ebf806c8572 100644
--- a/Lib/gzip.py
+++ b/Lib/gzip.py
@@ -484,40 +484,63 @@ def _read_exact(fp, n):
return data
+def _read_until_null(fp, append_to):
+ '''Read until the first encountered null byte in fp.
+ Append to given byte array object'''
+ while True:
+ s = fp.read(1)
+ append_to += s
+ if not s or s == b'\000':
+ break
+
+
def _read_gzip_header(fp):
'''Read a gzip header from `fp` and progress to the end of the header.
Returns last mtime if header was present or None otherwise.
'''
magic = fp.read(2)
- if magic == b'':
+ if not magic:
return None
if magic != b'\037\213':
raise BadGzipFile('Not a gzipped file (%r)' % magic)
-
- (method, flag, last_mtime) = struct.unpack("<BBIxx", _read_exact(fp, 8))
+ base_header = _read_exact(fp, 8)
+ (method, flag, last_mtime) = struct.unpack("<BBIxx", base_header)
if method != 8:
raise BadGzipFile('Unknown compression method')
- if flag & FEXTRA:
- # Read & discard the extra field, if present
- extra_len, = struct.unpack("<H", _read_exact(fp, 2))
- _read_exact(fp, extra_len)
- if flag & FNAME:
+ # Most common cases are no flags (gzip.compress, zlib.compress) or only
+ # FNAME set (GzipFile, gzip command line application). Exit early
+ # in those cases.
+ if not flag:
+ return last_mtime
+ if flag == FNAME:
# Read and discard a null-terminated string containing the filename
while True:
s = fp.read(1)
if not s or s==b'\000':
break
+ return last_mtime
+
+ # Processing for more complex flags. Save header parts for FHCRC checking.
+ header = bytearray(magic + base_header)
+ if flag & FEXTRA:
+ extra_len_bytes = _read_exact(fp, 2)
+ extra_len, = struct.unpack("<H", extra_len_bytes)
+ header += extra_len_bytes
+ header += _read_exact(fp, extra_len)
+ if flag & FNAME:
+ _read_until_null(fp, append_to=header)
if flag & FCOMMENT:
- # Read and discard a null-terminated string containing a comment
- while True:
- s = fp.read(1)
- if not s or s==b'\000':
- break
+ _read_until_null(fp, append_to=header)
if flag & FHCRC:
- _read_exact(fp, 2) # Read & discard the 16-bit header CRC
+ # Header CRC is the last 16 bits of a crc32.
+ header_crc, = struct.unpack("<H", _read_exact(fp, 2))
+ true_crc = zlib.crc32(header) & 0xFFFF
+ if header_crc != true_crc:
+ raise BadGzipFile(f"Corrupted gzip header. Checksums do not "
+ f"match: {true_crc:04x} != {header_crc:04x}")
return last_mtime
diff --git a/Lib/test/test_gzip.py b/Lib/test/test_gzip.py
index 442d30fc970fa9..b3b7c8f87e4f9f 100644
--- a/Lib/test/test_gzip.py
+++ b/Lib/test/test_gzip.py
@@ -795,6 +795,35 @@ def test_decompress_missing_trailer(self):
compressed_data = gzip.compress(data1)
self.assertRaises(EOFError, gzip.decompress, compressed_data[:-8])
+ def test_truncated_header(self):
+ truncated_headers = [
+ b"\x1f\x8b\x08\x00\x00\x00\x00\x00\x00", # Missing OS
byte
+ b"\x1f\x8b\x08\x02\x00\x00\x00\x00\x00\xff", # FHCRC, but
no checksum
+ b"\x1f\x8b\x08\x04\x00\x00\x00\x00\x00\xff", # FEXTRA, but
no xlen
+ b"\x1f\x8b\x08\x04\x00\x00\x00\x00\x00\xff\xaa\x00", # FEXTRA,
xlen, but no data
+ b"\x1f\x8b\x08\x08\x00\x00\x00\x00\x00\xff", # FNAME but
no fname
+ b"\x1f\x8b\x08\x10\x00\x00\x00\x00\x00\xff", # FCOMMENT,
but no fcomment
+ ]
+ for header in truncated_headers:
+ with self.subTest(header=header):
+ with self.assertRaises(EOFError):
+ gzip.decompress(header)
+
+ def test_corrupted_gzip_header(self):
+ header = (b"\x1f\x8b\x08\x1f\x00\x00\x00\x00\x00\xff" # All flags set
+ b"\x05\x00" # Xlen = 5
+ b"extra"
+ b"name\x00"
+ b"comment\x00")
+ true_crc = zlib.crc32(header) & 0xFFFF
+ corrupted_crc = true_crc ^ 0xFFFF
+ corrupted_header = header + corrupted_crc.to_bytes(2, "little")
+ with self.assertRaises(gzip.BadGzipFile) as err:
+ gzip.decompress(corrupted_header)
+ self.assertEqual(str(err.exception),
+ f"Corrupted gzip header. Checksums do not "
+ f"match: {true_crc:04x} != {corrupted_crc:04x}")
+
def test_read_truncated(self):
data = data1*50
# Drop the CRC (4 bytes) and file size (4 bytes).
diff --git a/Misc/NEWS.d/next/Library/2021-10-18-13-46-55.bpo-45509.Upwb60.rst
b/Misc/NEWS.d/next/Library/2021-10-18-13-46-55.bpo-45509.Upwb60.rst
new file mode 100644
index 00000000000000..80c38c03f8fe78
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2021-10-18-13-46-55.bpo-45509.Upwb60.rst
@@ -0,0 +1 @@
+Gzip headers are now checked for corrupted NAME, COMMENT and HCRC fields.
_______________________________________________
Python-checkins mailing list -- [email protected]
To unsubscribe send an email to [email protected]
https://mail.python.org/mailman3//lists/python-checkins.python.org
Member address: [email protected]