https://github.com/python/cpython/commit/37f3deb571c02eccd8edc1457fcfc0eeeac909ce commit: 37f3deb571c02eccd8edc1457fcfc0eeeac909ce branch: 3.15 author: Miss Islington (bot) <[email protected]> committer: serhiy-storchaka <[email protected]> date: 2026-05-13T10:54:10Z summary:
[3.15] bpo-45509: Check gzip headers for corrupted fields (GH-29028) (GH-149769) Check the header checksum if the HCRC field is present. (cherry picked from commit dd94457893a1dd2c99c2405e197f54a7692cbe09) Co-authored-by: Ruben Vorderman <[email protected]> files: A Misc/NEWS.d/next/Library/2021-10-18-13-46-55.bpo-45509.Upwb60.rst M Lib/gzip.py M Lib/test/test_gzip.py diff --git a/Lib/gzip.py b/Lib/gzip.py index 971063aa24f871..a89ebf806c8572 100644 --- a/Lib/gzip.py +++ b/Lib/gzip.py @@ -484,40 +484,63 @@ def _read_exact(fp, n): return data +def _read_until_null(fp, append_to): + '''Read until the first encountered null byte in fp. + Append to given byte array object''' + while True: + s = fp.read(1) + append_to += s + if not s or s == b'\000': + break + + def _read_gzip_header(fp): '''Read a gzip header from `fp` and progress to the end of the header. Returns last mtime if header was present or None otherwise. ''' magic = fp.read(2) - if magic == b'': + if not magic: return None if magic != b'\037\213': raise BadGzipFile('Not a gzipped file (%r)' % magic) - - (method, flag, last_mtime) = struct.unpack("<BBIxx", _read_exact(fp, 8)) + base_header = _read_exact(fp, 8) + (method, flag, last_mtime) = struct.unpack("<BBIxx", base_header) if method != 8: raise BadGzipFile('Unknown compression method') - if flag & FEXTRA: - # Read & discard the extra field, if present - extra_len, = struct.unpack("<H", _read_exact(fp, 2)) - _read_exact(fp, extra_len) - if flag & FNAME: + # Most common cases are no flags (gzip.compress, zlib.compress) or only + # FNAME set (GzipFile, gzip command line application). Exit early + # in those cases. + if not flag: + return last_mtime + if flag == FNAME: # Read and discard a null-terminated string containing the filename while True: s = fp.read(1) if not s or s==b'\000': break + return last_mtime + + # Processing for more complex flags. Save header parts for FHCRC checking. 
+ header = bytearray(magic + base_header) + if flag & FEXTRA: + extra_len_bytes = _read_exact(fp, 2) + extra_len, = struct.unpack("<H", extra_len_bytes) + header += extra_len_bytes + header += _read_exact(fp, extra_len) + if flag & FNAME: + _read_until_null(fp, append_to=header) if flag & FCOMMENT: - # Read and discard a null-terminated string containing a comment - while True: - s = fp.read(1) - if not s or s==b'\000': - break + _read_until_null(fp, append_to=header) if flag & FHCRC: - _read_exact(fp, 2) # Read & discard the 16-bit header CRC + # Header CRC is the last 16 bits of a crc32. + header_crc, = struct.unpack("<H", _read_exact(fp, 2)) + true_crc = zlib.crc32(header) & 0xFFFF + if header_crc != true_crc: + raise BadGzipFile(f"Corrupted gzip header. Checksums do not " + f"match: {true_crc:04x} != {header_crc:04x}") return last_mtime diff --git a/Lib/test/test_gzip.py b/Lib/test/test_gzip.py index 442d30fc970fa9..b3b7c8f87e4f9f 100644 --- a/Lib/test/test_gzip.py +++ b/Lib/test/test_gzip.py @@ -795,6 +795,35 @@ def test_decompress_missing_trailer(self): compressed_data = gzip.compress(data1) self.assertRaises(EOFError, gzip.decompress, compressed_data[:-8]) + def test_truncated_header(self): + truncated_headers = [ + b"\x1f\x8b\x08\x00\x00\x00\x00\x00\x00", # Missing OS byte + b"\x1f\x8b\x08\x02\x00\x00\x00\x00\x00\xff", # FHRC, but no checksum + b"\x1f\x8b\x08\x04\x00\x00\x00\x00\x00\xff", # FEXTRA, but no xlen + b"\x1f\x8b\x08\x04\x00\x00\x00\x00\x00\xff\xaa\x00", # FEXTRA, xlen, but no data + b"\x1f\x8b\x08\x08\x00\x00\x00\x00\x00\xff", # FNAME but no fname + b"\x1f\x8b\x08\x10\x00\x00\x00\x00\x00\xff", # FCOMMENT, but no fcomment + ] + for header in truncated_headers: + with self.subTest(header=header): + with self.assertRaises(EOFError): + gzip.decompress(header) + + def test_corrupted_gzip_header(self): + header = (b"\x1f\x8b\x08\x1f\x00\x00\x00\x00\x00\xff" # All flags set + b"\x05\x00" # Xlen = 5 + b"extra" + b"name\x00" + b"comment\x00") + true_crc = 
zlib.crc32(header) & 0xFFFF + corrupted_crc = true_crc ^ 0xFFFF + corrupted_header = header + corrupted_crc.to_bytes(2, "little") + with self.assertRaises(gzip.BadGzipFile) as err: + gzip.decompress(corrupted_header) + self.assertEqual(str(err.exception), + f"Corrupted gzip header. Checksums do not " + f"match: {true_crc:04x} != {corrupted_crc:04x}") + def test_read_truncated(self): data = data1*50 # Drop the CRC (4 bytes) and file size (4 bytes). diff --git a/Misc/NEWS.d/next/Library/2021-10-18-13-46-55.bpo-45509.Upwb60.rst b/Misc/NEWS.d/next/Library/2021-10-18-13-46-55.bpo-45509.Upwb60.rst new file mode 100644 index 00000000000000..80c38c03f8fe78 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2021-10-18-13-46-55.bpo-45509.Upwb60.rst @@ -0,0 +1 @@ +Gzip headers are now checked for corrupted NAME, COMMENT and HCRC fields. _______________________________________________ Python-checkins mailing list -- [email protected] To unsubscribe send an email to [email protected] https://mail.python.org/mailman3//lists/python-checkins.python.org Member address: [email protected]
