On Sat, Jan 7, 2017 at 8:19 PM, Magnus Hagander <mag...@hagander.net> wrote:
> On Sat, Jan 7, 2017 at 12:31 AM, Michael Paquier <michael.paqu...@gmail.com>
> wrote:
>> There is something I forgot. With this patch,
>> FindStreamingStart()@pg_receivexlog.c is actually broken. In short it
>> forgets to consider files that have been compressed at the last run of
>> pg_receivexlog and will try to stream changes from the beginning. I
>> can see that gzip -l provides this information... But I have yet to
>> find something in zlib that allows a cheap lookup as startup of
>> streaming should be fast. Looking at how gzip -l does it may be faster
>> than looking at the docs.
>
> Do we really care though? As in, does startup of streaming have to be *that*
> fast? Even gunzipping 16MB (worst case) doesn't exactly take a long time. If
> your pg_receivexlog is restarting so often that it becomes a problem, I
> think you already have another and much bigger problem on your hands.

Based on some analysis, it is enough to look at the last 4 bytes of
the compressed file to get the size of the uncompressed data, with a
single call to lseek() followed by read(). The gzip trailer stores the
uncompressed length (modulo 2^32) in its ISIZE field, as a little-endian
32-bit integer. Since this is simple and far cheaper than decompressing
perhaps hundreds of segments, I'd rather do it this way. Attached is the
implementation. The code uses two booleans to represent the four possible
states of a file name: full non-compressed, partial non-compressed, full
compressed and partial compressed. This keeps the last check of
FindStreamingStart() simpler, though an enum seems to be quite fashionable
for such things lately :D

> I found another problem with it -- it is completely broken in sync mode. You
> need to either forbid sync mode together with compression, or teach
> dir_sync() about it. The latter would sound better, but I wonder how much
> that's going to kill compression due to the small blocks? Is it a reasonable
> use-case?

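Hm. Looking at the docs, I think that --compress combined with
--synchronous maps to calling gzflush() with Z_SYNC_FLUSH. Note that
gzflush() only hands the pending compressed data to the operating system;
it does not fsync() it, so full durability additionally requires an
fsync() of the underlying file descriptor. FWIW I don't have a direct use
case for it, but it is not a major effort to add, so done. There is no
actual reason to forbid this combination of options either.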
-- 
Michael
diff --git a/doc/src/sgml/ref/pg_receivexlog.sgml b/doc/src/sgml/ref/pg_receivexlog.sgml
index bfa055b58b..8c1ea9a2e2 100644
--- a/doc/src/sgml/ref/pg_receivexlog.sgml
+++ b/doc/src/sgml/ref/pg_receivexlog.sgml
@@ -180,6 +180,19 @@ PostgreSQL documentation
        </para>
       </listitem>
      </varlistentry>
+
+     <varlistentry>
+      <term><option>-Z <replaceable class="parameter">level</replaceable></option></term>
+      <term><option>--compress=<replaceable class="parameter">level</replaceable></option></term>
+      <listitem>
+       <para>
+        Enables gzip compression of transaction logs, and specifies the
+        compression level (0 through 9, 0 being no compression and 9 being best
+        compression).  When compression is enabled, the suffix
+        <filename>.gz</filename> is automatically added to all filenames.
+       </para>
+      </listitem>
+     </varlistentry>
     </variablelist>
 
    <para>
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index 8ebf24e771..b9c0bb5fff 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -481,7 +481,7 @@ LogStreamerMain(logstreamer_param *param)
 	stream.partial_suffix = NULL;
 
 	if (format == 'p')
-		stream.walmethod = CreateWalDirectoryMethod(param->xlog, do_sync);
+		stream.walmethod = CreateWalDirectoryMethod(param->xlog, 0, do_sync);
 	else
 		stream.walmethod = CreateWalTarMethod(param->xlog, compresslevel, do_sync);
 
diff --git a/src/bin/pg_basebackup/pg_receivexlog.c b/src/bin/pg_basebackup/pg_receivexlog.c
index b6f57a878c..74fa5c68c0 100644
--- a/src/bin/pg_basebackup/pg_receivexlog.c
+++ b/src/bin/pg_basebackup/pg_receivexlog.c
@@ -34,6 +34,7 @@
 /* Global options */
 static char *basedir = NULL;
 static int	verbose = 0;
+static int	compresslevel = 0;
 static int	noloop = 0;
 static int	standby_message_timeout = 10 * 1000;		/* 10 sec = default */
 static volatile bool time_to_abort = false;
@@ -57,6 +58,15 @@ static bool stop_streaming(XLogRecPtr segendpos, uint32 timeline,
 	exit(code);									\
 	}
 
+/* Routines to evaluate segment file format */
+#define IsCompressXLogFileName(fname)    \
+	(strlen(fname) == XLOG_FNAME_LEN + strlen(".gz") &&	\
+	 strspn(fname, "0123456789ABCDEF") == XLOG_FNAME_LEN &&		\
+	 strcmp((fname) + XLOG_FNAME_LEN, ".gz") == 0)
+#define IsPartialCompressXLogFileName(fname)    \
+	(strlen(fname) == XLOG_FNAME_LEN + strlen(".gz.partial") &&	\
+	 strspn(fname, "0123456789ABCDEF") == XLOG_FNAME_LEN &&		\
+	 strcmp((fname) + XLOG_FNAME_LEN, ".gz.partial") == 0)
 
 static void
 usage(void)
@@ -75,6 +85,7 @@ usage(void)
 	printf(_("      --synchronous      flush transaction log immediately after writing\n"));
 	printf(_("  -v, --verbose          output verbose messages\n"));
 	printf(_("  -V, --version          output version information, then exit\n"));
+	printf(_("  -Z, --compress=0-9     compress logs with given compression level\n"));
 	printf(_("  -?, --help             show this help, then exit\n"));
 	printf(_("\nConnection options:\n"));
 	printf(_("  -d, --dbname=CONNSTR   connection string\n"));
@@ -187,14 +198,31 @@ FindStreamingStart(uint32 *tli)
 		uint32		tli;
 		XLogSegNo	segno;
 		bool		ispartial;
+		bool		iscompress;
 
 		/*
 		 * Check if the filename looks like an xlog file, or a .partial file.
 		 */
 		if (IsXLogFileName(dirent->d_name))
+		{
 			ispartial = false;
+			iscompress = false;
+		}
 		else if (IsPartialXLogFileName(dirent->d_name))
+		{
+			ispartial = true;
+			iscompress = false;
+		}
+		else if (IsCompressXLogFileName(dirent->d_name))
+		{
+			ispartial = false;
+			iscompress = true;
+		}
+		else if (IsPartialCompressXLogFileName(dirent->d_name))
+		{
 			ispartial = true;
+			iscompress = true;
+		}
 		else
 			continue;
 
@@ -205,9 +233,11 @@ FindStreamingStart(uint32 *tli)
 
 		/*
 		 * Check that the segment has the right size, if it's supposed to be
-		 * completed.
+		 * completed.  For non-compressed segments just check the on-disk size.
+		 * For compressed segments, look at the last 4 bytes of the compressed
+		 * file and check the size of the uncompressed data.
 		 */
-		if (!ispartial)
+		if (!ispartial && !iscompress)
 		{
 			struct stat statbuf;
 			char		fullpath[MAXPGPATH];
@@ -228,6 +258,47 @@ FindStreamingStart(uint32 *tli)
 				continue;
 			}
 		}
+		else if (!ispartial && iscompress)
+		{
+			int		fd;
+			char	buf[4];
+			int		bytes_out;
+			char	fullpath[MAXPGPATH];
+
+			snprintf(fullpath, sizeof(fullpath), "%s/%s", basedir, dirent->d_name);
+
+			fd = open(fullpath, O_RDONLY | PG_BINARY);
+			if (fd < 0)
+			{
+				fprintf(stderr, _("%s: could not open file \"%s\": %s\n"),
+						progname, fullpath, strerror(errno));
+				disconnect_and_exit(1);
+			}
+			if (lseek(fd, (off_t)(-4), SEEK_END) < 0)
+			{
+				fprintf(stderr, _("%s: could not seek file \"%s\": %s\n"),
+						progname, fullpath, strerror(errno));
+				disconnect_and_exit(1);
+			}
+			if (read(fd, (char *) buf, sizeof(buf)) != sizeof(buf))
+			{
+				fprintf(stderr, _("%s: could not read file \"%s\": %s\n"),
+						progname, fullpath, strerror(errno));
+				disconnect_and_exit(1);
+			}
+
+			close(fd);
+			bytes_out = (buf[3] << 24) | (buf[2] << 16) |
+						(buf[1] << 8) | buf[0];
+
+			if (bytes_out != XLOG_SEG_SIZE)
+			{
+				fprintf(stderr,
+						_("%s: segment file \"%s\" has incorrect size %d, skipping\n"),
+						progname, dirent->d_name, bytes_out);
+				continue;
+			}
+		}
 
 		/* Looks like a valid segment. Remember that we saw it. */
 		if ((segno > high_segno) ||
@@ -338,7 +409,8 @@ StreamLog(void)
 	stream.synchronous = synchronous;
 	stream.do_sync = true;
 	stream.mark_done = false;
-	stream.walmethod = CreateWalDirectoryMethod(basedir, stream.do_sync);
+	stream.walmethod = CreateWalDirectoryMethod(basedir, compresslevel,
+												stream.do_sync);
 	stream.partial_suffix = ".partial";
 
 	ReceiveXlogStream(conn, &stream);
@@ -389,6 +461,7 @@ main(int argc, char **argv)
 		{"status-interval", required_argument, NULL, 's'},
 		{"slot", required_argument, NULL, 'S'},
 		{"verbose", no_argument, NULL, 'v'},
+		{"compress", required_argument, NULL, 'Z'},
 /* action */
 		{"create-slot", no_argument, NULL, 1},
 		{"drop-slot", no_argument, NULL, 2},
@@ -419,7 +492,7 @@ main(int argc, char **argv)
 		}
 	}
 
-	while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:S:nwWv",
+	while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:S:nwWvZ:",
 							long_options, &option_index)) != -1)
 	{
 		switch (c)
@@ -469,6 +542,15 @@ main(int argc, char **argv)
 			case 'v':
 				verbose++;
 				break;
+			case 'Z':
+				compresslevel = atoi(optarg);
+				if (compresslevel < 0 || compresslevel > 9)
+				{
+					fprintf(stderr, _("%s: invalid compression level \"%s\"\n"),
+							progname, optarg);
+					exit(1);
+				}
+				break;
 /* action */
 			case 1:
 				do_create_slot = true;
@@ -535,6 +617,16 @@ main(int argc, char **argv)
 		exit(1);
 	}
 
+#ifndef HAVE_LIBZ
+	if (compresslevel != 0)
+	{
+		fprintf(stderr,
+				_("%s: this build does not support compression\n"),
+				progname);
+		exit(1);
+	}
+#endif
+
 	/*
 	 * Check existence of destination folder.
 	 */
diff --git a/src/bin/pg_basebackup/walmethods.c b/src/bin/pg_basebackup/walmethods.c
index 88ee603b8b..b7d075f15e 100644
--- a/src/bin/pg_basebackup/walmethods.c
+++ b/src/bin/pg_basebackup/walmethods.c
@@ -41,6 +41,7 @@
 typedef struct DirectoryMethodData
 {
 	char	   *basedir;
+	int			compression;
 	bool		sync;
 }	DirectoryMethodData;
 static DirectoryMethodData *dir_data = NULL;
@@ -55,6 +56,9 @@ typedef struct DirectoryMethodFile
 	char	   *pathname;
 	char	   *fullpath;
 	char	   *temp_suffix;
+#ifdef HAVE_LIBZ
+	gzFile		gzfp;
+#endif
 }	DirectoryMethodFile;
 
 static char *
@@ -70,17 +74,40 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
 	static char tmppath[MAXPGPATH];
 	int			fd;
 	DirectoryMethodFile *f;
+#ifdef HAVE_LIBZ
+	gzFile		gzfp;
+#endif
 
-	snprintf(tmppath, sizeof(tmppath), "%s/%s%s",
-			 dir_data->basedir, pathname, temp_suffix ? temp_suffix : "");
+	snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s",
+			 dir_data->basedir, pathname,
+			 dir_data->compression > 0 ? ".gz" : "",
+			 temp_suffix ? temp_suffix : "");
 
-	fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
-	if (fd < 0)
-		return NULL;
+#ifdef HAVE_LIBZ
+	if (dir_data->compression > 0)
+	{
+		gzfp = gzopen(tmppath, "wb");
+		if (gzfp == NULL)
+			return NULL;
 
-	if (pad_to_size)
+		if (gzsetparams(gzfp, dir_data->compression,
+						Z_DEFAULT_STRATEGY) != Z_OK)
+		{
+			gzclose(gzfp);
+			return NULL;
+		}
+	}
+	else
+#endif
+	{
+		fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
+		if (fd < 0)
+			return NULL;
+	}
+
+	/* Do pre-padding on non-compressed files */
+	if (pad_to_size && dir_data->compression == 0)
 	{
-		/* Always pre-pad on regular files */
 		char	   *zerobuf;
 		int			bytes;
 
@@ -120,13 +147,23 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
 		if (fsync_fname(tmppath, false, progname) != 0 ||
 			fsync_parent_path(tmppath, progname) != 0)
 		{
-			close(fd);
+#ifdef HAVE_LIBZ
+			if (dir_data->compression > 0)
+				gzclose(gzfp);
+			else
+#endif
+				close(fd);
 			return NULL;
 		}
 	}
 
 	f = pg_malloc0(sizeof(DirectoryMethodFile));
-	f->fd = fd;
+#ifdef HAVE_LIBZ
+	if (dir_data->compression > 0)
+		f->gzfp = gzfp;
+	else
+#endif
+		f->fd = fd;
 	f->currpos = 0;
 	f->pathname = pg_strdup(pathname);
 	f->fullpath = pg_strdup(tmppath);
@@ -144,7 +181,12 @@ dir_write(Walfile f, const void *buf, size_t count)
 
 	Assert(f != NULL);
 
-	r = write(df->fd, buf, count);
+#ifdef HAVE_LIBZ
+	if (dir_data->compression > 0)
+		r = (ssize_t) gzwrite(df->gzfp, buf, count);
+	else
+#endif
+		r = write(df->fd, buf, count);
 	if (r > 0)
 		df->currpos += r;
 	return r;
@@ -169,7 +211,12 @@ dir_close(Walfile f, WalCloseMethod method)
 
 	Assert(f != NULL);
 
-	r = close(df->fd);
+#ifdef HAVE_LIBZ
+	if (dir_data->compression > 0)
+		r = gzclose(df->gzfp);
+	else
+#endif
+		r = close(df->fd);
 
 	if (r == 0)
 	{
@@ -180,17 +227,22 @@ dir_close(Walfile f, WalCloseMethod method)
 			 * If we have a temp prefix, normal operation is to rename the
 			 * file.
 			 */
-			snprintf(tmppath, sizeof(tmppath), "%s/%s%s",
-					 dir_data->basedir, df->pathname, df->temp_suffix);
-			snprintf(tmppath2, sizeof(tmppath2), "%s/%s",
-					 dir_data->basedir, df->pathname);
+			snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s",
+					 dir_data->basedir, df->pathname,
+					 dir_data->compression > 0 ? ".gz" : "",
+					 df->temp_suffix);
+			snprintf(tmppath2, sizeof(tmppath2), "%s/%s%s",
+					 dir_data->basedir, df->pathname,
+					 dir_data->compression > 0 ? ".gz" : "");
 			r = durable_rename(tmppath, tmppath2, progname);
 		}
 		else if (method == CLOSE_UNLINK)
 		{
 			/* Unlink the file once it's closed */
-			snprintf(tmppath, sizeof(tmppath), "%s/%s%s",
-					 dir_data->basedir, df->pathname, df->temp_suffix ? df->temp_suffix : "");
+			snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s",
+					 dir_data->basedir, df->pathname,
+					 dir_data->compression > 0 ? ".gz" : "",
+					 df->temp_suffix ? df->temp_suffix : "");
 			r = unlink(tmppath);
 		}
 		else
@@ -226,6 +278,15 @@ dir_sync(Walfile f)
 	if (!dir_data->sync)
 		return 0;
 
+#ifdef HAVE_LIBZ
+	if (dir_data->compression > 0)
+	{
+		if (gzflush(((DirectoryMethodFile *) f)->gzfp, Z_SYNC_FLUSH) != Z_OK)
+			return -1;
+		return 0;
+	}
+#endif
+
 	return fsync(((DirectoryMethodFile *) f)->fd);
 }
 
@@ -277,7 +338,7 @@ dir_finish(void)
 
 
 WalWriteMethod *
-CreateWalDirectoryMethod(const char *basedir, bool sync)
+CreateWalDirectoryMethod(const char *basedir, int compression, bool sync)
 {
 	WalWriteMethod *method;
 
@@ -293,6 +354,7 @@ CreateWalDirectoryMethod(const char *basedir, bool sync)
 	method->getlasterror = dir_getlasterror;
 
 	dir_data = pg_malloc0(sizeof(DirectoryMethodData));
+	dir_data->compression = compression;
 	dir_data->basedir = pg_strdup(basedir);
 	dir_data->sync = sync;
 
diff --git a/src/bin/pg_basebackup/walmethods.h b/src/bin/pg_basebackup/walmethods.h
index c1723d53b5..2cd8b6d755 100644
--- a/src/bin/pg_basebackup/walmethods.h
+++ b/src/bin/pg_basebackup/walmethods.h
@@ -41,7 +41,8 @@ struct WalWriteMethod
  *						   (only implements the methods required for pg_basebackup,
  *						   not all those required for pg_receivexlog)
  */
-WalWriteMethod *CreateWalDirectoryMethod(const char *basedir, bool sync);
+WalWriteMethod *CreateWalDirectoryMethod(const char *basedir,
+										 int compression, bool sync);
 WalWriteMethod *CreateWalTarMethod(const char *tarbase, int compression, bool sync);
 
 /* Cleanup routines for previously-created methods */
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to