On Mon, Apr 03, 2023 at 11:26:09PM +0200, Tomas Vondra wrote:
> On 4/3/23 21:17, Justin Pryzby wrote:
> > On Sat, Apr 01, 2023 at 10:26:01PM +0200, Tomas Vondra wrote:
> >>> Feel free to mess around with threads (but I'd much rather see the patch
> >>> progress for zstd:long).
> >>
> >> OK, understood. The long mode patch is pretty simple. IIUC it does not
> >> change the format, i.e. in the worst case we could leave it for PG17
> >> too. Correct?
> >
> > Right, libzstd only has one "format", which is the same as what's used
> > by the commandline tool. zstd:long doesn't change the format of the
> > output: the library just uses a larger memory buffer to allow better
> > compression. There's no format change for zstd:workers, either.
>
> OK. I plan to do a bit more review/testing on this, and get it committed
> over the next day or two, likely including the long mode. One thing I
> noticed today is that maybe long_distance should be a bool, not int.
> Yes, ZSTD_c_enableLongDistanceMatching() accepts int, but it'd be
> cleaner to cast the value during a call and keep it bool otherwise.

Thanks for noticing. Evidently I wrote it using "int" to get the feature
working, and then later wrote the bool parsing bits but never changed
the data structure.

This also updates a few comments and some indentation, removes a useless
assertion, and updates the warning about zstd:workers.

-- 
Justin
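
For illustration only, the "keep it bool, cast at the call" arrangement
discussed above could look roughly like the sketch below. The struct and
function names are hypothetical stand-ins (this is not the code in the
attached patches); the only thing assumed about the library side is that
its parameter setter takes an int value.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/*
 * Hypothetical stand-in for the compression specification: the option
 * is stored as a bool, because that is what it means.
 */
typedef struct demo_compress_spec
{
	int			level;
	bool		long_distance;
} demo_compress_spec;

/*
 * Hypothetical stand-in for an int-valued library setter.  It only
 * records the value it was given so the effect can be observed.
 */
static int	last_param_value = -1;

static void
demo_set_int_param(int value)
{
	last_param_value = value;
}

/* Convert the bool to int only at the call site. */
static void
demo_apply_long_distance(const demo_compress_spec *spec)
{
	assert(spec != NULL);
	demo_set_int_param((int) spec->long_distance);
}
```

The cast is confined to one place, so the rest of the code can treat the
option as a plain flag.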
From df0eb4d3c4799f24e58f1e5b0a9470e5af355ad6 Mon Sep 17 00:00:00 2001
From: Justin Pryzby <pryz...@telsasoft.com>
Date: Sat, 7 Jan 2023 15:45:06 -0600
Subject: [PATCH 1/4] pg_dump: zstd compression

Previously proposed at: 20201221194924.gi30...@telsasoft.com
---
 doc/src/sgml/ref/pg_dump.sgml         |  13 +-
 src/bin/pg_dump/Makefile              |   2 +
 src/bin/pg_dump/compress_io.c         |  66 ++--
 src/bin/pg_dump/compress_zstd.c       | 537 ++++++++++++++++++++++++++
 src/bin/pg_dump/compress_zstd.h       |  25 ++
 src/bin/pg_dump/meson.build           |   4 +-
 src/bin/pg_dump/pg_backup_archiver.c  |   9 +-
 src/bin/pg_dump/pg_backup_directory.c |   2 +
 src/bin/pg_dump/pg_dump.c             |  20 +-
 src/bin/pg_dump/t/002_pg_dump.pl      |  79 +++-
 src/tools/pginclude/cpluspluscheck    |   1 +
 src/tools/pgindent/typedefs.list      |   1 +
 12 files changed, 705 insertions(+), 54 deletions(-)
 create mode 100644 src/bin/pg_dump/compress_zstd.c
 create mode 100644 src/bin/pg_dump/compress_zstd.h

diff --git a/doc/src/sgml/ref/pg_dump.sgml b/doc/src/sgml/ref/pg_dump.sgml
index 77299878e02..8de38e0fd0d 100644
--- a/doc/src/sgml/ref/pg_dump.sgml
+++ b/doc/src/sgml/ref/pg_dump.sgml
@@ -330,8 +330,9 @@ PostgreSQL documentation
           machine-readable format that <application>pg_restore</application>
           can read. A directory format archive can be manipulated with
           standard Unix tools; for example, files in an uncompressed archive
-          can be compressed with the <application>gzip</application> or
-          <application>lz4</application> tools.
+          can be compressed with the <application>gzip</application>,
+          <application>lz4</application>, or
+          <application>zstd</application> tools.
           This format is compressed by default using <literal>gzip</literal>
           and also supports parallel dumps.
          </para>
@@ -655,7 +656,8 @@ PostgreSQL documentation
        <para>
         Specify the compression method and/or the compression level to use.
         The compression method can be set to <literal>gzip</literal>,
-        <literal>lz4</literal>, or <literal>none</literal> for no compression.
+ <literal>lz4</literal>, <literal>zstd</literal>, + or <literal>none</literal> for no compression. A compression detail string can optionally be specified. If the detail string is an integer, it specifies the compression level. Otherwise, it should be a comma-separated list of items, each of the @@ -676,8 +678,9 @@ PostgreSQL documentation individual table-data segments, and the default is to compress using <literal>gzip</literal> at a moderate level. For plain text output, setting a nonzero compression level causes the entire output file to be compressed, - as though it had been fed through <application>gzip</application> or - <application>lz4</application>; but the default is not to compress. + as though it had been fed through <application>gzip</application>, + <application>lz4</application>, or <application>zstd</application>; + but the default is not to compress. </para> <para> The tar archive format currently does not support compression at all. diff --git a/src/bin/pg_dump/Makefile b/src/bin/pg_dump/Makefile index eb8f59459a1..24de7593a6a 100644 --- a/src/bin/pg_dump/Makefile +++ b/src/bin/pg_dump/Makefile @@ -18,6 +18,7 @@ include $(top_builddir)/src/Makefile.global export GZIP_PROGRAM=$(GZIP) export LZ4 +export ZSTD export with_icu override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS) @@ -29,6 +30,7 @@ OBJS = \ compress_io.o \ compress_lz4.o \ compress_none.o \ + compress_zstd.o \ dumputils.o \ parallel.o \ pg_backup_archiver.o \ diff --git a/src/bin/pg_dump/compress_io.c b/src/bin/pg_dump/compress_io.c index 0972a4f934a..4f06bb024f9 100644 --- a/src/bin/pg_dump/compress_io.c +++ b/src/bin/pg_dump/compress_io.c @@ -52,8 +52,8 @@ * * InitDiscoverCompressFileHandle tries to infer the compression by the * filename suffix. If the suffix is not yet known then it tries to simply - * open the file and if it fails, it tries to open the same file with the .gz - * suffix, and then again with the .lz4 suffix. 
+ * open the file and if it fails, it tries to open the same file with + * compressed suffixes. * * IDENTIFICATION * src/bin/pg_dump/compress_io.c @@ -69,6 +69,7 @@ #include "compress_io.h" #include "compress_lz4.h" #include "compress_none.h" +#include "compress_zstd.h" #include "pg_backup_utils.h" /*---------------------- @@ -77,7 +78,8 @@ */ /* - * Checks whether a compression algorithm is supported. + * Checks whether support for a compression algorithm is implemented in + * pg_dump/restore. * * On success returns NULL, otherwise returns a malloc'ed string which can be * used by the caller in an error message. @@ -98,6 +100,10 @@ supports_compression(const pg_compress_specification compression_spec) if (algorithm == PG_COMPRESSION_LZ4) supported = true; #endif +#ifdef USE_ZSTD + if (algorithm == PG_COMPRESSION_ZSTD) + supported = true; +#endif if (!supported) return psprintf("this build does not support compression with %s", @@ -130,6 +136,8 @@ AllocateCompressor(const pg_compress_specification compression_spec, InitCompressorGzip(cs, compression_spec); else if (compression_spec.algorithm == PG_COMPRESSION_LZ4) InitCompressorLZ4(cs, compression_spec); + else if (compression_spec.algorithm == PG_COMPRESSION_ZSTD) + InitCompressorZstd(cs, compression_spec); return cs; } @@ -196,20 +204,30 @@ InitCompressFileHandle(const pg_compress_specification compression_spec) InitCompressFileHandleGzip(CFH, compression_spec); else if (compression_spec.algorithm == PG_COMPRESSION_LZ4) InitCompressFileHandleLZ4(CFH, compression_spec); + else if (compression_spec.algorithm == PG_COMPRESSION_ZSTD) + InitCompressFileHandleZstd(CFH, compression_spec); return CFH; } +static bool +check_compressed_file(const char *path, char **fname, char *ext) +{ + free_keep_errno(*fname); + *fname = psprintf("%s.%s", path, ext); + return (access(*fname, F_OK) == 0); +} + /* * Open a file for reading. 'path' is the file to open, and 'mode' should * be either "r" or "rb". 
* * If the file at 'path' contains the suffix of a supported compression method, - * currently this includes ".gz" and ".lz4", then this compression will be used + * currently this includes ".gz", ".lz4" and ".zst", then this compression will be used * throughout. Otherwise the compression will be inferred by iteratively trying * to open the file at 'path', first as is, then by appending known compression * suffixes. So if you pass "foo" as 'path', this will open either "foo" or - * "foo.gz" or "foo.lz4", trying in that order. + * "foo.{gz,lz4,zst}", trying in that order. * * On failure, return NULL with an error code in errno. */ @@ -229,36 +247,20 @@ InitDiscoverCompressFileHandle(const char *path, const char *mode) if (hasSuffix(fname, ".gz")) compression_spec.algorithm = PG_COMPRESSION_GZIP; + else if (hasSuffix(fname, ".lz4")) + compression_spec.algorithm = PG_COMPRESSION_LZ4; + else if (hasSuffix(fname, ".zst")) + compression_spec.algorithm = PG_COMPRESSION_ZSTD; else { - bool exists; - - exists = (stat(path, &st) == 0); - /* avoid unused warning if it is not built with compression */ - if (exists) + if (stat(path, &st) == 0) compression_spec.algorithm = PG_COMPRESSION_NONE; -#ifdef HAVE_LIBZ - if (!exists) - { - free_keep_errno(fname); - fname = psprintf("%s.gz", path); - exists = (stat(fname, &st) == 0); - - if (exists) - compression_spec.algorithm = PG_COMPRESSION_GZIP; - } -#endif -#ifdef USE_LZ4 - if (!exists) - { - free_keep_errno(fname); - fname = psprintf("%s.lz4", path); - exists = (stat(fname, &st) == 0); - - if (exists) - compression_spec.algorithm = PG_COMPRESSION_LZ4; - } -#endif + else if (check_compressed_file(path, &fname, "gz")) + compression_spec.algorithm = PG_COMPRESSION_GZIP; + else if (check_compressed_file(path, &fname, "lz4")) + compression_spec.algorithm = PG_COMPRESSION_LZ4; + else if (check_compressed_file(path, &fname, "zst")) + compression_spec.algorithm = PG_COMPRESSION_ZSTD; } CFH = InitCompressFileHandle(compression_spec); diff 
--git a/src/bin/pg_dump/compress_zstd.c b/src/bin/pg_dump/compress_zstd.c new file mode 100644 index 00000000000..cf85c3a4c93 --- /dev/null +++ b/src/bin/pg_dump/compress_zstd.c @@ -0,0 +1,537 @@ +/*------------------------------------------------------------------------- + * + * compress_zstd.c + * Routines for archivers to write a Zstd compressed data stream. + * + * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/bin/pg_dump/compress_zstd.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres_fe.h" + +#include "pg_backup_utils.h" +#include "compress_zstd.h" + +#ifndef USE_ZSTD + +void +InitCompressorZstd(CompressorState *cs, const pg_compress_specification compression_spec) +{ + pg_fatal("this build does not support compression with %s", "ZSTD"); +} + +void +InitCompressFileHandleZstd(CompressFileHandle *CFH, const pg_compress_specification compression_spec) +{ + pg_fatal("this build does not support compression with %s", "ZSTD"); +} + +#else + +#include <zstd.h> + +typedef struct ZstdCompressorState +{ + /* This is a normal file to which we read/write compressed data */ + FILE *fp; + + ZSTD_CStream *cstream; + ZSTD_DStream *dstream; + ZSTD_outBuffer output; + ZSTD_inBuffer input; + + /* pointer to a static string like from strerror(), for Zstd_write() */ + const char *zstderror; +} ZstdCompressorState; + +static ZSTD_CStream *_ZstdCStreamParams(pg_compress_specification compress); +static void EndCompressorZstd(ArchiveHandle *AH, CompressorState *cs); +static void WriteDataToArchiveZstd(ArchiveHandle *AH, CompressorState *cs, + const void *data, size_t dLen); +static void ReadDataFromArchiveZstd(ArchiveHandle *AH, CompressorState *cs); + +static void +_Zstd_CCtx_setParam_or_die(ZSTD_CStream *cstream, + ZSTD_cParameter param, int value, char *paramname) +{ + size_t res; + + res = 
ZSTD_CCtx_setParameter(cstream, param, value); + if (ZSTD_isError(res)) + pg_fatal("could not set compression parameter: \"%s\": %s", + paramname, ZSTD_getErrorName(res)); +} + +/* Return a compression stream with parameters set per argument */ +static ZSTD_CStream * +_ZstdCStreamParams(pg_compress_specification compress) +{ + ZSTD_CStream *cstream; + + cstream = ZSTD_createCStream(); + if (cstream == NULL) + pg_fatal("could not initialize compression library"); + + _Zstd_CCtx_setParam_or_die(cstream, ZSTD_c_compressionLevel, + compress.level, "level"); + + return cstream; +} + +/* Helper function for WriteDataToArchiveZstd and EndCompressorZstd */ +static void +_ZstdWriteCommon(ArchiveHandle *AH, CompressorState *cs, bool flush) +{ + ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data; + ZSTD_inBuffer *input = &zstdcs->input; + ZSTD_outBuffer *output = &zstdcs->output; + + /* Loop while there's any input or until flushed */ + while (input->pos != input->size || flush) + { + size_t res; + + output->pos = 0; + res = ZSTD_compressStream2(zstdcs->cstream, output, + input, flush ? ZSTD_e_end : ZSTD_e_continue); + + if (ZSTD_isError(res)) + pg_fatal("could not compress data: %s", ZSTD_getErrorName(res)); + + /* + * Extra paranoia: avoid zero-length chunks, since a zero length chunk + * is the EOF marker in the custom format. This should never happen + * but... 
+ */ + if (output->pos > 0) + cs->writeF(AH, output->dst, output->pos); + + if (res == 0) + break; /* End of frame or all input consumed */ + } +} + +static void +EndCompressorZstd(ArchiveHandle *AH, CompressorState *cs) +{ + ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data; + + if (cs->readF != NULL) + { + Assert(zstdcs->cstream == NULL); + ZSTD_freeDStream(zstdcs->dstream); + pg_free(unconstify(void *, zstdcs->input.src)); + } + else if (cs->writeF != NULL) + { + Assert(zstdcs->dstream == NULL); + _ZstdWriteCommon(AH, cs, true); + ZSTD_freeCStream(zstdcs->cstream); + pg_free(zstdcs->output.dst); + } + + pg_free(zstdcs); +} + +static void +WriteDataToArchiveZstd(ArchiveHandle *AH, CompressorState *cs, + const void *data, size_t dLen) +{ + ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data; + + zstdcs->input.src = data; + zstdcs->input.size = dLen; + zstdcs->input.pos = 0; + + _ZstdWriteCommon(AH, cs, false); +} + +static void +ReadDataFromArchiveZstd(ArchiveHandle *AH, CompressorState *cs) +{ + ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data; + ZSTD_outBuffer *output = &zstdcs->output; + ZSTD_inBuffer *input = &zstdcs->input; + size_t input_allocated_size = ZSTD_DStreamInSize(); + size_t res; + + for (;;) + { + size_t cnt; + + /* + * Read compressed data. Note that readF can resize the buffer; the + * new size is tracked and used for future loops. 
+ */ + input->size = input_allocated_size; + cnt = cs->readF(AH, (char **) unconstify(void **, &input->src), &input->size); + + /* ensure that readF didn't *shrink* the buffer */ + Assert(input->size >= input_allocated_size); + input_allocated_size = input->size; + input->size = cnt; + input->pos = 0; + + if (cnt == 0) + break; + + /* Now decompress */ + while (input->pos < input->size) + { + output->pos = 0; + res = ZSTD_decompressStream(zstdcs->dstream, output, input); + if (ZSTD_isError(res)) + pg_fatal("could not decompress data: %s", ZSTD_getErrorName(res)); + + /* + * then write the decompressed data to the output handle + */ + ((char *) output->dst)[output->pos] = '\0'; + ahwrite(output->dst, 1, output->pos, AH); + + if (res == 0) + break; /* End of frame */ + } + } +} + +/* Public routine that supports Zstd compressed data I/O */ +void +InitCompressorZstd(CompressorState *cs, + const pg_compress_specification compression_spec) +{ + ZstdCompressorState *zstdcs; + + cs->readData = ReadDataFromArchiveZstd; + cs->writeData = WriteDataToArchiveZstd; + cs->end = EndCompressorZstd; + + cs->compression_spec = compression_spec; + + zstdcs = (ZstdCompressorState *) pg_malloc0(sizeof(*zstdcs)); + cs->private_data = zstdcs; + + /* We expect that exactly one of readF/writeF is specified */ + Assert((cs->readF == NULL) != (cs->writeF == NULL)); + + if (cs->readF != NULL) + { + zstdcs->dstream = ZSTD_createDStream(); + if (zstdcs->dstream == NULL) + pg_fatal("could not initialize compression library"); + + zstdcs->input.size = ZSTD_DStreamInSize(); + zstdcs->input.src = pg_malloc(zstdcs->input.size); + + /* + * output.size is the buffer size we tell zstd it can output to. + * Allocate an additional byte such that ReadDataFromArchiveZstd() can + * call ahwrite() with a null-terminated string, which is an optimized + * case in ExecuteSqlCommandBuf(). 
+ */ + zstdcs->output.size = ZSTD_DStreamOutSize(); + zstdcs->output.dst = pg_malloc(zstdcs->output.size + 1); + } + else if (cs->writeF != NULL) + { + zstdcs->cstream = _ZstdCStreamParams(cs->compression_spec); + + zstdcs->output.size = ZSTD_CStreamOutSize(); + zstdcs->output.dst = pg_malloc(zstdcs->output.size); + zstdcs->output.pos = 0; + } +} + +/* + * Compressed stream API + */ + +static bool +Zstd_read(void *ptr, size_t size, size_t *rdsize, CompressFileHandle *CFH) +{ + ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data; + ZSTD_inBuffer *input = &zstdcs->input; + ZSTD_outBuffer *output = &zstdcs->output; + size_t input_allocated_size = ZSTD_DStreamInSize(); + size_t res, + cnt; + + output->size = size; + output->dst = ptr; + output->pos = 0; + + for (;;) + { + Assert(input->pos <= input->size); + Assert(input->size <= input_allocated_size); + + /* + * If the input is completely consumed, start back at the beginning + */ + if (input->pos == input->size) + { + /* input->size is size produced by "fread" */ + input->size = 0; + /* input->pos is position consumed by decompress */ + input->pos = 0; + } + + /* read compressed data if we must produce more input */ + if (input->pos == input->size) + { + cnt = fread(unconstify(void *, input->src), 1, input_allocated_size, zstdcs->fp); + input->size = cnt; + + Assert(cnt <= input_allocated_size); + + /* If we have no more input to consume, we're done */ + if (cnt == 0) + break; + } + + while (input->pos < input->size) + { + /* now decompress */ + res = ZSTD_decompressStream(zstdcs->dstream, output, input); + + if (ZSTD_isError(res)) + pg_fatal("could not decompress data: %s", ZSTD_getErrorName(res)); + + if (output->pos == output->size) + break; /* No more room for output */ + + if (res == 0) + break; /* End of frame */ + } + + if (output->pos == output->size) + break; /* We read all the data that fits */ + } + + if (rdsize != NULL) + *rdsize = output->pos; + + return true; +} + +static bool 
+Zstd_write(const void *ptr, size_t size, CompressFileHandle *CFH) +{ + ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data; + ZSTD_inBuffer *input = &zstdcs->input; + ZSTD_outBuffer *output = &zstdcs->output; + size_t res, + cnt; + + input->src = ptr; + input->size = size; + input->pos = 0; + + /* Consume all input, to be flushed later */ + while (input->pos != input->size) + { + output->pos = 0; + res = ZSTD_compressStream2(zstdcs->cstream, output, input, ZSTD_e_continue); + if (ZSTD_isError(res)) + { + zstdcs->zstderror = ZSTD_getErrorName(res); + return false; + } + + cnt = fwrite(output->dst, 1, output->pos, zstdcs->fp); + if (cnt != output->pos) + { + zstdcs->zstderror = strerror(errno); + return false; + } + } + + return size; +} + +static int +Zstd_getc(CompressFileHandle *CFH) +{ + ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data; + int ret; + + if (CFH->read_func(&ret, 1, NULL, CFH) != 1) + { + if (feof(zstdcs->fp)) + pg_fatal("could not read from input file: end of file"); + else + pg_fatal("could not read from input file: %m"); + } + return ret; +} + +static char * +Zstd_gets(char *buf, int len, CompressFileHandle *CFH) +{ + int i; + + Assert(len > 0); + + /* + * Read one byte at a time until newline or EOF. This is only used to read + * the list of LOs, and the I/O is buffered anyway. + */ + for (i = 0; i < len - 1; ++i) + { + size_t readsz; + + if (!CFH->read_func(&buf[i], 1, &readsz, CFH)) + break; + if (readsz != 1) + break; + if (buf[i] == '\n') + { + ++i; + break; + } + } + buf[i] = '\0'; + return i > 0 ? 
buf : NULL; +} + +static bool +Zstd_close(CompressFileHandle *CFH) +{ + ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data; + + if (zstdcs->cstream) + { + size_t res, + cnt; + ZSTD_inBuffer *input = &zstdcs->input; + ZSTD_outBuffer *output = &zstdcs->output; + + /* Loop until the compression buffers are fully consumed */ + for (;;) + { + output->pos = 0; + res = ZSTD_compressStream2(zstdcs->cstream, output, input, ZSTD_e_end); + if (ZSTD_isError(res)) + { + zstdcs->zstderror = ZSTD_getErrorName(res); + return false; + } + + cnt = fwrite(output->dst, 1, output->pos, zstdcs->fp); + if (cnt != output->pos) + { + zstdcs->zstderror = strerror(errno); + return false; + } + + if (res == 0) + break; /* End of frame */ + } + + ZSTD_freeCStream(zstdcs->cstream); + pg_free(zstdcs->output.dst); + } + + if (zstdcs->dstream) + { + ZSTD_freeDStream(zstdcs->dstream); + pg_free(unconstify(void *, zstdcs->input.src)); + } + + if (fclose(zstdcs->fp) != 0) + return false; + + pg_free(zstdcs); + return true; +} + +static bool +Zstd_eof(CompressFileHandle *CFH) +{ + ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data; + + return feof(zstdcs->fp); +} + +static bool +Zstd_open(const char *path, int fd, const char *mode, + CompressFileHandle *CFH) +{ + FILE *fp; + ZstdCompressorState *zstdcs; + + if (fd >= 0) + fp = fdopen(fd, mode); + else + fp = fopen(path, mode); + + if (fp == NULL) + return false; + + zstdcs = (ZstdCompressorState *) pg_malloc0(sizeof(*zstdcs)); + CFH->private_data = zstdcs; + zstdcs->fp = fp; + + if (mode[0] == 'r') + { + zstdcs->input.src = pg_malloc0(ZSTD_DStreamInSize()); + zstdcs->dstream = ZSTD_createDStream(); + if (zstdcs->dstream == NULL) + pg_fatal("could not initialize compression library"); + } + else if (mode[0] == 'w' || mode[0] == 'a') + { + zstdcs->output.size = ZSTD_CStreamOutSize(); + zstdcs->output.dst = pg_malloc0(zstdcs->output.size); + zstdcs->cstream = _ZstdCStreamParams(CFH->compression_spec); + if 
(zstdcs->cstream == NULL) + pg_fatal("could not initialize compression library"); + } + else + pg_fatal("unhandled mode"); + + return true; +} + +static bool +Zstd_open_write(const char *path, const char *mode, CompressFileHandle *CFH) +{ + char fname[MAXPGPATH]; + + sprintf(fname, "%s.zst", path); + return CFH->open_func(fname, -1, mode, CFH); +} + +static const char * +Zstd_get_error(CompressFileHandle *CFH) +{ + ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data; + + return zstdcs->zstderror; +} + +void +InitCompressFileHandleZstd(CompressFileHandle *CFH, + const pg_compress_specification compression_spec) +{ + CFH->open_func = Zstd_open; + CFH->open_write_func = Zstd_open_write; + CFH->read_func = Zstd_read; + CFH->write_func = Zstd_write; + CFH->gets_func = Zstd_gets; + CFH->getc_func = Zstd_getc; + CFH->close_func = Zstd_close; + CFH->eof_func = Zstd_eof; + CFH->get_error_func = Zstd_get_error; + + CFH->compression_spec = compression_spec; + + CFH->private_data = NULL; +} + +#endif /* USE_ZSTD */ diff --git a/src/bin/pg_dump/compress_zstd.h b/src/bin/pg_dump/compress_zstd.h new file mode 100644 index 00000000000..2aaa6b100b1 --- /dev/null +++ b/src/bin/pg_dump/compress_zstd.h @@ -0,0 +1,25 @@ +/*------------------------------------------------------------------------- + * + * compress_zstd.h + * Zstd interface to compress_io.c routines + * + * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/bin/pg_dump/compress_zstd.h + * + *------------------------------------------------------------------------- + */ + +#ifndef COMPRESS_ZSTD_H +#define COMPRESS_ZSTD_H + +#include "compress_io.h" + +extern void InitCompressorZstd(CompressorState *cs, + const pg_compress_specification compression_spec); +extern void InitCompressFileHandleZstd(CompressFileHandle *CFH, + const pg_compress_specification compression_spec); + +#endif /* 
COMPRESS_ZSTD_H */ diff --git a/src/bin/pg_dump/meson.build b/src/bin/pg_dump/meson.build index b2fb7ac77fd..9d59a106f36 100644 --- a/src/bin/pg_dump/meson.build +++ b/src/bin/pg_dump/meson.build @@ -5,6 +5,7 @@ pg_dump_common_sources = files( 'compress_io.c', 'compress_lz4.c', 'compress_none.c', + 'compress_zstd.c', 'dumputils.c', 'parallel.c', 'pg_backup_archiver.c', @@ -19,7 +20,7 @@ pg_dump_common_sources = files( pg_dump_common = static_library('libpgdump_common', pg_dump_common_sources, c_pch: pch_postgres_fe_h, - dependencies: [frontend_code, libpq, lz4, zlib], + dependencies: [frontend_code, libpq, lz4, zlib, zstd], kwargs: internal_lib_args, ) @@ -90,6 +91,7 @@ tests += { 'env': { 'GZIP_PROGRAM': gzip.path(), 'LZ4': program_lz4.found() ? program_lz4.path() : '', + 'ZSTD': program_zstd.found() ? program_zstd.path() : '', 'with_icu': icu.found() ? 'yes' : 'no', }, 'tests': [ diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c index ab77e373e91..e8ee6b1ad86 100644 --- a/src/bin/pg_dump/pg_backup_archiver.c +++ b/src/bin/pg_dump/pg_backup_archiver.c @@ -2120,7 +2120,7 @@ _discoverArchiveFormat(ArchiveHandle *AH) /* * Check if the specified archive is a directory. If so, check if - * there's a "toc.dat" (or "toc.dat.{gz,lz4}") file in it. + * there's a "toc.dat" (or "toc.dat.{gz,lz4,zst}") file in it. 
*/ if (stat(AH->fSpec, &st) == 0 && S_ISDIR(st.st_mode)) { @@ -2131,10 +2131,17 @@ _discoverArchiveFormat(ArchiveHandle *AH) if (_fileExistsInDirectory(AH->fSpec, "toc.dat.gz")) return AH->format; #endif + #ifdef USE_LZ4 if (_fileExistsInDirectory(AH->fSpec, "toc.dat.lz4")) return AH->format; #endif + +#ifdef USE_ZSTD + if (_fileExistsInDirectory(AH->fSpec, "toc.dat.zst")) + return AH->format; +#endif + pg_fatal("directory \"%s\" does not appear to be a valid archive (\"toc.dat\" does not exist)", AH->fSpec); fh = NULL; /* keep compiler quiet */ diff --git a/src/bin/pg_dump/pg_backup_directory.c b/src/bin/pg_dump/pg_backup_directory.c index abaaa3b10e3..2177d5ff425 100644 --- a/src/bin/pg_dump/pg_backup_directory.c +++ b/src/bin/pg_dump/pg_backup_directory.c @@ -785,6 +785,8 @@ _PrepParallelRestore(ArchiveHandle *AH) strlcat(fname, ".gz", sizeof(fname)); else if (AH->compression_spec.algorithm == PG_COMPRESSION_LZ4) strlcat(fname, ".lz4", sizeof(fname)); + else if (AH->compression_spec.algorithm == PG_COMPRESSION_ZSTD) + strlcat(fname, ".zst", sizeof(fname)); if (stat(fname, &st) == 0) te->dataLength = st.st_size; diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 6abbcff6834..a426984046b 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -56,6 +56,7 @@ #include "catalog/pg_type_d.h" #include "common/connect.h" #include "common/relpath.h" +#include "compress_io.h" #include "dumputils.h" #include "fe_utils/option_utils.h" #include "fe_utils/string_utils.h" @@ -735,18 +736,13 @@ main(int argc, char **argv) pg_fatal("invalid compression specification: %s", error_detail); - switch (compression_algorithm) - { - case PG_COMPRESSION_NONE: - /* fallthrough */ - case PG_COMPRESSION_GZIP: - /* fallthrough */ - case PG_COMPRESSION_LZ4: - break; - case PG_COMPRESSION_ZSTD: - pg_fatal("compression with %s is not yet supported", "ZSTD"); - break; - } + error_detail = supports_compression(compression_spec); + if (error_detail != NULL) 
+ pg_fatal("%s", error_detail); + + if (compression_spec.options & PG_COMPRESSION_OPTION_WORKERS) + pg_log_warning("compression option \"%s\" is not currently supported by pg_dump", + "workers"); /* * Custom and directory formats are compressed by default with gzip when diff --git a/src/bin/pg_dump/t/002_pg_dump.pl b/src/bin/pg_dump/t/002_pg_dump.pl index 42215f82f7a..74f23ae7f74 100644 --- a/src/bin/pg_dump/t/002_pg_dump.pl +++ b/src/bin/pg_dump/t/002_pg_dump.pl @@ -54,8 +54,9 @@ my $tempdir = PostgreSQL::Test::Utils::tempdir; # those lines) to validate that part of the process. my $supports_icu = ($ENV{with_icu} eq 'yes'); -my $supports_lz4 = check_pg_config("#define USE_LZ4 1"); my $supports_gzip = check_pg_config("#define HAVE_LIBZ 1"); +my $supports_lz4 = check_pg_config("#define USE_LZ4 1"); +my $supports_zstd = check_pg_config("#define USE_ZSTD 1"); my %pgdump_runs = ( binary_upgrade => { @@ -213,6 +214,77 @@ my %pgdump_runs = ( }, }, + compression_zstd_custom => { + test_key => 'compression', + compile_option => 'zstd', + dump_cmd => [ + 'pg_dump', '--format=custom', + '--compress=zstd', "--file=$tempdir/compression_zstd_custom.dump", + 'postgres', + ], + restore_cmd => [ + 'pg_restore', + "--file=$tempdir/compression_zstd_custom.sql", + "$tempdir/compression_zstd_custom.dump", + ], + command_like => { + command => [ + 'pg_restore', + '-l', "$tempdir/compression_zstd_custom.dump", + ], + expected => qr/Compression: zstd/, + name => 'data content is zstd compressed' + }, + }, + + compression_zstd_dir => { + test_key => 'compression', + compile_option => 'zstd', + dump_cmd => [ + 'pg_dump', '--jobs=2', + '--format=directory', '--compress=zstd:1', + "--file=$tempdir/compression_zstd_dir", 'postgres', + ], + # Give coverage for manually compressed blob.toc files during + # restore. 
+ compress_cmd => { + program => $ENV{'ZSTD'}, + args => [ + '-z', '-f', '--rm', + "$tempdir/compression_zstd_dir/blobs.toc", + "-o", "$tempdir/compression_zstd_dir/blobs.toc.zst", + ], + }, + # Verify that data files were compressed + glob_patterns => [ + "$tempdir/compression_zstd_dir/toc.dat", + "$tempdir/compression_zstd_dir/*.dat.zst", + ], + restore_cmd => [ + 'pg_restore', '--jobs=2', + "--file=$tempdir/compression_zstd_dir.sql", + "$tempdir/compression_zstd_dir", + ], + }, + + compression_zstd_plain => { + test_key => 'compression', + compile_option => 'zstd', + dump_cmd => [ + 'pg_dump', '--format=plain', '--compress=zstd', + "--file=$tempdir/compression_zstd_plain.sql.zst", 'postgres', + ], + # Decompress the generated file to run through the tests. + compress_cmd => { + program => $ENV{'ZSTD'}, + args => [ + '-d', '-f', + "$tempdir/compression_zstd_plain.sql.zst", + "-o", "$tempdir/compression_zstd_plain.sql", + ], + }, + }, + clean => { dump_cmd => [ 'pg_dump', @@ -4648,10 +4720,11 @@ foreach my $run (sort keys %pgdump_runs) my $test_key = $run; my $run_db = 'postgres'; - # Skip command-level tests for gzip/lz4 if there is no support for it. 
+ # Skip command-level tests for gzip/lz4/zstd if the tool is not supported if ($pgdump_runs{$run}->{compile_option} && (($pgdump_runs{$run}->{compile_option} eq 'gzip' && !$supports_gzip) || - ($pgdump_runs{$run}->{compile_option} eq 'lz4' && !$supports_lz4))) + ($pgdump_runs{$run}->{compile_option} eq 'lz4' && !$supports_lz4) || + ($pgdump_runs{$run}->{compile_option} eq 'zstd' && !$supports_zstd))) { note "$run: skipped due to no $pgdump_runs{$run}->{compile_option} support"; next; diff --git a/src/tools/pginclude/cpluspluscheck b/src/tools/pginclude/cpluspluscheck index 58039934756..10fb51585c9 100755 --- a/src/tools/pginclude/cpluspluscheck +++ b/src/tools/pginclude/cpluspluscheck @@ -154,6 +154,7 @@ do test "$f" = src/bin/pg_dump/compress_io.h && continue test "$f" = src/bin/pg_dump/compress_lz4.h && continue test "$f" = src/bin/pg_dump/compress_none.h && continue + test "$f" = src/bin/pg_dump/compress_zstd.h && continue test "$f" = src/bin/pg_dump/parallel.h && continue test "$f" = src/bin/pg_dump/pg_backup_archiver.h && continue test "$f" = src/bin/pg_dump/pg_dump.h && continue diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list index 5c0410869f7..065acb6f50b 100644 --- a/src/tools/pgindent/typedefs.list +++ b/src/tools/pgindent/typedefs.list @@ -3937,3 +3937,4 @@ yyscan_t z_stream z_streamp zic_t +ZSTD_CStream -- 2.34.1
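
As a side note on the suffix handling in 1/4: the first step of the file
discovery (decide the algorithm from the file name alone, before probing
the filesystem) can be sketched as below. This is only a simplified
illustration; the enum and function names are hypothetical, whereas the
patch itself works on pg_compress_specification and falls back to checking
for "foo", "foo.gz", "foo.lz4" and "foo.zst" on disk when no suffix matches.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical enum standing in for the real compression algorithm type. */
typedef enum
{
	DEMO_UNKNOWN,				/* no known suffix; caller must probe files */
	DEMO_GZIP,
	DEMO_LZ4,
	DEMO_ZSTD
} demo_algorithm;

/* Return 1 if 'name' ends with 'suffix', else 0. */
static int
demo_has_suffix(const char *name, const char *suffix)
{
	size_t		nlen;
	size_t		slen;

	assert(name != NULL && suffix != NULL);
	nlen = strlen(name);
	slen = strlen(suffix);
	return nlen >= slen && strcmp(name + nlen - slen, suffix) == 0;
}

/* Map a file name to an algorithm using only its suffix. */
static demo_algorithm
demo_algorithm_from_suffix(const char *fname)
{
	if (demo_has_suffix(fname, ".gz"))
		return DEMO_GZIP;
	if (demo_has_suffix(fname, ".lz4"))
		return DEMO_LZ4;
	if (demo_has_suffix(fname, ".zst"))
		return DEMO_ZSTD;
	return DEMO_UNKNOWN;
}
```

A name such as "toc.dat" yields DEMO_UNKNOWN here, which corresponds to the
case where the real code goes on to try the known suffixes in order.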
From c82621cb1b1d5ab70dee2245be06ca29cabf8c35 Mon Sep 17 00:00:00 2001
From: Justin Pryzby <pryz...@telsasoft.com>
Date: Sun, 27 Mar 2022 11:55:01 -0500
Subject: [PATCH 2/4] zstd: support long distance mode in pg_dump/basebackup

First proposed here: 20220327205020.gm28...@telsasoft.com
---
 doc/src/sgml/protocol.sgml                    | 10 +++-
 doc/src/sgml/ref/pg_basebackup.sgml           |  4 +-
 doc/src/sgml/ref/pg_dump.sgml                 |  2 +
 src/backend/backup/basebackup_zstd.c          | 12 ++++
 src/bin/pg_basebackup/bbstreamer_zstd.c       | 13 +++++
 src/bin/pg_basebackup/t/010_pg_basebackup.pl  |  9 ++-
 src/bin/pg_dump/compress_zstd.c               |  5 ++
 src/bin/pg_dump/t/002_pg_dump.pl              |  3 +-
 src/bin/pg_verifybackup/t/008_untar.pl        |  8 +++
 src/bin/pg_verifybackup/t/010_client_untar.pl |  8 +++
 src/common/compression.c                      | 57 ++++++++++++++++++-
 src/include/common/compression.h              |  2 +
 12 files changed, 127 insertions(+), 6 deletions(-)

diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 8b5e7b1ad7f..b11d9a6ba35 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -2729,7 +2729,8 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
           level.  Otherwise, it should be a comma-separated list of items,
           each of the form <replaceable>keyword</replaceable> or
           <replaceable>keyword=value</replaceable>. Currently, the supported
-          keywords are <literal>level</literal> and <literal>workers</literal>.
+          keywords are <literal>level</literal>, <literal>long</literal> and
+          <literal>workers</literal>.
          </para>
 
          <para>
@@ -2746,6 +2747,13 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
           <literal>3</literal>).
          </para>
 
+         <para>
+          The <literal>long</literal> keyword enables long-distance matching
+          mode, for improved compression ratio, at the expense of higher memory
+          use.  Long-distance mode is supported only for
+          <literal>zstd</literal>.
+         </para>
+
          <para>
           The <literal>workers</literal> keyword sets the number of threads
           that should be used for parallel compression.
Parallel compression diff --git a/doc/src/sgml/ref/pg_basebackup.sgml b/doc/src/sgml/ref/pg_basebackup.sgml index db3ad9cd5eb..79d3e657c32 100644 --- a/doc/src/sgml/ref/pg_basebackup.sgml +++ b/doc/src/sgml/ref/pg_basebackup.sgml @@ -424,8 +424,8 @@ PostgreSQL documentation level. Otherwise, it should be a comma-separated list of items, each of the form <literal>keyword</literal> or <literal>keyword=value</literal>. - Currently, the supported keywords are <literal>level</literal> - and <literal>workers</literal>. + Currently, the supported keywords are <literal>level</literal>, + <literal>long</literal>, and <literal>workers</literal>. The detail string cannot be used when the compression method is specified as a plain integer. </para> diff --git a/doc/src/sgml/ref/pg_dump.sgml b/doc/src/sgml/ref/pg_dump.sgml index 8de38e0fd0d..e81e35c13b3 100644 --- a/doc/src/sgml/ref/pg_dump.sgml +++ b/doc/src/sgml/ref/pg_dump.sgml @@ -681,6 +681,8 @@ PostgreSQL documentation as though it had been fed through <application>gzip</application>, <application>lz4</application>, or <application>zstd</application>; but the default is not to compress. + With zstd compression, <literal>long</literal> mode may improve the + compression ratio, at the cost of increased memory use. </para> <para> The tar archive format currently does not support compression at all. 
diff --git a/src/backend/backup/basebackup_zstd.c b/src/backend/backup/basebackup_zstd.c index ac6cac178a0..1bb5820c884 100644 --- a/src/backend/backup/basebackup_zstd.c +++ b/src/backend/backup/basebackup_zstd.c @@ -118,6 +118,18 @@ bbsink_zstd_begin_backup(bbsink *sink) compress->workers, ZSTD_getErrorName(ret))); } + if ((compress->options & PG_COMPRESSION_OPTION_LONG_DISTANCE) != 0) + { + ret = ZSTD_CCtx_setParameter(mysink->cctx, + ZSTD_c_enableLongDistanceMatching, + compress->long_distance); + if (ZSTD_isError(ret)) + ereport(ERROR, + errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("could not set compression flag for %s: %s", + "long", ZSTD_getErrorName(ret))); + } + /* * We need our own buffer, because we're going to pass different data to * the next sink than what gets passed to us. diff --git a/src/bin/pg_basebackup/bbstreamer_zstd.c b/src/bin/pg_basebackup/bbstreamer_zstd.c index fe17d6df4ef..fba391e2a0f 100644 --- a/src/bin/pg_basebackup/bbstreamer_zstd.c +++ b/src/bin/pg_basebackup/bbstreamer_zstd.c @@ -106,6 +106,19 @@ bbstreamer_zstd_compressor_new(bbstreamer *next, pg_compress_specification *comp compress->workers, ZSTD_getErrorName(ret)); } + if ((compress->options & PG_COMPRESSION_OPTION_LONG_DISTANCE) != 0) + { + ret = ZSTD_CCtx_setParameter(streamer->cctx, + ZSTD_c_enableLongDistanceMatching, + compress->long_distance); + if (ZSTD_isError(ret)) + { + pg_log_error("could not set compression flag for %s: %s", + "long", ZSTD_getErrorName(ret)); + exit(1); + } + } + /* Initialize the ZSTD output buffer. 
*/ streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data; streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen; diff --git a/src/bin/pg_basebackup/t/010_pg_basebackup.pl b/src/bin/pg_basebackup/t/010_pg_basebackup.pl index b60cb78a0d5..4d130a7f944 100644 --- a/src/bin/pg_basebackup/t/010_pg_basebackup.pl +++ b/src/bin/pg_basebackup/t/010_pg_basebackup.pl @@ -139,7 +139,14 @@ SKIP: 'gzip:workers=3', 'invalid compression specification: compression algorithm "gzip" does not accept a worker count', 'failure on worker count for gzip' - ],); + ], + [ + 'gzip:long', + 'invalid compression specification: compression algorithm "gzip" does not support long-distance mode', + 'failure on long mode for gzip' + ], + ); + for my $cft (@compression_failure_tests) { my $cfail = quotemeta($client_fails . $cft->[1]); diff --git a/src/bin/pg_dump/compress_zstd.c b/src/bin/pg_dump/compress_zstd.c index cf85c3a4c93..49a877ce010 100644 --- a/src/bin/pg_dump/compress_zstd.c +++ b/src/bin/pg_dump/compress_zstd.c @@ -80,6 +80,11 @@ _ZstdCStreamParams(pg_compress_specification compress) _Zstd_CCtx_setParam_or_die(cstream, ZSTD_c_compressionLevel, compress.level, "level"); + if (compress.options & PG_COMPRESSION_OPTION_LONG_DISTANCE) + _Zstd_CCtx_setParam_or_die(cstream, + ZSTD_c_enableLongDistanceMatching, + compress.long_distance, "long"); + return cstream; } diff --git a/src/bin/pg_dump/t/002_pg_dump.pl b/src/bin/pg_dump/t/002_pg_dump.pl index 74f23ae7f74..bb898b06bb4 100644 --- a/src/bin/pg_dump/t/002_pg_dump.pl +++ b/src/bin/pg_dump/t/002_pg_dump.pl @@ -267,11 +267,12 @@ my %pgdump_runs = ( ], }, + # Exercise long mode for test coverage compression_zstd_plain => { test_key => 'compression', compile_option => 'zstd', dump_cmd => [ - 'pg_dump', '--format=plain', '--compress=zstd', + 'pg_dump', '--format=plain', '--compress=zstd:long', "--file=$tempdir/compression_zstd_plain.sql.zst", 'postgres', ], # Decompress the generated file to run through the tests. 
diff --git a/src/bin/pg_verifybackup/t/008_untar.pl b/src/bin/pg_verifybackup/t/008_untar.pl index 3007bbe8556..05754bc8ec7 100644 --- a/src/bin/pg_verifybackup/t/008_untar.pl +++ b/src/bin/pg_verifybackup/t/008_untar.pl @@ -49,6 +49,14 @@ my @test_configuration = ( 'decompress_program' => $ENV{'ZSTD'}, 'decompress_flags' => ['-d'], 'enabled' => check_pg_config("#define USE_ZSTD 1") + }, + { + 'compression_method' => 'zstd', + 'backup_flags' => [ '--compress', 'server-zstd:level=1,long' ], + 'backup_archive' => 'base.tar.zst', + 'decompress_program' => $ENV{'ZSTD'}, + 'decompress_flags' => ['-d'], + 'enabled' => check_pg_config("#define USE_ZSTD 1") }); for my $tc (@test_configuration) diff --git a/src/bin/pg_verifybackup/t/010_client_untar.pl b/src/bin/pg_verifybackup/t/010_client_untar.pl index f3aa0f59e29..ac51a174d14 100644 --- a/src/bin/pg_verifybackup/t/010_client_untar.pl +++ b/src/bin/pg_verifybackup/t/010_client_untar.pl @@ -50,6 +50,14 @@ my @test_configuration = ( 'decompress_flags' => ['-d'], 'enabled' => check_pg_config("#define USE_ZSTD 1") }, + { + 'compression_method' => 'zstd', + 'backup_flags' => ['--compress', 'client-zstd:level=1,long'], + 'backup_archive' => 'base.tar.zst', + 'decompress_program' => $ENV{'ZSTD'}, + 'decompress_flags' => [ '-d' ], + 'enabled' => check_pg_config("#define USE_ZSTD 1") + }, { 'compression_method' => 'parallel zstd', 'backup_flags' => [ '--compress', 'client-zstd:workers=3' ], diff --git a/src/common/compression.c b/src/common/compression.c index 2d3e56b4d62..35a7cade645 100644 --- a/src/common/compression.c +++ b/src/common/compression.c @@ -12,7 +12,7 @@ * Otherwise, a compression specification is a comma-separated list of items, * each having the form keyword or keyword=value. * - * Currently, the only supported keywords are "level" and "workers". + * Currently, the supported keywords are "level", "long", and "workers". 
* * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group * @@ -38,6 +38,8 @@ static int expect_integer_value(char *keyword, char *value, pg_compress_specification *result); +static bool expect_boolean_value(char *keyword, char *value, + pg_compress_specification *result); /* * Look up a compression algorithm by name. Returns true and sets *algorithm @@ -232,6 +234,11 @@ parse_compress_specification(pg_compress_algorithm algorithm, char *specificatio result->workers = expect_integer_value(keyword, value, result); result->options |= PG_COMPRESSION_OPTION_WORKERS; } + else if (strcmp(keyword, "long") == 0) + { + result->long_distance = expect_boolean_value(keyword, value, result); + result->options |= PG_COMPRESSION_OPTION_LONG_DISTANCE; + } else result->parse_error = psprintf(_("unrecognized compression option: \"%s\""), keyword); @@ -289,6 +296,43 @@ expect_integer_value(char *keyword, char *value, pg_compress_specification *resu return ivalue; } +/* + * Parse 'value' as a boolean and return the result. + * + * If parsing fails, set result->parse_error to an appropriate message + * and return false. The caller must check result->parse_error to determine if + * the call was successful. + * + * Valid values are: yes, no, on, off, 1, 0. + * + * Inspired by ParseVariableBool().
+ */ +static bool +expect_boolean_value(char *keyword, char *value, pg_compress_specification *result) +{ + if (value == NULL) + return true; + + if (pg_strcasecmp(value, "yes") == 0) + return true; + if (pg_strcasecmp(value, "on") == 0) + return true; + if (pg_strcasecmp(value, "1") == 0) + return true; + + if (pg_strcasecmp(value, "no") == 0) + return false; + if (pg_strcasecmp(value, "off") == 0) + return false; + if (pg_strcasecmp(value, "0") == 0) + return false; + + result->parse_error = + psprintf(_("value for compression option \"%s\" must be a boolean"), + keyword); + return false; +} + /* * Returns NULL if the compression specification string was syntactically * valid and semantically sensible. Otherwise, returns an error message. @@ -354,6 +398,17 @@ validate_compress_specification(pg_compress_specification *spec) get_compress_algorithm_name(spec->algorithm)); } + /* + * Of the compression algorithms that we currently support, only zstd + * supports long-distance mode. + */ + if ((spec->options & PG_COMPRESSION_OPTION_LONG_DISTANCE) != 0 && + (spec->algorithm != PG_COMPRESSION_ZSTD)) + { + return psprintf(_("compression algorithm \"%s\" does not support long-distance mode"), + get_compress_algorithm_name(spec->algorithm)); + } + return NULL; } diff --git a/src/include/common/compression.h b/src/include/common/compression.h index b48c173022e..38aae9dd873 100644 --- a/src/include/common/compression.h +++ b/src/include/common/compression.h @@ -27,6 +27,7 @@ typedef enum pg_compress_algorithm } pg_compress_algorithm; #define PG_COMPRESSION_OPTION_WORKERS (1 << 0) +#define PG_COMPRESSION_OPTION_LONG_DISTANCE (1 << 1) typedef struct pg_compress_specification { @@ -34,6 +35,7 @@ typedef struct pg_compress_specification unsigned options; /* OR of PG_COMPRESSION_OPTION constants */ int level; int workers; + bool long_distance; char *parse_error; /* NULL if parsing was OK, else message */ } pg_compress_specification; -- 2.34.1
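Reviewer's note: the handling of "long" in the patch above can be tried out in isolation. What follows is only a minimal standalone sketch, not the patch's code. Every sketch_* name, the SKETCH_* constants, and the "ok" out-parameter are hypothetical stand-ins (the patch itself reports failures through result->parse_error and uses pg_strcasecmp), and the comparison helper here is plain ISO C so the example has no PostgreSQL dependency.

```c
#include <assert.h>
#include <ctype.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-ins for pg_compress_algorithm and the option flags. */
enum sketch_algorithm { SKETCH_GZIP, SKETCH_LZ4, SKETCH_ZSTD };

#define SKETCH_OPTION_WORKERS        (1 << 0)
#define SKETCH_OPTION_LONG_DISTANCE  (1 << 1)

/* Case-insensitive string equality, using only ISO C. */
static bool
sketch_equals_nocase(const char *a, const char *b)
{
    while (*a != '\0' && *b != '\0')
    {
        if (tolower((unsigned char) *a) != tolower((unsigned char) *b))
            return false;
        a++;
        b++;
    }
    return *a == '\0' && *b == '\0';
}

/*
 * Parse an optional boolean option value.  A bare keyword (value == NULL)
 * counts as true.  *ok is set to false when the text is not one of
 * yes/on/1/no/off/0; the return value is then meaningless.
 */
static bool
sketch_parse_bool(const char *value, bool *ok)
{
    static const char *const truthy[] = {"yes", "on", "1"};
    static const char *const falsy[] = {"no", "off", "0"};
    size_t      i;

    assert(ok != NULL);
    *ok = true;

    if (value == NULL)
        return true;

    for (i = 0; i < sizeof(truthy) / sizeof(truthy[0]); i++)
        if (sketch_equals_nocase(value, truthy[i]))
            return true;
    for (i = 0; i < sizeof(falsy) / sizeof(falsy[0]); i++)
        if (sketch_equals_nocase(value, falsy[i]))
            return false;

    *ok = false;
    return false;
}

/*
 * Reject long-distance mode for anything but zstd.  Returns NULL when the
 * combination is acceptable, else a static error message.
 */
static const char *
sketch_validate(enum sketch_algorithm algorithm, unsigned options)
{
    if ((options & SKETCH_OPTION_LONG_DISTANCE) != 0 &&
        algorithm != SKETCH_ZSTD)
        return "compression algorithm does not support long-distance mode";
    return NULL;
}
```

In this sketch a caller would set the LONG_DISTANCE bit whenever the keyword appears, even for "long=off", which matches how the patch sets PG_COMPRESSION_OPTION_LONG_DISTANCE unconditionally after parsing; the gzip:long test in 010_pg_basebackup.pl relies on that validation step.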
>From 1b78e26b5901de39c24677362c78391cb7c39b6a Mon Sep 17 00:00:00 2001 From: Justin Pryzby <pryz...@telsasoft.com> Date: Thu, 30 Mar 2023 17:48:57 -0500 Subject: [PATCH 3/4] WIP: pg_dump: support zstd workers This is a separate commit since it's not essential; the zstd library is frequently compiled without threading support, so the functionality isn't very well-tested, and because use of zstd threads might conceivably play poorly with pg_dump's use of threads under Windows. Targeting postgres v17. --- doc/src/sgml/ref/pg_dump.sgml | 8 ++++++-- src/bin/pg_dump/compress_zstd.c | 4 ++++ src/bin/pg_dump/pg_dump.c | 4 ---- 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/doc/src/sgml/ref/pg_dump.sgml b/doc/src/sgml/ref/pg_dump.sgml index e81e35c13b3..1d55ce05b21 100644 --- a/doc/src/sgml/ref/pg_dump.sgml +++ b/doc/src/sgml/ref/pg_dump.sgml @@ -681,8 +681,12 @@ PostgreSQL documentation as though it had been fed through <application>gzip</application>, <application>lz4</application>, or <application>zstd</application>; but the default is not to compress. - With zstd compression, <literal>long</literal> mode may improve the - compression ratio, at the cost of increased memory use. + With zstd compression, <literal>long</literal> and + <literal>workers</literal> options may be specified to enable long-distance + matching and threaded workers, respectively. + Long distance mode may improve the compression ratio, at the cost of + increased memory use. + Threaded workers allow leveraging multiple CPUs during compression. </para> <para> The tar archive format currently does not support compression at all.
diff --git a/src/bin/pg_dump/compress_zstd.c b/src/bin/pg_dump/compress_zstd.c index 49a877ce010..f1f84ad69c4 100644 --- a/src/bin/pg_dump/compress_zstd.c +++ b/src/bin/pg_dump/compress_zstd.c @@ -85,6 +85,10 @@ _ZstdCStreamParams(pg_compress_specification compress) ZSTD_c_enableLongDistanceMatching, compress.long_distance, "long"); + if (compress.options & PG_COMPRESSION_OPTION_WORKERS) + _Zstd_CCtx_setParam_or_die(cstream, ZSTD_c_nbWorkers, + compress.workers, "workers"); + return cstream; } diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index a426984046b..240dcdb0223 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -740,10 +740,6 @@ main(int argc, char **argv) if (error_detail != NULL) pg_fatal("%s", error_detail); - if (compression_spec.options & PG_COMPRESSION_OPTION_WORKERS) - pg_log_warning("compression option \"%s\" is not currently supported by pg_dump", - "workers"); - /* * Custom and directory formats are compressed by default with gzip when * available, not the others. -- 2.34.1
>From 6d8d1d6474b4a02689e21e59240188d4e621ef2a Mon Sep 17 00:00:00 2001 From: Justin Pryzby <pryz...@telsasoft.com> Date: Wed, 4 Jan 2023 21:21:53 -0600 Subject: [PATCH 4/4] TMP: pg_dump: use Zstd by default, for CI only //-os-only: linux-meson --- .cirrus.yml | 9 ++++++++- src/bin/pg_dump/compress_zstd.c | 9 +++++++++ src/bin/pg_dump/pg_dump.c | 4 ++-- src/bin/pg_dump/t/002_pg_dump.pl | 14 +++++++------- 4 files changed, 26 insertions(+), 10 deletions(-) diff --git a/.cirrus.yml b/.cirrus.yml index 5b1747522f9..14402a0ad5c 100644 --- a/.cirrus.yml +++ b/.cirrus.yml @@ -267,6 +267,7 @@ LINUX_CONFIGURE_FEATURES: &LINUX_CONFIGURE_FEATURES >- LINUX_MESON_FEATURES: &LINUX_MESON_FEATURES >- -Dllvm=enabled -Duuid=e2fs + -Dzstd=enabled # Linux, both 32bit and 64bit @@ -389,6 +390,9 @@ task: configure_script: | su postgres <<-EOF + mkdir subprojects + meson wrap install zstd + meson configure -D zstd:multithread=enabled --force-fallback-for=zstd meson setup \ --buildtype=debug \ -Dcassert=true \ @@ -616,7 +620,10 @@ task: # Use /DEBUG:FASTLINK to avoid high memory usage during linking configure_script: | vcvarsall x64 - meson setup --backend ninja --buildtype debug -Dc_link_args=/DEBUG:FASTLINK -Dcassert=true -Db_pch=true -Dextra_lib_dirs=c:\openssl\1.1\lib -Dextra_include_dirs=c:\openssl\1.1\include -DTAR=%TAR% -DPG_TEST_EXTRA="%PG_TEST_EXTRA%" build + mkdir subprojects + meson wrap install zstd + meson configure -D zstd:multithread=enabled --force-fallback-for=zstd + meson setup --backend ninja --buildtype debug -Dc_link_args=/DEBUG:FASTLINK -Dcassert=true -Db_pch=true -Dextra_lib_dirs=c:\openssl\1.1\lib -Dextra_include_dirs=c:\openssl\1.1\include -DTAR=%TAR% -DPG_TEST_EXTRA="%PG_TEST_EXTRA%" -D zstd=enabled build build_script: | vcvarsall x64 diff --git a/src/bin/pg_dump/compress_zstd.c b/src/bin/pg_dump/compress_zstd.c index f1f84ad69c4..c7be670b4a3 100644 --- a/src/bin/pg_dump/compress_zstd.c +++ b/src/bin/pg_dump/compress_zstd.c @@ -88,6 +88,15 @@ 
_ZstdCStreamParams(pg_compress_specification compress) if (compress.options & PG_COMPRESSION_OPTION_WORKERS) _Zstd_CCtx_setParam_or_die(cstream, ZSTD_c_nbWorkers, compress.workers, "workers"); + else + { + size_t res; + + res = ZSTD_CCtx_setParameter(cstream, ZSTD_c_nbWorkers, 3); + if (ZSTD_isError(res)) + pg_log_warning("could not set compression parameter: \"%s\": %s", + "workers", ZSTD_getErrorName(res)); + } return cstream; } diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c index 240dcdb0223..b90a1087f30 100644 --- a/src/bin/pg_dump/pg_dump.c +++ b/src/bin/pg_dump/pg_dump.c @@ -747,8 +747,8 @@ main(int argc, char **argv) if ((archiveFormat == archCustom || archiveFormat == archDirectory) && !user_compression_defined) { -#ifdef HAVE_LIBZ - parse_compress_specification(PG_COMPRESSION_GZIP, NULL, +#ifdef USE_ZSTD + parse_compress_specification(PG_COMPRESSION_ZSTD, NULL, &compression_spec); #else /* Nothing to do in the default case */ diff --git a/src/bin/pg_dump/t/002_pg_dump.pl b/src/bin/pg_dump/t/002_pg_dump.pl index bb898b06bb4..0a635ae9fc3 100644 --- a/src/bin/pg_dump/t/002_pg_dump.pl +++ b/src/bin/pg_dump/t/002_pg_dump.pl @@ -385,10 +385,10 @@ my %pgdump_runs = ( command_like => { command => [ 'pg_restore', '-l', "$tempdir/defaults_custom_format.dump", ], - expected => $supports_gzip ? - qr/Compression: gzip/ : + expected => $supports_zstd ? + qr/Compression: zstd/ : qr/Compression: none/, - name => 'data content is gzip-compressed by default if available', + name => 'data content is zstd-compressed by default if available', }, }, @@ -410,16 +410,16 @@ my %pgdump_runs = ( command_like => { command => [ 'pg_restore', '-l', "$tempdir/defaults_dir_format", ], - expected => $supports_gzip ? - qr/Compression: gzip/ : + expected => $supports_zstd ? 
+ qr/Compression: zstd/ : qr/Compression: none/, name => 'data content is gzip-compressed by default', }, glob_patterns => [ "$tempdir/defaults_dir_format/toc.dat", "$tempdir/defaults_dir_format/blobs.toc", - $supports_gzip ? - "$tempdir/defaults_dir_format/*.dat.gz" : + $supports_zstd ? + "$tempdir/defaults_dir_format/*.dat.zst" : "$tempdir/defaults_dir_format/*.dat", ], }, -- 2.34.1