From 7edb4f420982be174478666defd0dadab31362ae Mon Sep 17 00:00:00 2001
From: Jeevan Ladhe <jeevan.ladhe@enterprisedb.com>
Date: Wed, 16 Feb 2022 22:22:27 +0530
Subject: [PATCH 3/4] ZSTD: add client-side compression support.

ZSTD compression can now be performed on the client using
pg_basebackup -Ft --compress client-zstd[:LEVEL].

Example:
pg_basebackup -D /tmp/zstd_client -Ft -Xnone --compress=client-zstd
---
 src/bin/pg_basebackup/Makefile                |   1 +
 src/bin/pg_basebackup/bbstreamer.h            |   2 +
 src/bin/pg_basebackup/bbstreamer_zstd.c       | 202 ++++++++++++++++++
 src/bin/pg_basebackup/pg_basebackup.c         |  28 ++-
 src/bin/pg_verifybackup/t/010_client_untar.pl |   8 +
 src/tools/msvc/Mkvcbuild.pm                   |   1 +
 6 files changed, 240 insertions(+), 2 deletions(-)
 create mode 100644 src/bin/pg_basebackup/bbstreamer_zstd.c
 mode change 100644 => 100755 src/bin/pg_verifybackup/t/010_client_untar.pl

diff --git a/src/bin/pg_basebackup/Makefile b/src/bin/pg_basebackup/Makefile
index 1d0db4f9d0..0035ebcef5 100644
--- a/src/bin/pg_basebackup/Makefile
+++ b/src/bin/pg_basebackup/Makefile
@@ -44,6 +44,7 @@ BBOBJS = \
 	bbstreamer_gzip.o \
 	bbstreamer_inject.o \
 	bbstreamer_lz4.o \
+	bbstreamer_zstd.o \
 	bbstreamer_tar.o
 
 all: pg_basebackup pg_receivewal pg_recvlogical
diff --git a/src/bin/pg_basebackup/bbstreamer.h b/src/bin/pg_basebackup/bbstreamer.h
index c2de77bacc..bfc624a863 100644
--- a/src/bin/pg_basebackup/bbstreamer.h
+++ b/src/bin/pg_basebackup/bbstreamer.h
@@ -209,6 +209,8 @@ extern bbstreamer *bbstreamer_gzip_decompressor_new(bbstreamer *next);
 extern bbstreamer *bbstreamer_lz4_compressor_new(bbstreamer *next,
 												 int compresslevel);
 extern bbstreamer *bbstreamer_lz4_decompressor_new(bbstreamer *next);
+extern bbstreamer *bbstreamer_zstd_compressor_new(bbstreamer *next,
+												  int compresslevel);
 extern bbstreamer *bbstreamer_tar_parser_new(bbstreamer *next);
 extern bbstreamer *bbstreamer_tar_terminator_new(bbstreamer *next);
 extern bbstreamer *bbstreamer_tar_archiver_new(bbstreamer *next);
diff --git a/src/bin/pg_basebackup/bbstreamer_zstd.c b/src/bin/pg_basebackup/bbstreamer_zstd.c
new file mode 100644
index 0000000000..0b20267cf4
--- /dev/null
+++ b/src/bin/pg_basebackup/bbstreamer_zstd.c
@@ -0,0 +1,202 @@
+/*-------------------------------------------------------------------------
+ *
+ * bbstreamer_zstd.c
+ *
+ * Portions Copyright (c) 1996-2022, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		  src/bin/pg_basebackup/bbstreamer_zstd.c
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres_fe.h"
+
+#include <unistd.h>
+
+#ifdef HAVE_LIBZSTD
+#include <zstd.h>
+#endif
+
+#include "bbstreamer.h"
+#include "common/logging.h"
+
+#ifdef HAVE_LIBZSTD
+
+typedef struct bbstreamer_zstd_frame
+{
+	bbstreamer	base;			/* generic streamer state; kept first for casts */
+
+	ZSTD_CCtx  *cctx;			/* zstd compression context */
+	ZSTD_outBuffer zstd_outBuf; /* output window over base.bbs_buffer */
+} bbstreamer_zstd_frame;
+
+static void bbstreamer_zstd_compressor_content(bbstreamer *streamer,
+											   bbstreamer_member *member,
+											   const char *data, int len,
+											   bbstreamer_archive_context context);
+static void bbstreamer_zstd_compressor_finalize(bbstreamer *streamer);
+static void bbstreamer_zstd_compressor_free(bbstreamer *streamer);
+
+const bbstreamer_ops bbstreamer_zstd_compressor_ops = {
+	.content = bbstreamer_zstd_compressor_content,	/* compress one chunk */
+	.finalize = bbstreamer_zstd_compressor_finalize,	/* end the zstd frame */
+	.free = bbstreamer_zstd_compressor_free /* release all resources */
+};
+#endif
+
+/*
+ * Create a streamer that zstd-compresses tar blocks at 'compresslevel' and
+ * forwards them to 'next'; exits if zstd is unavailable or cannot be set up.
+ */
+bbstreamer *
+bbstreamer_zstd_compressor_new(bbstreamer *next, int compresslevel)
+{
+#ifdef HAVE_LIBZSTD
+	bbstreamer_zstd_frame *streamer;
+
+	Assert(next != NULL);
+	streamer = palloc0(sizeof(bbstreamer_zstd_frame));
+	*((const bbstreamer_ops **) &streamer->base.bbs_ops) =
+		&bbstreamer_zstd_compressor_ops;
+	streamer->base.bbs_next = next;
+	initStringInfo(&streamer->base.bbs_buffer);
+	enlargeStringInfo(&streamer->base.bbs_buffer, ZSTD_CStreamOutSize());
+
+	streamer->cctx = ZSTD_createCCtx();
+	if (!streamer->cctx)
+	{
+		pg_log_error("could not create zstd compression context");
+		exit(1);
+	}
+
+	/* Initialize stream compression preferences */
+	ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_compressionLevel,
+						   compresslevel);
+
+	/* Point zstd's output at our buffer, sized per ZSTD_CStreamOutSize(). */
+	streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
+	streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
+	streamer->zstd_outBuf.pos = 0;
+
+	return &streamer->base;
+#else
+	pg_log_error("this build does not support zstd compression");
+	exit(1);
+#endif
+}
+
+#ifdef HAVE_LIBZSTD
+/*
+ * Compress the input data to output buffer.
+ *
+ * Before each compression call, compute the worst-case compressed size of
+ * the input not yet consumed. If the free space in the output buffer does
+ * not exceed that bound, forward the buffered bytes to the next streamer
+ * and start again with an empty buffer. A compression error is fatal: the
+ * loop only ends once all input is consumed, so retrying could spin forever.
+ */
+static void
+bbstreamer_zstd_compressor_content(bbstreamer *streamer,
+								   bbstreamer_member *member,
+								   const char *data, int len,
+								   bbstreamer_archive_context context)
+{
+	bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
+	ZSTD_inBuffer inBuf = {data, len, 0};
+
+	while (inBuf.pos < inBuf.size)
+	{
+		size_t		yet_to_flush;
+		size_t		required_outBuf_bound = ZSTD_compressBound(inBuf.size - inBuf.pos);
+
+		/* Too little room left: pass the output on and reuse the buffer. */
+		if ((mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos) <=
+			required_outBuf_bound)
+		{
+			bbstreamer_content(mystreamer->base.bbs_next, member,
+							   mystreamer->zstd_outBuf.dst,
+							   mystreamer->zstd_outBuf.pos,
+							   context);
+
+			/* Reset the ZSTD output buffer. */
+			mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
+			mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
+			mystreamer->zstd_outBuf.pos = 0;
+		}
+
+		yet_to_flush = ZSTD_compressStream2(mystreamer->cctx, &mystreamer->zstd_outBuf,
+											&inBuf, ZSTD_e_continue);
+
+		if (ZSTD_isError(yet_to_flush))
+		{
+			pg_log_error("could not compress data: %s", ZSTD_getErrorName(yet_to_flush));
+			exit(1);
+		}
+	}
+}
+
+/*
+ * End-of-stream processing: finish the zstd frame, pass all remaining output
+ * to the next streamer and finalize it.  A zstd error is fatal, since an
+ * error code is nonzero and would otherwise keep the flush loop running.
+ */
+static void
+bbstreamer_zstd_compressor_finalize(bbstreamer *streamer)
+{
+	bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
+	size_t		yet_to_flush;
+
+	do
+	{
+		ZSTD_inBuffer in = {NULL, 0, 0};
+		size_t		required_outBuf_bound = ZSTD_compressBound(0);
+
+		/* Too little room left: pass the output on and reuse the buffer. */
+		if ((mystreamer->zstd_outBuf.size - mystreamer->zstd_outBuf.pos) <=
+			required_outBuf_bound)
+		{
+			bbstreamer_content(mystreamer->base.bbs_next, NULL,
+							   mystreamer->zstd_outBuf.dst,
+							   mystreamer->zstd_outBuf.pos,
+							   BBSTREAMER_UNKNOWN);
+
+			/* Reset the ZSTD output buffer. */
+			mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
+			mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
+			mystreamer->zstd_outBuf.pos = 0;
+		}
+
+		yet_to_flush = ZSTD_compressStream2(mystreamer->cctx,
+											&mystreamer->zstd_outBuf,
+											&in, ZSTD_e_end);
+		if (ZSTD_isError(yet_to_flush))
+		{
+			pg_log_error("could not compress data: %s", ZSTD_getErrorName(yet_to_flush));
+			exit(1);
+		}
+	} while (yet_to_flush > 0);
+
+	/* Make sure to pass any remaining bytes to the next streamer. */
+	if (mystreamer->zstd_outBuf.pos > 0)
+		bbstreamer_content(mystreamer->base.bbs_next, NULL,
+						   mystreamer->zstd_outBuf.dst,
+						   mystreamer->zstd_outBuf.pos,
+						   BBSTREAMER_UNKNOWN);
+
+	bbstreamer_finalize(mystreamer->base.bbs_next);
+}
+
+/*
+ * Release everything owned by this streamer, including the next one in line.
+ */
+static void
+bbstreamer_zstd_compressor_free(bbstreamer *streamer)
+{
+	ZSTD_CCtx  *zstd_context = ((bbstreamer_zstd_frame *) streamer)->cctx;
+
+	bbstreamer_free(streamer->bbs_next);
+	ZSTD_freeCCtx(zstd_context);
+	pfree(streamer->bbs_buffer.data);
+	pfree(streamer);
+}
+#endif
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index 7202a5eae7..7ba752c1c9 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -1023,6 +1023,16 @@ parse_compress_options(char *src, WalCompressionMethod *methodres,
 		*methodres = COMPRESSION_LZ4;
 		*locationres = COMPRESS_LOCATION_SERVER;
 	}
+	else if (pg_strcasecmp(firstpart, "zstd") == 0)
+	{
+		*methodres = COMPRESSION_ZSTD;
+		*locationres = COMPRESS_LOCATION_UNSPECIFIED;
+	}
+	else if (pg_strcasecmp(firstpart, "client-zstd") == 0)
+	{
+		*methodres = COMPRESSION_ZSTD;
+		*locationres = COMPRESS_LOCATION_CLIENT;
+	}
 	else if (pg_strcasecmp(firstpart, "server-zstd") == 0)
 	{
 		*methodres = COMPRESSION_ZSTD;
@@ -1146,7 +1156,8 @@ CreateBackupStreamer(char *archive_name, char *spclocation,
 	bool		inject_manifest;
 	bool		is_tar,
 				is_tar_gz,
-				is_tar_lz4;
+				is_tar_lz4,
+				is_tar_zstd;
 	bool		must_parse_archive;
 	int			archive_name_len = strlen(archive_name);
 
@@ -1169,6 +1180,10 @@ CreateBackupStreamer(char *archive_name, char *spclocation,
 	is_tar_lz4 = (archive_name_len > 8 &&
 				  strcmp(archive_name + archive_name_len - 4, ".lz4") == 0);
 
+	/* Is this a ZSTD archive? */
+	is_tar_zstd = (archive_name_len > 8 &&
+				   strcmp(archive_name + archive_name_len - 4, ".zst") == 0);
+
 	/*
 	 * We have to parse the archive if (1) we're suppose to extract it, or if
 	 * (2) we need to inject backup_manifest or recovery configuration into it.
@@ -1178,7 +1193,8 @@ CreateBackupStreamer(char *archive_name, char *spclocation,
 		(spclocation == NULL && writerecoveryconf));
 
 	/* At present, we only know how to parse tar archives. */
-	if (must_parse_archive && !is_tar && !is_tar_gz && !is_tar_lz4)
+	if (must_parse_archive && !is_tar && !is_tar_gz && !is_tar_lz4
+		&& !is_tar_zstd)
 	{
 		pg_log_error("unable to parse archive: %s", archive_name);
 		pg_log_info("only tar archives can be parsed");
@@ -1250,6 +1266,14 @@ CreateBackupStreamer(char *archive_name, char *spclocation,
 			streamer = bbstreamer_lz4_compressor_new(streamer,
 													 compresslevel);
 		}
+		else if (compressmethod == COMPRESSION_ZSTD)
+		{
+			strlcat(archive_filename, ".zst", sizeof(archive_filename));
+			streamer = bbstreamer_plain_writer_new(archive_filename,
+												   archive_file);
+			streamer = bbstreamer_zstd_compressor_new(streamer,
+													  compresslevel);
+		}
 		else
 		{
 			Assert(false);		/* not reachable */
diff --git a/src/bin/pg_verifybackup/t/010_client_untar.pl b/src/bin/pg_verifybackup/t/010_client_untar.pl
old mode 100644
new mode 100755
index 3616529390..c2a6161be6
--- a/src/bin/pg_verifybackup/t/010_client_untar.pl
+++ b/src/bin/pg_verifybackup/t/010_client_untar.pl
@@ -42,6 +42,14 @@ my @test_configuration = (
 		'decompress_flags' => [ '-d' ],
 		'output_file' => 'base.tar',
 		'enabled' => check_pg_config("#define HAVE_LIBLZ4 1")
+	},
+	{
+		'compression_method' => 'zstd',
+		'backup_flags' => ['--compress', 'client-zstd:5'],
+		'backup_archive' => 'base.tar.zst',
+		'decompress_program' => $ENV{'ZSTD'},
+		'decompress_flags' => [ '-d' ],
+		'enabled' => check_pg_config("#define HAVE_LIBZSTD 1")
 	}
 );
 
diff --git a/src/tools/msvc/Mkvcbuild.pm b/src/tools/msvc/Mkvcbuild.pm
index bab81bd459..901e755d01 100644
--- a/src/tools/msvc/Mkvcbuild.pm
+++ b/src/tools/msvc/Mkvcbuild.pm
@@ -380,6 +380,7 @@ sub mkvcbuild
 	$pgbasebackup->AddFile('src/bin/pg_basebackup/bbstreamer_gzip.c');
 	$pgbasebackup->AddFile('src/bin/pg_basebackup/bbstreamer_inject.c');
 	$pgbasebackup->AddFile('src/bin/pg_basebackup/bbstreamer_lz4.c');
+	$pgbasebackup->AddFile('src/bin/pg_basebackup/bbstreamer_zstd.c');
 	$pgbasebackup->AddFile('src/bin/pg_basebackup/bbstreamer_tar.c');
 	$pgbasebackup->AddLibrary('ws2_32.lib');
 
-- 
2.25.1

