Hi,

I have implemented support for parallel ZSTD compression. The
library provides a compression parameter (ZSTD_c_nbWorkers) that
specifies the number of compression workers. When this parameter
is set, the library compresses the data in parallel using the
specified number of workers.

Users can specify the number of parallel workers as part of the
--compress option by appending an integer value after an at sign (@):
(-Z, --compress=[{client|server}-]{gzip|lz4|zstd}[:LEVEL][@WORKERS])

Please find the attached patch v1 with the above changes.

Note: ZSTD library version 1.5.x supports parallel compression
by default. With library versions older than 1.5.x, parallel
compression is available only if the library was compiled with the
build macro ZSTD_MULTITHREAD. If the linked library does not support
parallel compression, setting ZSTD_c_nbWorkers to a value other
than 0 has no effect and returns an error.

Thanks,
Dipesh
From 688ad1e3f9b43bf911e8c3837497a874e4a6937f Mon Sep 17 00:00:00 2001
From: Dipesh Pandit <dipesh.pan...@enterprisedb.com>
Date: Mon, 14 Mar 2022 18:39:02 +0530
Subject: [PATCH] support parallel zstd compression

---
 doc/src/sgml/ref/pg_basebackup.sgml           | 11 ++-
 src/backend/replication/basebackup.c          | 14 +++-
 src/backend/replication/basebackup_zstd.c     | 15 ++++-
 src/bin/pg_basebackup/bbstreamer.h            |  3 +-
 src/bin/pg_basebackup/bbstreamer_zstd.c       | 11 ++-
 src/bin/pg_basebackup/pg_basebackup.c         | 97 +++++++++++++++++++++------
 src/bin/pg_verifybackup/t/008_untar.pl        |  8 +++
 src/bin/pg_verifybackup/t/010_client_untar.pl |  8 +++
 src/include/replication/basebackup_sink.h     |  2 +-
 9 files changed, 142 insertions(+), 27 deletions(-)

diff --git a/doc/src/sgml/ref/pg_basebackup.sgml b/doc/src/sgml/ref/pg_basebackup.sgml
index 4a630b5..87feca0 100644
--- a/doc/src/sgml/ref/pg_basebackup.sgml
+++ b/doc/src/sgml/ref/pg_basebackup.sgml
@@ -399,9 +399,9 @@ PostgreSQL documentation
 
      <varlistentry>
       <term><option>-Z <replaceable class="parameter">level</replaceable></option></term>
-      <term><option>-Z [{client|server}-]<replaceable class="parameter">method</replaceable></option>[:<replaceable>level</replaceable>]</term>
+      <term><option>-Z [{client|server}-]<replaceable class="parameter">method</replaceable></option>[:<replaceable>level</replaceable>][@<replaceable>workers</replaceable>]</term>
       <term><option>--compress=<replaceable class="parameter">level</replaceable></option></term>
-      <term><option>--compress=[{client|server}-]<replaceable class="parameter">method</replaceable></option>[:<replaceable>level</replaceable>]</term>
+      <term><option>--compress=[{client|server}-]<replaceable class="parameter">method</replaceable></option>[:<replaceable>level</replaceable>][@<replaceable>workers</replaceable>]</term>
       <listitem>
        <para>
         Requests compression of the backup. If <literal>client</literal> or
@@ -428,6 +428,13 @@ PostgreSQL documentation
         the level is 0.
        </para>
        <para>
+        Optionally, the number of compression workers can be specified by
+        appending it after an at sign (<literal>@</literal>). It sets the
+        degree of parallelism used while compressing the archive.
+        Currently, parallel compression is supported only for
+        <literal>zstd</literal> compressed archives.
+       </para>
+       <para>
         When the tar format is used with <literal>gzip</literal>,
         <literal>lz4</literal>, or <literal>zstd</literal>, the suffix
         <filename>.gz</filename>, <filename>.lz4</filename>, or
diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c
index 2378ce5..8217fa9 100644
--- a/src/backend/replication/basebackup.c
+++ b/src/backend/replication/basebackup.c
@@ -82,6 +82,7 @@ typedef struct
 	backup_manifest_option manifest;
 	basebackup_compression_type	compression;
 	int			compression_level;
+	int			compression_workers;	/* "compression_workers" option; 0 if absent */
 	pg_checksum_type manifest_checksum_type;
 } basebackup_options;
 
@@ -718,6 +719,7 @@ parse_basebackup_options(List *options, basebackup_options *opt)
 	char	   *target_str = "compat";	/* placate compiler */
 	bool		o_compression = false;
 	bool		o_compression_level = false;
+	bool		o_compression_workers = false;
 
 	MemSet(opt, 0, sizeof(*opt));
 	opt->target = BACKUP_TARGET_CLIENT;
@@ -925,6 +927,15 @@ parse_basebackup_options(List *options, basebackup_options *opt)
 			opt->compression_level = defGetInt32(defel);
 			o_compression_level = true;
 		}
+		else if (strcmp(defel->defname, "compression_workers") == 0)
+		{
+			if (o_compression_workers)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("duplicate option \"%s\"", defel->defname)));
+			opt->compression_workers = defGetInt32(defel);
+			o_compression_workers = true;
+		}
 		else
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
@@ -1030,7 +1041,8 @@ SendBaseBackup(BaseBackupCmd *cmd)
 	else if (opt.compression == BACKUP_COMPRESSION_LZ4)
 		sink = bbsink_lz4_new(sink, opt.compression_level);
 	else if (opt.compression == BACKUP_COMPRESSION_ZSTD)
-		sink = bbsink_zstd_new(sink, opt.compression_level);
+		sink = bbsink_zstd_new(sink, opt.compression_level,
+							   opt.compression_workers);
 
 	/* Set up progress reporting. */
 	sink = bbsink_progress_new(sink, opt.progress);
diff --git a/src/backend/replication/basebackup_zstd.c b/src/backend/replication/basebackup_zstd.c
index e3f9b1d..54b91eb 100644
--- a/src/backend/replication/basebackup_zstd.c
+++ b/src/backend/replication/basebackup_zstd.c
@@ -28,6 +28,9 @@ typedef struct bbsink_zstd
 	/* Compression level */
 	int			compresslevel;
 
+	/* Number of parallel compression workers */
+	int			compressworkers;
+
 	ZSTD_CCtx  *cctx;
 	ZSTD_outBuffer zstd_outBuf;
 } bbsink_zstd;
@@ -59,7 +62,7 @@ const bbsink_ops bbsink_zstd_ops = {
  * designated compression level.
  */
 bbsink *
-bbsink_zstd_new(bbsink *next, int compresslevel)
+bbsink_zstd_new(bbsink *next, int compresslevel, int compressworkers)
 {
 #ifndef HAVE_LIBZSTD
 	ereport(ERROR,
@@ -81,6 +84,7 @@ bbsink_zstd_new(bbsink *next, int compresslevel)
 	*((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_zstd_ops;
 	sink->base.bbs_next = next;
 	sink->compresslevel = compresslevel;
+	sink->compressworkers = compressworkers;
 
 	return &sink->base;
 #endif
@@ -96,6 +100,7 @@ bbsink_zstd_begin_backup(bbsink *sink)
 {
 	bbsink_zstd *mysink = (bbsink_zstd *) sink;
 	size_t		output_buffer_bound;
+	size_t		ret;
 
 	mysink->cctx = ZSTD_createCCtx();
 	if (!mysink->cctx)
@@ -104,6 +109,14 @@ bbsink_zstd_begin_backup(bbsink *sink)
 	ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_compressionLevel,
 						   mysink->compresslevel);
 
+	ret = ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_nbWorkers,
+								 mysink->compressworkers);
+
+	if (ZSTD_isError(ret))
+		elog(ERROR,
+			 "could not set compression worker count to %d: %s",
+			 mysink->compressworkers, ZSTD_getErrorName(ret));
+
 	/*
 	 * We need our own buffer, because we're going to pass different data to
 	 * the next sink than what gets passed to us.
diff --git a/src/bin/pg_basebackup/bbstreamer.h b/src/bin/pg_basebackup/bbstreamer.h
index 02d4c05..dbaf6d6 100644
--- a/src/bin/pg_basebackup/bbstreamer.h
+++ b/src/bin/pg_basebackup/bbstreamer.h
@@ -210,7 +210,8 @@ extern bbstreamer *bbstreamer_lz4_compressor_new(bbstreamer *next,
 												 int compresslevel);
 extern bbstreamer *bbstreamer_lz4_decompressor_new(bbstreamer *next);
 extern bbstreamer *bbstreamer_zstd_compressor_new(bbstreamer *next,
-												  int compresslevel);
+												  int compresslevel,
+												  int compressworkers);
 extern bbstreamer *bbstreamer_zstd_decompressor_new(bbstreamer *next);
 extern bbstreamer *bbstreamer_tar_parser_new(bbstreamer *next);
 extern bbstreamer *bbstreamer_tar_terminator_new(bbstreamer *next);
diff --git a/src/bin/pg_basebackup/bbstreamer_zstd.c b/src/bin/pg_basebackup/bbstreamer_zstd.c
index cc68367..f3d453e 100644
--- a/src/bin/pg_basebackup/bbstreamer_zstd.c
+++ b/src/bin/pg_basebackup/bbstreamer_zstd.c
@@ -63,10 +63,12 @@ const bbstreamer_ops bbstreamer_zstd_decompressor_ops = {
  * blocks.
  */
 bbstreamer *
-bbstreamer_zstd_compressor_new(bbstreamer *next, int compresslevel)
+bbstreamer_zstd_compressor_new(bbstreamer *next, int compresslevel,
+							   int compressworkers)
 {
 #ifdef HAVE_LIBZSTD
 	bbstreamer_zstd_frame *streamer;
+	size_t		ret;
 
 	Assert(next != NULL);
 
@@ -87,6 +89,15 @@ bbstreamer_zstd_compressor_new(bbstreamer *next, int compresslevel)
 	ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_compressionLevel,
 						   compresslevel);
 
+	ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_nbWorkers,
+								 compressworkers);
+	if (ZSTD_isError(ret))
+	{
+		pg_log_error("could not set compression worker count to %d: %s",
+					 compressworkers, ZSTD_getErrorName(ret));
+		exit(1);
+	}
+
 	/* Initialize the ZSTD output buffer. */
 	streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
 	streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index d265ee3..55a321d 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -133,6 +133,7 @@ static bool showprogress = false;
 static bool estimatesize = true;
 static int	verbose = 0;
 static int	compresslevel = 0;
+static int	compressworkers = 0;	/* from -Z ...@WORKERS; 0 if not given */
 static WalCompressionMethod compressmethod = COMPRESSION_NONE;
 static CompressionLocation compressloc = COMPRESS_LOCATION_UNSPECIFIED;
 static IncludeWal includewal = STREAM_WAL;
@@ -405,8 +406,8 @@ usage(void)
 	printf(_("  -X, --wal-method=none|fetch|stream\n"
 			 "                         include required WAL files with specified method\n"));
 	printf(_("  -z, --gzip             compress tar output\n"));
-	printf(_("  -Z, --compress=[{client|server}-]{gzip|lz4|zstd}[:LEVEL]\n"
-			 "                         compress tar output with given compression method or level\n"));
+	printf(_("  -Z, --compress=[{client|server}-]{gzip|lz4|zstd}[:LEVEL][@WORKERS]\n"
+			 "                         compress tar output with given compression method or level or workers\n"));
 	printf(_("  -Z, --compress=none    do not compress tar output\n"));
@@ -1005,29 +1006,36 @@ parse_max_rate(char *src)
 
 /*
  * Utility wrapper to parse the values specified for -Z/--compress.
- * *methodres and *levelres will be optionally filled with values coming
- * from the parsed results.
+ * *methodres, *locationres, *levelres and *workerres will optionally be
+ * filled with values coming from the parsed result.
  */
 static void
 parse_compress_options(char *src, WalCompressionMethod *methodres,
-					   CompressionLocation *locationres, int *levelres)
+					   CompressionLocation *locationres, int *levelres,
+					   int *workerres)
 {
 	char	   *sep;
-	int			firstlen;
-	char	   *firstpart;
+	int			firstlen,
+				secondlen;
+	char	   *firstpart,
+			   *secondpart;
 
 	/*
-	 * clear 'levelres' so that if there are multiple compression options,
-	 * the last one fully overrides the earlier ones
+	 * clear 'levelres' and 'workerres' so that if there are multiple
+	 * compression options, the last one fully overrides the earlier ones.
 	 */
 	*levelres = 0;
+	*workerres = 0;
 
-	/* check if the option is split in two */
+	/* split at the first ':' if present, otherwise at the first '@'. */
 	sep = strchr(src, ':');
 
+	if (sep == NULL)
+		sep = strchr(src, '@');
+
 	/*
 	 * The first part of the option value could be a method name, or just a
-	 * level value.
+	 * level value; a worker count is only ever read from after an '@'.
 	 */
 	firstlen = (sep != NULL) ? (sep - src) : strlen(src);
 	firstpart = pg_malloc(firstlen + 1);
@@ -1107,32 +1115,76 @@ parse_compress_options(char *src, WalCompressionMethod *methodres,
 		return;
 	}
 
+	/* Check for an optional compression level following ':'. */
+	sep = strchr(src, ':');
+
+	if (sep != NULL)
+	{
+		/* Check the contents after the colon separator. */
+		sep++;
+		if (*sep == '\0')
+		{
+			pg_log_error("no compression level defined for method %s", firstpart);
+			exit(1);
+		}
+
+		/* The level ends at an '@' separator, if one follows. */
+		src = sep;
+		sep = strchr(src, '@');
+
+		/* The second part of the value is compression level. */
+		secondlen = (sep != NULL) ? (sep - src) : strlen(src);
+		secondpart = pg_malloc(secondlen + 1);
+		memcpy(secondpart, src, secondlen);
+		secondpart[secondlen] = '\0';
+
+		/*
+		 * For any of the methods currently supported, the data after the
+		 * separator can just be an integer.
+		 */
+		if (!option_parse_int(secondpart, "-Z/--compress", 0, INT_MAX,
+					levelres))
+			exit(1);
+
+		free(secondpart);
+	}
+
+	/* Check for an optional worker count following '@'. */
+	sep = strchr(src, '@');
+
 	if (sep == NULL)
 	{
 		/*
-		 * The caller specified a method without a colon separator, so let any
-		 * subsequent checks assign a default level.
+		 * No '@' separator was given, so leave the worker count at the 0 it
+		 * was cleared to at the top of this function.
 		 */
 		free(firstpart);
 		return;
 	}
 
-	/* Check the contents after the colon separator. */
+	/* Check the contents after the '@' separator. */
 	sep++;
 	if (*sep == '\0')
 	{
-		pg_log_error("no compression level defined for method %s", firstpart);
+		pg_log_error("compression workers are not defined for method %s", firstpart);
 		exit(1);
 	}
 
 	/*
-	 * For any of the methods currently supported, the data after the
-	 * separator can just be an integer.
+	 * The data after the '@' separator must be an integer; it is the number
+	 * of compression workers.
 	 */
 	if (!option_parse_int(sep, "-Z/--compress", 0, INT_MAX,
-						  levelres))
+						  workerres))
 		exit(1);
 
+	if (*methodres != COMPRESSION_ZSTD)
+	{
+		pg_log_error("cannot use compression workers with method %s",
+					 firstpart);
+		exit(1);
+	}
+
 	free(firstpart);
 }
 
@@ -1341,7 +1393,8 @@ CreateBackupStreamer(char *archive_name, char *spclocation,
 			streamer = bbstreamer_plain_writer_new(archive_filename,
 												   archive_file);
 			streamer = bbstreamer_zstd_compressor_new(streamer,
-													  compresslevel);
+													  compresslevel,
+													  compressworkers);
 		}
 		else
 		{
@@ -2082,6 +2135,9 @@ BaseBackup(void)
 		if (compresslevel >= 1) /* not 0 or Z_DEFAULT_COMPRESSION */
 			AppendIntegerCommandOption(&buf, use_new_option_syntax,
 									   "COMPRESSION_LEVEL", compresslevel);
+		if (compressworkers > 0) /* honor @1 too, as client-side does */
+			AppendIntegerCommandOption(&buf, use_new_option_syntax,
+									   "COMPRESSION_WORKERS", compressworkers);
 	}
 
 	if (verbose)
@@ -2626,7 +2682,8 @@ main(int argc, char **argv)
 				break;
 			case 'Z':
 				parse_compress_options(optarg, &compressmethod,
-									   &compressloc, &compresslevel);
+									   &compressloc, &compresslevel,
+									   &compressworkers);
 				break;
 			case 'c':
 				if (pg_strcasecmp(optarg, "fast") == 0)
diff --git a/src/bin/pg_verifybackup/t/008_untar.pl b/src/bin/pg_verifybackup/t/008_untar.pl
index efbc910..86e2c8a 100644
--- a/src/bin/pg_verifybackup/t/008_untar.pl
+++ b/src/bin/pg_verifybackup/t/008_untar.pl
@@ -50,6 +50,14 @@ my @test_configuration = (
 		'decompress_program' => $ENV{'ZSTD'},
 		'decompress_flags' => [ '-d' ],
 		'enabled' => check_pg_config("#define HAVE_LIBZSTD 1")
+	},
+	{
+		'compression_method' => 'zstd',
+		'backup_flags' => ['--compress', 'server-zstd@4'],	# server-side zstd, 4 workers
+		'backup_archive' => 'base.tar.zst',
+		'decompress_program' => $ENV{'ZSTD'},
+		'decompress_flags' => [ '-d' ],
+		'enabled' => check_pg_config("#define HAVE_LIBZSTD 1")
 	}
 );
 
diff --git a/src/bin/pg_verifybackup/t/010_client_untar.pl b/src/bin/pg_verifybackup/t/010_client_untar.pl
index c2a6161..ac5ae31 100644
--- a/src/bin/pg_verifybackup/t/010_client_untar.pl
+++ b/src/bin/pg_verifybackup/t/010_client_untar.pl
@@ -50,6 +50,14 @@ my @test_configuration = (
 		'decompress_program' => $ENV{'ZSTD'},
 		'decompress_flags' => [ '-d' ],
 		'enabled' => check_pg_config("#define HAVE_LIBZSTD 1")
+	},
+	{
+		'compression_method' => 'zstd',
+		'backup_flags' => ['--compress', 'client-zstd:5@4'],	# client-side zstd, level 5, 4 workers
+		'backup_archive' => 'base.tar.zst',
+		'decompress_program' => $ENV{'ZSTD'},
+		'decompress_flags' => [ '-d' ],
+		'enabled' => check_pg_config("#define HAVE_LIBZSTD 1")
 	}
 );
 
diff --git a/src/include/replication/basebackup_sink.h b/src/include/replication/basebackup_sink.h
index a7f1675..5c1dd32 100644
--- a/src/include/replication/basebackup_sink.h
+++ b/src/include/replication/basebackup_sink.h
@@ -285,7 +285,7 @@ extern void bbsink_forward_cleanup(bbsink *sink);
 extern bbsink *bbsink_copystream_new(bool send_to_client);
 extern bbsink *bbsink_gzip_new(bbsink *next, int compresslevel);
 extern bbsink *bbsink_lz4_new(bbsink *next, int compresslevel);
-extern bbsink *bbsink_zstd_new(bbsink *next, int compresslevel);
+extern bbsink *bbsink_zstd_new(bbsink *next, int compresslevel, int compressworkers);
 extern bbsink *bbsink_progress_new(bbsink *next, bool estimate_backup_size);
 extern bbsink *bbsink_server_new(bbsink *next, char *pathname);
 extern bbsink *bbsink_throttle_new(bbsink *next, uint32 maxrate);
-- 
1.8.3.1

Reply via email to