After the recent rash of fixes to the astreamer code, I thought it might be a good idea to take a closer look for more issues. The attached patch proposes six fixes. Full disclosure: I found two of these (those in astreamer_tar_parser_free() and astreamer_extractor_content()), and Claude found the rest. I believe everything it found should indeed be fixed (and backpatched). The wrong data pointer issue is one that I suspect would have been quite hard to find.
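
To illustrate the wrong data pointer issue, here is a minimal standalone sketch. It is not the astreamer code: toy_buffer and toy_buffer_until() are hypothetical names, and the only behaviour assumed is what the commit message describes, namely that the buffering helper copies input into a side buffer and advances the caller's data pointer past what it consumed.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for the streamer's accumulation buffer. */
typedef struct
{
	char		data[16];
	size_t		len;
} toy_buffer;

/*
 * Copy bytes from *data into buf until buf holds target_len bytes,
 * advancing *data and shrinking *len by the amount consumed.
 * Returns 1 once the target is reached, 0 if more input is needed.
 */
static int
toy_buffer_until(toy_buffer *buf, const char **data, size_t *len,
				 size_t target_len)
{
	size_t		want;
	size_t		take;

	/* Sketch-only preconditions: target fits, and is not yet exceeded. */
	assert(target_len <= sizeof(buf->data));
	assert(buf->len <= target_len);

	want = target_len - buf->len;
	take = (*len < want) ? *len : want;

	memcpy(buf->data + buf->len, *data, take);
	buf->len += take;
	*data += take;
	*len -= take;

	return buf->len == target_len;
}
```

Once toy_buffer_until() returns 1, the buffered bytes live in buf->data, while the caller's pointer already refers to whatever follows them. Forwarding the caller's pointer with the buffered length hands the wrong bytes downstream, and can run past the end of the input if fewer bytes remain.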


cheers


andrew


--
Andrew Dunstan
EDB: https://www.enterprisedb.com
From 4f66842c7870711285b47503e05467a35d4ba499 Mon Sep 17 00:00:00 2001
From: Andrew Dunstan <[email protected]>
Date: Mon, 23 Mar 2026 16:17:08 -0400
Subject: [PATCH v1] Fix multiple bugs in astreamer pipeline code.

astreamer_tar_parser_content() sent the wrong data pointer when
forwarding MEMBER_TRAILER padding to the next streamer.  After
astreamer_buffer_until() buffers the padding bytes, the 'data'
pointer has been advanced past them, but the code passed 'data'
instead of bbs_buffer.data.  This caused the downstream consumer
to receive bytes from after the padding rather than the padding
itself, and could read past the end of the input buffer.

astreamer_gzip_decompressor_content() only checked for
Z_STREAM_ERROR from inflate(), silently ignoring Z_DATA_ERROR
(corrupted data) and Z_MEM_ERROR (out of memory).  Fix by
treating any return other than Z_OK, Z_STREAM_END, and
Z_BUF_ERROR as fatal.  Also break out of the decompression loop
on Z_STREAM_END to avoid calling inflate() on a finished stream.

astreamer_gzip_decompressor_free() never called inflateEnd() to
release zlib's internal decompression state.  Both the lz4 and
zstd decompressor free functions properly release their contexts.

astreamer_tar_parser_free() neglected to pfree() the streamer
struct itself, leaking it.  Every other astreamer free function
frees its own struct.

astreamer_extractor_content() did not check the return value of
fclose() when closing an extracted file.  A deferred write error
(e.g., disk full on buffered I/O) would be silently lost.  The
plain writer's finalize in the same file already checks fclose.
---
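
Note on the fclose() change: with buffered stdio, earlier writes can appear to succeed and the failure only shows up when the buffer is flushed at close time. A minimal standalone sketch of the checked-close pattern follows. toy_close_checked() is a hypothetical helper that returns an error code; the patch itself calls pg_fatal() instead.

```c
#include <errno.h>
#include <stdio.h>
#include <string.h>

/*
 * Hypothetical helper: close fp and report any failure on stderr.
 * Returns 0 on success, -1 if fclose() failed.
 */
static int
toy_close_checked(FILE *fp, const char *filename)
{
	if (fclose(fp) != 0)
	{
		/* Save errno before calling anything that might change it. */
		int			save_errno = errno;

		fprintf(stderr, "could not close file \"%s\": %s\n",
				filename, strerror(save_errno));
		return -1;
	}
	return 0;
}
```

After fclose() the stream must not be used again whether or not the call succeeded, which is why the patched code still sets mystreamer->file to NULL afterwards.
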
 src/fe_utils/astreamer_file.c |  4 +++-
 src/fe_utils/astreamer_gzip.c | 14 ++++++++++++--
 src/fe_utils/astreamer_tar.c  |  4 +++-
 3 files changed, 18 insertions(+), 4 deletions(-)

diff --git a/src/fe_utils/astreamer_file.c b/src/fe_utils/astreamer_file.c
index 6e63a41af0d..158e9a14f2c 100644
--- a/src/fe_utils/astreamer_file.c
+++ b/src/fe_utils/astreamer_file.c
@@ -266,7 +266,9 @@ astreamer_extractor_content(astreamer *streamer, astreamer_member *member,
 		case ASTREAMER_MEMBER_TRAILER:
 			if (mystreamer->file == NULL)
 				break;
-			fclose(mystreamer->file);
+			if (fclose(mystreamer->file) != 0)
+				pg_fatal("could not close file \"%s\": %m",
+						 mystreamer->filename);
 			mystreamer->file = NULL;
 			break;
 
diff --git a/src/fe_utils/astreamer_gzip.c b/src/fe_utils/astreamer_gzip.c
index df392f67cab..440af74bd94 100644
--- a/src/fe_utils/astreamer_gzip.c
+++ b/src/fe_utils/astreamer_gzip.c
@@ -316,8 +316,9 @@ astreamer_gzip_decompressor_content(astreamer *streamer,
 		 */
 		res = inflate(zs, Z_NO_FLUSH);
 
-		if (res == Z_STREAM_ERROR)
-			pg_fatal("could not decompress data: %s", zs->msg);
+		if (res != Z_OK && res != Z_STREAM_END && res != Z_BUF_ERROR)
+			pg_fatal("could not decompress data: %s",
+					 zs->msg ? zs->msg : "unknown error");
 
 		mystreamer->bytes_written =
 			mystreamer->base.bbs_buffer.maxlen - zs->avail_out;
@@ -330,6 +331,10 @@ astreamer_gzip_decompressor_content(astreamer *streamer,
 							  mystreamer->base.bbs_buffer.maxlen, context);
 			mystreamer->bytes_written = 0;
 		}
+
+		/* If we've hit the end of the compressed stream, stop. */
+		if (res == Z_STREAM_END)
+			break;
 	}
 }
 
@@ -362,7 +367,12 @@ astreamer_gzip_decompressor_finalize(astreamer *streamer)
 static void
 astreamer_gzip_decompressor_free(astreamer *streamer)
 {
+	astreamer_gzip_decompressor *mystreamer;
+
+	mystreamer = (astreamer_gzip_decompressor *) streamer;
+
 	astreamer_free(streamer->bbs_next);
+	inflateEnd(&mystreamer->zstream);
 	pfree(streamer->bbs_buffer.data);
 	pfree(streamer);
 }
diff --git a/src/fe_utils/astreamer_tar.c b/src/fe_utils/astreamer_tar.c
index f8be5e4ff8a..3b094fc0328 100644
--- a/src/fe_utils/astreamer_tar.c
+++ b/src/fe_utils/astreamer_tar.c
@@ -224,7 +224,8 @@ astreamer_tar_parser_content(astreamer *streamer, astreamer_member *member,
 				/* OK, now we can send it. */
 				astreamer_content(mystreamer->base.bbs_next,
 								  &mystreamer->member,
-								  data, mystreamer->pad_bytes_expected,
+								  mystreamer->base.bbs_buffer.data,
+								  mystreamer->pad_bytes_expected,
 								  ASTREAMER_MEMBER_TRAILER);
 
 				/* Expect next file header. */
@@ -346,6 +347,7 @@ astreamer_tar_parser_free(astreamer *streamer)
 {
 	pfree(streamer->bbs_buffer.data);
 	astreamer_free(streamer->bbs_next);
+	pfree(streamer);
 }
 
 /*
-- 
2.43.0
