On Sat, Jan 7, 2017 at 8:19 PM, Magnus Hagander <mag...@hagander.net> wrote:
> On Sat, Jan 7, 2017 at 12:31 AM, Michael Paquier <michael.paqu...@gmail.com>
> wrote:
>> There is something I forgot. With this patch,
>> FindStreamingStart()@pg_receivexlog.c is actually broken. In short it
>> forgets to consider files that have been compressed at the last run of
>> pg_receivexlog and will try to stream changes from the beginning. I
>> can see that gzip -l provides this information... But I have yet to
>> find something in zlib that allows a cheap lookup as startup of
>> streaming should be fast. Looking at how gzip -l does it may be faster
>> than looking at the docs.
>
> Do we really care though? As in, does startup of streaming have to be *that*
> fast? Even gunziping 16Mb (worst case) doesn't exactly take a long time. If
> your pg_receivexlog is restarting so often that it becomes a problem, I
> think you already have another and much bigger problem on your hands.
Based on some analysis, it is enough to look at the last 4 bytes of the
compressed file to get the size of the uncompressed data, with a single
call to lseek() and then read(). As there is a simple way to do this, and
it is far cheaper than decompressing perhaps hundreds of segments, I'd
rather do it this way. Attached is the implementation.

This code uses 2 booleans for the 4 states of the file names: full
non-compressed, partial non-compressed, full compressed and partial
compressed. This keeps the last check of FindStreamingStart() simpler,
though having an enum for such things has been quite fashionable lately :D

> I found another problem with it -- it is completely broken in sync mode. You
> need to either forbid sync mode together with compression, or teach
> dir_sync() about it. The later would sound better, but I wonder how much
> that's going to kill compression due to the small blocks? Is it a reasonable
> use-case?

Hm. Looking at the docs, I think that --compress combined with
--synchronous maps to the use of Z_SYNC_FLUSH with gzflush(). FWIW I
don't have a direct use case for it, but it is not a major effort to add
it, so done. There is no actual reason to forbid this combination of
options either.
--
Michael
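The trailer lookup described above can be sketched as a standalone helper. Per RFC 1952, every gzip member ends with an 8-byte trailer: CRC32 followed by ISIZE, the uncompressed length modulo 2^32, both stored little-endian. The helper name gzip_trailer_isize() is hypothetical; it performs the same open(), lseek(-4, SEEK_END) and read() sequence as FindStreamingStart(), and it assumes the file holds a single, complete gzip member (a .gz.partial file that is still being written has no trailer yet).

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <stdint.h>
#include <sys/types.h>
#include <unistd.h>

/*
 * Hypothetical helper: read ISIZE (uncompressed length mod 2^32) from the
 * last 4 bytes of a gzip file. Returns 0 and sets *size on success, or -1
 * with errno set on failure. Assumes one complete gzip member per file.
 */
static int
gzip_trailer_isize(const char *path, uint32_t *size)
{
	unsigned char buf[4];		/* unsigned: no sign extension when shifting */
	ssize_t		r;
	int			fd;
	int			save_errno;

	assert(path != NULL && size != NULL);

	fd = open(path, O_RDONLY);
	if (fd < 0)
		return -1;

	/* One seek to 4 bytes before EOF and one read: no decompression. */
	if (lseek(fd, (off_t) -4, SEEK_END) < 0)
		goto fail;
	r = read(fd, buf, sizeof(buf));
	if (r != (ssize_t) sizeof(buf))
	{
		if (r >= 0)
			errno = EIO;		/* short read */
		goto fail;
	}
	close(fd);

	/* Assemble the little-endian 32-bit value byte by byte. */
	*size = (uint32_t) buf[0] |
		((uint32_t) buf[1] << 8) |
		((uint32_t) buf[2] << 16) |
		((uint32_t) buf[3] << 24);
	return 0;

fail:
	save_errno = errno;
	close(fd);
	errno = save_errno;
	return -1;
}
```

For a 16 MB segment the last four bytes are 00 00 00 01. The buffer is declared unsigned char so that a byte of 0x80 or more, such as the third byte of an 8 MB size (00 00 80 00), is not sign-extended when shifted; plain char is signed on many platforms, including x86. Since ISIZE wraps at 4 GB and is not checked against the data, a match only says the length looks right, not that the contents are intact.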
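The two-boolean encoding of the four filename states mentioned above could alternatively be written as an enum. This is a sketch under stated assumptions: the names SegmentFileKind, classify_segment_name() and has_segment_form() are hypothetical, and SEG_FNAME_LEN (24 uppercase hex digits) stands in for XLOG_FNAME_LEN. The suffix tests mirror the IsCompressXLogFileName() and IsPartialCompressXLogFileName() macros added by the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

/* Hypothetical stand-in for XLOG_FNAME_LEN: TLI, log and seg as 24 hex digits. */
#define SEG_FNAME_LEN 24

/* Hypothetical enum covering the four accepted filename states. */
typedef enum SegmentFileKind
{
	SEG_NOT_WAL = 0,			/* not a WAL segment name: skip it */
	SEG_FULL,					/* 000000010000000000000001 */
	SEG_PARTIAL,				/* ...0001.partial */
	SEG_FULL_GZ,				/* ...0001.gz */
	SEG_PARTIAL_GZ				/* ...0001.gz.partial */
} SegmentFileKind;

/* True if name is exactly SEG_FNAME_LEN hex digits followed by suffix. */
static bool
has_segment_form(const char *name, const char *suffix)
{
	return strlen(name) == SEG_FNAME_LEN + strlen(suffix) &&
		strspn(name, "0123456789ABCDEF") == SEG_FNAME_LEN &&
		strcmp(name + SEG_FNAME_LEN, suffix) == 0;
}

static SegmentFileKind
classify_segment_name(const char *name)
{
	assert(name != NULL);

	if (has_segment_form(name, ""))
		return SEG_FULL;
	if (has_segment_form(name, ".partial"))
		return SEG_PARTIAL;
	if (has_segment_form(name, ".gz"))
		return SEG_FULL_GZ;
	if (has_segment_form(name, ".gz.partial"))
		return SEG_PARTIAL_GZ;
	return SEG_NOT_WAL;
}
```

With this shape the size check in FindStreamingStart() would switch on the kind instead of testing combinations such as !ispartial && iscompress; whether that reads better than two booleans is mostly a matter of taste.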
diff --git a/doc/src/sgml/ref/pg_receivexlog.sgml b/doc/src/sgml/ref/pg_receivexlog.sgml
index bfa055b58b..8c1ea9a2e2 100644
--- a/doc/src/sgml/ref/pg_receivexlog.sgml
+++ b/doc/src/sgml/ref/pg_receivexlog.sgml
@@ -180,6 +180,19 @@ PostgreSQL documentation
        </para>
       </listitem>
      </varlistentry>
+
+     <varlistentry>
+      <term><option>-Z <replaceable class="parameter">level</replaceable></option></term>
+      <term><option>--compress=<replaceable class="parameter">level</replaceable></option></term>
+      <listitem>
+       <para>
+        Enables gzip compression of transaction logs, and specifies the
+        compression level (0 through 9, 0 being no compression and 9 being best
+        compression). The suffix <filename>.gz</filename> will
+        automatically be added to all filenames.
+       </para>
+      </listitem>
+     </varlistentry>
     </variablelist>
 
    <para>
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index 8ebf24e771..b9c0bb5fff 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -481,7 +481,7 @@ LogStreamerMain(logstreamer_param *param)
 	stream.partial_suffix = NULL;
 
 	if (format == 'p')
-		stream.walmethod = CreateWalDirectoryMethod(param->xlog, do_sync);
+		stream.walmethod = CreateWalDirectoryMethod(param->xlog, 0, do_sync);
 	else
 		stream.walmethod = CreateWalTarMethod(param->xlog, compresslevel, do_sync);
 
diff --git a/src/bin/pg_basebackup/pg_receivexlog.c b/src/bin/pg_basebackup/pg_receivexlog.c
index b6f57a878c..74fa5c68c0 100644
--- a/src/bin/pg_basebackup/pg_receivexlog.c
+++ b/src/bin/pg_basebackup/pg_receivexlog.c
@@ -34,6 +34,7 @@
 /* Global options */
 static char *basedir = NULL;
 static int	verbose = 0;
+static int	compresslevel = 0;
 static int	noloop = 0;
 static int	standby_message_timeout = 10 * 1000;		/* 10 sec = default */
 static volatile bool time_to_abort = false;
@@ -57,6 +58,15 @@ static bool stop_streaming(XLogRecPtr segendpos, uint32 timeline,
 	exit(code); \
 	}
 
+/* Routines to evaluate segment file format */
+#define IsCompressXLogFileName(fname)	\
+	(strlen(fname) == XLOG_FNAME_LEN + strlen(".gz") && \
+	 strspn(fname, "0123456789ABCDEF") == XLOG_FNAME_LEN &&	\
+	 strcmp((fname) + XLOG_FNAME_LEN, ".gz") == 0)
+#define IsPartialCompressXLogFileName(fname)	\
+	(strlen(fname) == XLOG_FNAME_LEN + strlen(".gz.partial") && \
+	 strspn(fname, "0123456789ABCDEF") == XLOG_FNAME_LEN &&	\
+	 strcmp((fname) + XLOG_FNAME_LEN, ".gz.partial") == 0)
 
 static void
 usage(void)
@@ -75,6 +85,7 @@ usage(void)
 	printf(_("      --synchronous      flush transaction log immediately after writing\n"));
 	printf(_("  -v, --verbose          output verbose messages\n"));
 	printf(_("  -V, --version          output version information, then exit\n"));
+	printf(_("  -Z, --compress=0-9     compress logs with given compression level\n"));
 	printf(_("  -?, --help             show this help, then exit\n"));
 	printf(_("\nConnection options:\n"));
 	printf(_("  -d, --dbname=CONNSTR   connection string\n"));
@@ -187,14 +198,31 @@ FindStreamingStart(uint32 *tli)
 		uint32		tli;
 		XLogSegNo	segno;
 		bool		ispartial;
+		bool		iscompress;
 
 		/*
 		 * Check if the filename looks like an xlog file, or a .partial file.
 		 */
 		if (IsXLogFileName(dirent->d_name))
+		{
 			ispartial = false;
+			iscompress = false;
+		}
 		else if (IsPartialXLogFileName(dirent->d_name))
+		{
+			ispartial = true;
+			iscompress = false;
+		}
+		else if (IsCompressXLogFileName(dirent->d_name))
+		{
+			ispartial = false;
+			iscompress = true;
+		}
+		else if (IsPartialCompressXLogFileName(dirent->d_name))
+		{
 			ispartial = true;
+			iscompress = true;
+		}
 		else
 			continue;
 
@@ -205,9 +233,11 @@ FindStreamingStart(uint32 *tli)
 
 		/*
 		 * Check that the segment has the right size, if it's supposed to be
-		 * completed.
+		 * completed. For non-compressed segments just check the on-disk size.
+		 * For compressed segments, look at the last 4 bytes of the compressed
+		 * file and check the size of the uncompressed data.
 		 */
-		if (!ispartial)
+		if (!ispartial && !iscompress)
 		{
 			struct stat statbuf;
 			char		fullpath[MAXPGPATH];
@@ -228,6 +258,47 @@ FindStreamingStart(uint32 *tli)
 				continue;
 			}
 		}
+		else if (!ispartial && iscompress)
+		{
+			int			fd;
+			unsigned char buf[4];
+			int			bytes_out;
+			char		fullpath[MAXPGPATH];
+
+			snprintf(fullpath, sizeof(fullpath), "%s/%s", basedir, dirent->d_name);
+
+			fd = open(fullpath, O_RDONLY | PG_BINARY);
+			if (fd < 0)
+			{
+				fprintf(stderr, _("%s: could not open file \"%s\": %s\n"),
+						progname, fullpath, strerror(errno));
+				disconnect_and_exit(1);
+			}
+			if (lseek(fd, (off_t)(-4), SEEK_END) < 0)
+			{
+				fprintf(stderr, _("%s: could not seek file \"%s\": %s\n"),
+						progname, fullpath, strerror(errno));
+				disconnect_and_exit(1);
+			}
+			if (read(fd, (char *) buf, sizeof(buf)) != sizeof(buf))
+			{
+				fprintf(stderr, _("%s: could not read file \"%s\": %s\n"),
+						progname, fullpath, strerror(errno));
+				disconnect_and_exit(1);
+			}
+
+			close(fd);
+			bytes_out = (buf[3] << 24) | (buf[2] << 16) |
+				(buf[1] << 8) | buf[0];
+
+			if (bytes_out != XLOG_SEG_SIZE)
+			{
+				fprintf(stderr,
+						_("%s: segment file \"%s\" has incorrect size %d, skipping\n"),
+						progname, dirent->d_name, bytes_out);
+				continue;
+			}
+		}
 
 		/* Looks like a valid segment. Remember that we saw it. */
 		if ((segno > high_segno) ||
@@ -338,7 +409,8 @@ StreamLog(void)
 	stream.synchronous = synchronous;
 	stream.do_sync = true;
 	stream.mark_done = false;
-	stream.walmethod = CreateWalDirectoryMethod(basedir, stream.do_sync);
+	stream.walmethod = CreateWalDirectoryMethod(basedir, compresslevel,
+												stream.do_sync);
 	stream.partial_suffix = ".partial";
 
 	ReceiveXlogStream(conn, &stream);
@@ -389,6 +461,7 @@ main(int argc, char **argv)
 		{"status-interval", required_argument, NULL, 's'},
 		{"slot", required_argument, NULL, 'S'},
 		{"verbose", no_argument, NULL, 'v'},
+		{"compress", required_argument, NULL, 'Z'},
 /* action */
 		{"create-slot", no_argument, NULL, 1},
 		{"drop-slot", no_argument, NULL, 2},
@@ -419,7 +492,7 @@ main(int argc, char **argv)
 		}
 	}
 
-	while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:S:nwWv",
+	while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:S:nwWvZ:",
 							long_options, &option_index)) != -1)
 	{
 		switch (c)
@@ -469,6 +542,15 @@ main(int argc, char **argv)
 			case 'v':
 				verbose++;
 				break;
+			case 'Z':
+				compresslevel = atoi(optarg);
+				if (compresslevel < 0 || compresslevel > 9)
+				{
+					fprintf(stderr, _("%s: invalid compression level \"%s\"\n"),
+							progname, optarg);
+					exit(1);
+				}
+				break;
 /* action */
 			case 1:
 				do_create_slot = true;
@@ -535,6 +617,16 @@ main(int argc, char **argv)
 		exit(1);
 	}
 
+#ifndef HAVE_LIBZ
+	if (compresslevel != 0)
+	{
+		fprintf(stderr,
+				_("%s: this build does not support compression\n"),
+				progname);
+		exit(1);
+	}
+#endif
+
 	/*
 	 * Check existence of destination folder.
 	 */
diff --git a/src/bin/pg_basebackup/walmethods.c b/src/bin/pg_basebackup/walmethods.c
index 88ee603b8b..b7d075f15e 100644
--- a/src/bin/pg_basebackup/walmethods.c
+++ b/src/bin/pg_basebackup/walmethods.c
@@ -41,6 +41,7 @@ typedef struct DirectoryMethodData
 {
 	char	   *basedir;
+	int			compression;
 	bool		sync;
 } DirectoryMethodData;
 static DirectoryMethodData *dir_data = NULL;
 
@@ -55,6 +56,9 @@ typedef struct DirectoryMethodFile
 	char	   *pathname;
 	char	   *fullpath;
 	char	   *temp_suffix;
+#ifdef HAVE_LIBZ
+	gzFile		gzfp;
+#endif
 } DirectoryMethodFile;
 
 static char *
@@ -70,17 +74,40 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
 	static char tmppath[MAXPGPATH];
 	int			fd;
 	DirectoryMethodFile *f;
+#ifdef HAVE_LIBZ
+	gzFile		gzfp;
+#endif
 
-	snprintf(tmppath, sizeof(tmppath), "%s/%s%s",
-			 dir_data->basedir, pathname, temp_suffix ? temp_suffix : "");
+	snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s",
+			 dir_data->basedir, pathname,
+			 dir_data->compression > 0 ? ".gz" : "",
+			 temp_suffix ? temp_suffix : "");
 
-	fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
-	if (fd < 0)
-		return NULL;
+#ifdef HAVE_LIBZ
+	if (dir_data->compression > 0)
+	{
+		gzfp = gzopen(tmppath, "wb");
+		if (gzfp == NULL)
+			return NULL;
 
-	if (pad_to_size)
+		if (gzsetparams(gzfp, dir_data->compression,
+						Z_DEFAULT_STRATEGY) != Z_OK)
+		{
+			gzclose(gzfp);
+			return NULL;
+		}
+	}
+	else
+#endif
+	{
+		fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
+		if (fd < 0)
+			return NULL;
+	}
+
+	/* Do pre-padding on non-compressed files */
+	if (pad_to_size && dir_data->compression == 0)
 	{
-		/* Always pre-pad on regular files */
 		char	   *zerobuf;
 		int			bytes;
 
@@ -120,13 +147,23 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
 		if (fsync_fname(tmppath, false, progname) != 0 ||
 			fsync_parent_path(tmppath, progname) != 0)
 		{
-			close(fd);
+#ifdef HAVE_LIBZ
+			if (dir_data->compression > 0)
+				gzclose(gzfp);
+			else
+#endif
+				close(fd);
 			return NULL;
 		}
 	}
 
 	f = pg_malloc0(sizeof(DirectoryMethodFile));
-	f->fd = fd;
+#ifdef HAVE_LIBZ
+	if (dir_data->compression > 0)
+		f->gzfp = gzfp;
+	else
+#endif
+		f->fd = fd;
 	f->currpos = 0;
 	f->pathname = pg_strdup(pathname);
 	f->fullpath = pg_strdup(tmppath);
@@ -144,7 +181,12 @@ dir_write(Walfile f, const void *buf, size_t count)
 
 	Assert(f != NULL);
 
-	r = write(df->fd, buf, count);
+#ifdef HAVE_LIBZ
+	if (dir_data->compression > 0)
+		r = (ssize_t) gzwrite(df->gzfp, buf, count);
+	else
+#endif
+		r = write(df->fd, buf, count);
 	if (r > 0)
 		df->currpos += r;
 	return r;
@@ -169,7 +211,12 @@ dir_close(Walfile f, WalCloseMethod method)
 
 	Assert(f != NULL);
 
-	r = close(df->fd);
+#ifdef HAVE_LIBZ
+	if (dir_data->compression > 0)
+		r = gzclose(df->gzfp);
+	else
+#endif
+		r = close(df->fd);
 
 	if (r == 0)
 	{
@@ -180,17 +227,22 @@ dir_close(Walfile f, WalCloseMethod method)
 			 * If we have a temp prefix, normal operation is to rename the
 			 * file.
 			 */
-			snprintf(tmppath, sizeof(tmppath), "%s/%s%s",
-					 dir_data->basedir, df->pathname, df->temp_suffix);
-			snprintf(tmppath2, sizeof(tmppath2), "%s/%s",
-					 dir_data->basedir, df->pathname);
+			snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s",
+					 dir_data->basedir, df->pathname,
+					 dir_data->compression > 0 ? ".gz" : "",
+					 df->temp_suffix);
+			snprintf(tmppath2, sizeof(tmppath2), "%s/%s%s",
+					 dir_data->basedir, df->pathname,
+					 dir_data->compression > 0 ? ".gz" : "");
 			r = durable_rename(tmppath, tmppath2, progname);
 		}
 		else if (method == CLOSE_UNLINK)
 		{
 			/* Unlink the file once it's closed */
-			snprintf(tmppath, sizeof(tmppath), "%s/%s%s",
-					 dir_data->basedir, df->pathname, df->temp_suffix ? df->temp_suffix : "");
+			snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s",
+					 dir_data->basedir, df->pathname,
+					 dir_data->compression > 0 ? ".gz" : "",
+					 df->temp_suffix ? df->temp_suffix : "");
 			r = unlink(tmppath);
 		}
 		else
@@ -226,6 +278,15 @@ dir_sync(Walfile f)
 	if (!dir_data->sync)
 		return 0;
 
+#ifdef HAVE_LIBZ
+	if (dir_data->compression > 0)
+	{
+		if (gzflush(((DirectoryMethodFile *) f)->gzfp, Z_SYNC_FLUSH) != Z_OK)
+			return -1;
+		return 0;
+	}
+#endif
+
 	return fsync(((DirectoryMethodFile *) f)->fd);
 }
 
@@ -277,7 +338,7 @@ dir_finish(void)
 
 
 WalWriteMethod *
-CreateWalDirectoryMethod(const char *basedir, bool sync)
+CreateWalDirectoryMethod(const char *basedir, int compression, bool sync)
 {
 	WalWriteMethod *method;
 
@@ -293,6 +354,7 @@ CreateWalDirectoryMethod(const char *basedir, bool sync)
 	method->getlasterror = dir_getlasterror;
 
 	dir_data = pg_malloc0(sizeof(DirectoryMethodData));
+	dir_data->compression = compression;
 	dir_data->basedir = pg_strdup(basedir);
 	dir_data->sync = sync;
 
diff --git a/src/bin/pg_basebackup/walmethods.h b/src/bin/pg_basebackup/walmethods.h
index c1723d53b5..2cd8b6d755 100644
--- a/src/bin/pg_basebackup/walmethods.h
+++ b/src/bin/pg_basebackup/walmethods.h
@@ -41,7 +41,8 @@ struct WalWriteMethod
  *						   (only implements the methods required for pg_basebackup,
  *						   not all those required for pg_receivexlog)
  */
-WalWriteMethod *CreateWalDirectoryMethod(const char *basedir, bool sync);
+WalWriteMethod *CreateWalDirectoryMethod(const char *basedir,
+						 int compression, bool sync);
 WalWriteMethod *CreateWalTarMethod(const char *tarbase, int compression, bool sync);
 
 /* Cleanup routines for previously-created methods */
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers