Hi, I tried to implement support for parallel ZSTD compression. The library provides an option (ZSTD_c_nbWorkers) to specify the number of compression workers. The number of parallel workers can be set as one of the compression parameters; if this option is specified, the library performs parallel compression using the specified number of workers.
The user can specify the number of parallel workers as part of the --compress option by appending an integer value after an at sign (@): -Z, --compress=[{client|server}-]{gzip|lz4|zstd}[:LEVEL][@WORKERS]. Please find attached patch v1 with the above changes. Note: ZSTD library version 1.5.x supports parallel compression by default; if the library version is older than 1.5.x, parallel compression is enabled only if the library was built with the ZSTD_MULTITHREAD macro. If the linked library does not support parallel compression, setting the ZSTD_c_nbWorkers parameter to a value other than 0 has no effect and returns an error. Thanks, Dipesh
From 688ad1e3f9b43bf911e8c3837497a874e4a6937f Mon Sep 17 00:00:00 2001 From: Dipesh Pandit <dipesh.pan...@enterprisedb.com> Date: Mon, 14 Mar 2022 18:39:02 +0530 Subject: [PATCH] support parallel zstd compression --- doc/src/sgml/ref/pg_basebackup.sgml | 11 ++- src/backend/replication/basebackup.c | 14 +++- src/backend/replication/basebackup_zstd.c | 15 ++++- src/bin/pg_basebackup/bbstreamer.h | 3 +- src/bin/pg_basebackup/bbstreamer_zstd.c | 11 ++- src/bin/pg_basebackup/pg_basebackup.c | 97 +++++++++++++++++++++------ src/bin/pg_verifybackup/t/008_untar.pl | 8 +++ src/bin/pg_verifybackup/t/010_client_untar.pl | 8 +++ src/include/replication/basebackup_sink.h | 2 +- 9 files changed, 142 insertions(+), 27 deletions(-) diff --git a/doc/src/sgml/ref/pg_basebackup.sgml b/doc/src/sgml/ref/pg_basebackup.sgml index 4a630b5..87feca0 100644 --- a/doc/src/sgml/ref/pg_basebackup.sgml +++ b/doc/src/sgml/ref/pg_basebackup.sgml @@ -399,9 +399,9 @@ PostgreSQL documentation <varlistentry> <term><option>-Z <replaceable class="parameter">level</replaceable></option></term> - <term><option>-Z [{client|server}-]<replaceable class="parameter">method</replaceable></option>[:<replaceable>level</replaceable>]</term> + <term><option>-Z [{client|server}-]<replaceable class="parameter">method</replaceable></option>[:<replaceable>level</replaceable>][@<replaceable>workers</replaceable>]</term> <term><option>--compress=<replaceable class="parameter">level</replaceable></option></term> - <term><option>--compress=[{client|server}-]<replaceable class="parameter">method</replaceable></option>[:<replaceable>level</replaceable>]</term> + <term><option>--compress=[{client|server}-]<replaceable class="parameter">method</replaceable></option>[:<replaceable>level</replaceable>][@<replaceable>workers</replaceable>]</term> <listitem> <para> Requests compression of the backup. If <literal>client</literal> or @@ -428,6 +428,13 @@ PostgreSQL documentation the level is 0. 
</para> <para> + Compression workers can be specified optionally by appending the + number of workers after an at sign (<literal>@</literal>). It + defines the degree of parallelism while compressing the archive. + Currently, parallel compression is supported only for + <literal>zstd</literal> compressed archives. + </para> + <para> When the tar format is used with <literal>gzip</literal>, <literal>lz4</literal>, or <literal>zstd</literal>, the suffix <filename>.gz</filename>, <filename>.lz4</filename>, or diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c index 2378ce5..8217fa9 100644 --- a/src/backend/replication/basebackup.c +++ b/src/backend/replication/basebackup.c @@ -82,6 +82,7 @@ typedef struct backup_manifest_option manifest; basebackup_compression_type compression; int compression_level; + int compression_workers; pg_checksum_type manifest_checksum_type; } basebackup_options; @@ -718,6 +719,7 @@ parse_basebackup_options(List *options, basebackup_options *opt) char *target_str = "compat"; /* placate compiler */ bool o_compression = false; bool o_compression_level = false; + bool o_compression_workers = false; MemSet(opt, 0, sizeof(*opt)); opt->target = BACKUP_TARGET_CLIENT; @@ -925,6 +927,15 @@ parse_basebackup_options(List *options, basebackup_options *opt) opt->compression_level = defGetInt32(defel); o_compression_level = true; } + else if (strcmp(defel->defname, "compression_workers") == 0) + { + if (o_compression_workers) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("duplicate option \"%s\"", defel->defname))); + opt->compression_workers = defGetInt32(defel); + o_compression_workers = true; + } else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -1030,7 +1041,8 @@ SendBaseBackup(BaseBackupCmd *cmd) else if (opt.compression == BACKUP_COMPRESSION_LZ4) sink = bbsink_lz4_new(sink, opt.compression_level); else if (opt.compression == BACKUP_COMPRESSION_ZSTD) - sink = bbsink_zstd_new(sink, 
opt.compression_level); + sink = bbsink_zstd_new(sink, opt.compression_level, + opt.compression_workers); /* Set up progress reporting. */ sink = bbsink_progress_new(sink, opt.progress); diff --git a/src/backend/replication/basebackup_zstd.c b/src/backend/replication/basebackup_zstd.c index e3f9b1d..54b91eb 100644 --- a/src/backend/replication/basebackup_zstd.c +++ b/src/backend/replication/basebackup_zstd.c @@ -28,6 +28,9 @@ typedef struct bbsink_zstd /* Compression level */ int compresslevel; + /* Compression workers*/ + int compressworkers; + ZSTD_CCtx *cctx; ZSTD_outBuffer zstd_outBuf; } bbsink_zstd; @@ -59,7 +62,7 @@ const bbsink_ops bbsink_zstd_ops = { * designated compression level. */ bbsink * -bbsink_zstd_new(bbsink *next, int compresslevel) +bbsink_zstd_new(bbsink *next, int compresslevel, int compressworkers) { #ifndef HAVE_LIBZSTD ereport(ERROR, @@ -81,6 +84,7 @@ bbsink_zstd_new(bbsink *next, int compresslevel) *((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_zstd_ops; sink->base.bbs_next = next; sink->compresslevel = compresslevel; + sink->compressworkers = compressworkers; return &sink->base; #endif @@ -96,6 +100,7 @@ bbsink_zstd_begin_backup(bbsink *sink) { bbsink_zstd *mysink = (bbsink_zstd *) sink; size_t output_buffer_bound; + size_t ret; mysink->cctx = ZSTD_createCCtx(); if (!mysink->cctx) @@ -104,6 +109,14 @@ bbsink_zstd_begin_backup(bbsink *sink) ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_compressionLevel, mysink->compresslevel); + ret = ZSTD_CCtx_setParameter(mysink->cctx, ZSTD_c_nbWorkers, + mysink->compressworkers); + + if (ZSTD_isError(ret)) + elog(ERROR, + "could not compress data: %s", + ZSTD_getErrorName(ret)); + /* * We need our own buffer, because we're going to pass different data to * the next sink than what gets passed to us. 
diff --git a/src/bin/pg_basebackup/bbstreamer.h b/src/bin/pg_basebackup/bbstreamer.h index 02d4c05..dbaf6d6 100644 --- a/src/bin/pg_basebackup/bbstreamer.h +++ b/src/bin/pg_basebackup/bbstreamer.h @@ -210,7 +210,8 @@ extern bbstreamer *bbstreamer_lz4_compressor_new(bbstreamer *next, int compresslevel); extern bbstreamer *bbstreamer_lz4_decompressor_new(bbstreamer *next); extern bbstreamer *bbstreamer_zstd_compressor_new(bbstreamer *next, - int compresslevel); + int compresslevel, + int compressworkers); extern bbstreamer *bbstreamer_zstd_decompressor_new(bbstreamer *next); extern bbstreamer *bbstreamer_tar_parser_new(bbstreamer *next); extern bbstreamer *bbstreamer_tar_terminator_new(bbstreamer *next); diff --git a/src/bin/pg_basebackup/bbstreamer_zstd.c b/src/bin/pg_basebackup/bbstreamer_zstd.c index cc68367..f3d453e 100644 --- a/src/bin/pg_basebackup/bbstreamer_zstd.c +++ b/src/bin/pg_basebackup/bbstreamer_zstd.c @@ -63,10 +63,12 @@ const bbstreamer_ops bbstreamer_zstd_decompressor_ops = { * blocks. */ bbstreamer * -bbstreamer_zstd_compressor_new(bbstreamer *next, int compresslevel) +bbstreamer_zstd_compressor_new(bbstreamer *next, int compresslevel, + int compressworkers) { #ifdef HAVE_LIBZSTD bbstreamer_zstd_frame *streamer; + size_t ret; Assert(next != NULL); @@ -87,6 +89,13 @@ bbstreamer_zstd_compressor_new(bbstreamer *next, int compresslevel) ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_compressionLevel, compresslevel); + ret = ZSTD_CCtx_setParameter(streamer->cctx, ZSTD_c_nbWorkers, + compressworkers); + + if (ZSTD_isError(ret)) + pg_log_error("could not compress data: %s", + ZSTD_getErrorName(ret)); + /* Initialize the ZSTD output buffer. 
*/ streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data; streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen; diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c index d265ee3..55a321d 100644 --- a/src/bin/pg_basebackup/pg_basebackup.c +++ b/src/bin/pg_basebackup/pg_basebackup.c @@ -133,6 +133,7 @@ static bool showprogress = false; static bool estimatesize = true; static int verbose = 0; static int compresslevel = 0; +static int compressworkers = 0; static WalCompressionMethod compressmethod = COMPRESSION_NONE; static CompressionLocation compressloc = COMPRESS_LOCATION_UNSPECIFIED; static IncludeWal includewal = STREAM_WAL; @@ -405,8 +406,8 @@ usage(void) printf(_(" -X, --wal-method=none|fetch|stream\n" " include required WAL files with specified method\n")); printf(_(" -z, --gzip compress tar output\n")); - printf(_(" -Z, --compress=[{client|server}-]{gzip|lz4|zstd}[:LEVEL]\n" - " compress tar output with given compression method or level\n")); + printf(_(" -Z, --compress=[{client|server}-]{gzip|lz4|zstd}[:LEVEL][@WORKERS]\n" + " compress tar output with given compression method or level or workers\n")); printf(_(" -Z, --compress=none do not compress tar output\n")); printf(_("\nGeneral options:\n")); printf(_(" -c, --checkpoint=fast|spread\n" @@ -1005,29 +1006,36 @@ parse_max_rate(char *src) /* * Utility wrapper to parse the values specified for -Z/--compress. - * *methodres and *levelres will be optionally filled with values coming - * from the parsed results. + * *methodres, *levelres and *workerres will be optionally filled with values + * coming from the parsed result. 
*/ static void parse_compress_options(char *src, WalCompressionMethod *methodres, - CompressionLocation *locationres, int *levelres) + CompressionLocation *locationres, int *levelres, + int *workerres) { char *sep; - int firstlen; - char *firstpart; + int firstlen, + secondlen; + char *firstpart, + *secondpart; /* - * clear 'levelres' so that if there are multiple compression options, - * the last one fully overrides the earlier ones + * clear 'levelres' and 'workerres' so that if there are multiple + * compression options, the last one fully overrides the earlier ones. */ *levelres = 0; + *workerres = 0; - /* check if the option is split in two */ + /* check if the option is split in two using either ':' or '@'. */ sep = strchr(src, ':'); + if (sep == NULL) + sep = strchr(src, '@'); + /* * The first part of the option value could be a method name, or just a - * level value. + * level value or compression workers. */ firstlen = (sep != NULL) ? (sep - src) : strlen(src); firstpart = pg_malloc(firstlen + 1); @@ -1107,32 +1115,76 @@ parse_compress_options(char *src, WalCompressionMethod *methodres, return; } + /* Check for the second part of the input option. */ + sep = strchr(src, ':'); + + if (sep != NULL) + { + /* Check the contents after the colon separator. */ + sep++; + if (*sep == '\0') + { + pg_log_error("no compression level defined for method %s", firstpart); + exit(1); + } + + /* Check if the option can be further split into two. */ + src = sep; + sep = strchr(src, '@'); + + /* The second part of the value is compression level. */ + secondlen = (sep != NULL) ? (sep - src) : strlen(src); + secondpart = pg_malloc(secondlen + 1); + memcpy(secondpart, src, secondlen); + secondpart[secondlen] = '\0'; + + /* + * For any of the methods currently supported, the data after the + * separator can just be an integer. 
+ */ + if (!option_parse_int(secondpart, "-Z/--compress", 0, INT_MAX, + levelres)) + exit(1); + + free(secondpart); + } + + /* Check for the third part of the input option. */ + sep = strchr(src, '@'); + if (sep == NULL) { /* - * The caller specified a method without a colon separator, so let any - * subsequent checks assign a default level. + * The caller specified a method without a '@' separator, so let any + * subsequent checks assign a default number of workers. */ free(firstpart); return; } - /* Check the contents after the colon separator. */ + /* Check the contents after the '@' separator. */ sep++; if (*sep == '\0') { - pg_log_error("no compression level defined for method %s", firstpart); + pg_log_error("compression workers are not defined for method %s", firstpart); exit(1); } /* - * For any of the methods currently supported, the data after the - * separator can just be an integer. + * The data after '@' separator can just be an integer and it identifies + * the number of compression workers. 
*/ if (!option_parse_int(sep, "-Z/--compress", 0, INT_MAX, - levelres)) + workerres)) exit(1); + if (*methodres != COMPRESSION_ZSTD) + { + pg_log_error("cannot use compression workers with method %s", + firstpart); + exit(1); + } + free(firstpart); } @@ -1341,7 +1393,8 @@ CreateBackupStreamer(char *archive_name, char *spclocation, streamer = bbstreamer_plain_writer_new(archive_filename, archive_file); streamer = bbstreamer_zstd_compressor_new(streamer, - compresslevel); + compresslevel, + compressworkers); } else { @@ -2082,6 +2135,9 @@ BaseBackup(void) if (compresslevel >= 1) /* not 0 or Z_DEFAULT_COMPRESSION */ AppendIntegerCommandOption(&buf, use_new_option_syntax, "COMPRESSION_LEVEL", compresslevel); + if (compressworkers > 1) + AppendIntegerCommandOption(&buf, use_new_option_syntax, + "COMPRESSION_WORKERS", compressworkers); } if (verbose) @@ -2626,7 +2682,8 @@ main(int argc, char **argv) break; case 'Z': parse_compress_options(optarg, &compressmethod, - &compressloc, &compresslevel); + &compressloc, &compresslevel, + &compressworkers); break; case 'c': if (pg_strcasecmp(optarg, "fast") == 0) diff --git a/src/bin/pg_verifybackup/t/008_untar.pl b/src/bin/pg_verifybackup/t/008_untar.pl index efbc910..86e2c8a 100644 --- a/src/bin/pg_verifybackup/t/008_untar.pl +++ b/src/bin/pg_verifybackup/t/008_untar.pl @@ -50,6 +50,14 @@ my @test_configuration = ( 'decompress_program' => $ENV{'ZSTD'}, 'decompress_flags' => [ '-d' ], 'enabled' => check_pg_config("#define HAVE_LIBZSTD 1") + }, + { + 'compression_method' => 'zstd', + 'backup_flags' => ['--compress', 'server-zstd@4'], + 'backup_archive' => 'base.tar.zst', + 'decompress_program' => $ENV{'ZSTD'}, + 'decompress_flags' => [ '-d' ], + 'enabled' => check_pg_config("#define HAVE_LIBZSTD 1") } ); diff --git a/src/bin/pg_verifybackup/t/010_client_untar.pl b/src/bin/pg_verifybackup/t/010_client_untar.pl index c2a6161..ac5ae31 100644 --- a/src/bin/pg_verifybackup/t/010_client_untar.pl +++ 
b/src/bin/pg_verifybackup/t/010_client_untar.pl @@ -50,6 +50,14 @@ my @test_configuration = ( 'decompress_program' => $ENV{'ZSTD'}, 'decompress_flags' => [ '-d' ], 'enabled' => check_pg_config("#define HAVE_LIBZSTD 1") + }, + { + 'compression_method' => 'zstd', + 'backup_flags' => ['--compress', 'client-zstd:5@4'], + 'backup_archive' => 'base.tar.zst', + 'decompress_program' => $ENV{'ZSTD'}, + 'decompress_flags' => [ '-d' ], + 'enabled' => check_pg_config("#define HAVE_LIBZSTD 1") } ); diff --git a/src/include/replication/basebackup_sink.h b/src/include/replication/basebackup_sink.h index a7f1675..5c1dd32 100644 --- a/src/include/replication/basebackup_sink.h +++ b/src/include/replication/basebackup_sink.h @@ -285,7 +285,7 @@ extern void bbsink_forward_cleanup(bbsink *sink); extern bbsink *bbsink_copystream_new(bool send_to_client); extern bbsink *bbsink_gzip_new(bbsink *next, int compresslevel); extern bbsink *bbsink_lz4_new(bbsink *next, int compresslevel); -extern bbsink *bbsink_zstd_new(bbsink *next, int compresslevel); +extern bbsink *bbsink_zstd_new(bbsink *next, int compresslevel, int compressworkers); extern bbsink *bbsink_progress_new(bbsink *next, bool estimate_backup_size); extern bbsink *bbsink_server_new(bbsink *next, char *pathname); extern bbsink *bbsink_throttle_new(bbsink *next, uint32 maxrate); -- 1.8.3.1