Attached patch adds support for -X stream to work with .tar and .tar.gz
file formats.

If tar mode is specified, a separate pg_xlog.tar (or .tar.gz) file is
created and the data is streamed into it. Regular mode is (should not) see
any changes in how it works.

The implementation creates a "walmethod" for directory and one for tar,
which is basically a set of function pointers that we pass around as part
of the StreamCtl structure. All calls to modify the files are sent through
the current method, using the normal open/read/write calls as it is now for
directories, and the more complicated method for tar and targz.

The tar method doesn't support all things that are required for
pg_receivexlog, but I don't think it makes any real sense to have support
for pg_receivexlog in tar mode. But it does support all the things that
pg_basebackup needs.

Some smaller pieces of functionality like unlinking files on failure and
padding files have been moved into the walmethod because they have to be
differently implemented (we cannot pre-pad a compressed file -- the size
will depend on the compression ration anyway -- for example).

AFAICT we never actually documented that -X stream doesn't work with tar in
the manpage of current versions. Only in the error message. We might want
to fix that in backbranches.

In passing this also fixes an XXX comment about not re-lseeking on the WAL
file all the time -- the walmethod now tracks the current position in the
file in a variable.

Finally, to make this work, the pring_tar_number() function is now exported
from port/tar.c along with the other ones already exported from there.

-- 
 Magnus Hagander
 Me: http://www.hagander.net/
 Work: http://www.redpill-linpro.com/
diff --git a/doc/src/sgml/ref/pg_basebackup.sgml b/doc/src/sgml/ref/pg_basebackup.sgml
index 03615da..981d201 100644
--- a/doc/src/sgml/ref/pg_basebackup.sgml
+++ b/doc/src/sgml/ref/pg_basebackup.sgml
@@ -180,7 +180,8 @@ PostgreSQL documentation
             target directory, the tar contents will be written to
             standard output, suitable for piping to for example
             <productname>gzip</productname>. This is only possible if
-            the cluster has no additional tablespaces.
+            the cluster has no additional tablespaces and transaction
+            log streaming is not used.
            </para>
            </listitem>
          </varlistentry>
@@ -323,6 +324,10 @@ PostgreSQL documentation
              If the log has been rotated when it's time to transfer it, the
              backup will fail and be unusable.
            </para>
+           <para>
+            The transaction log files will be written to
+             the <filename>base.tar</filename> file.
+           </para>
           </listitem>
          </varlistentry>
 
@@ -339,6 +344,9 @@ PostgreSQL documentation
              client can keep up with transaction log received, using this mode
              requires no extra transaction logs to be saved on the master.
            </para>
+           <para>The transactionn log files are written to a separate file
+            called <filename>pg_xlog.tar</filename>.
+           </para>
           </listitem>
          </varlistentry>
         </variablelist>
diff --git a/src/bin/pg_basebackup/Makefile b/src/bin/pg_basebackup/Makefile
index fa1ce8b..52ac9e9 100644
--- a/src/bin/pg_basebackup/Makefile
+++ b/src/bin/pg_basebackup/Makefile
@@ -19,7 +19,7 @@ include $(top_builddir)/src/Makefile.global
 override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS)
 LDFLAGS += -L$(top_builddir)/src/fe_utils -lpgfeutils -lpq
 
-OBJS=receivelog.o streamutil.o $(WIN32RES)
+OBJS=receivelog.o streamutil.o walmethods.o $(WIN32RES)
 
 all: pg_basebackup pg_receivexlog pg_recvlogical
 
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index 351a420..58c0821 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -365,7 +365,7 @@ typedef struct
 {
 	PGconn	   *bgconn;
 	XLogRecPtr	startptr;
-	char		xlogdir[MAXPGPATH];
+	char		xlog[MAXPGPATH];	/* directory or tarfile depending on mode */
 	char	   *sysidentifier;
 	int			timeline;
 } logstreamer_param;
@@ -383,9 +383,13 @@ LogStreamerMain(logstreamer_param *param)
 	stream.standby_message_timeout = standby_message_timeout;
 	stream.synchronous = false;
 	stream.mark_done = true;
-	stream.basedir = param->xlogdir;
 	stream.partial_suffix = NULL;
 
+	if (format == 'p')
+		stream.walmethod = CreateWalDirectoryMethod(param->xlog);
+	else
+		stream.walmethod = CreateWalTarMethod(param->xlog, compresslevel);
+
 	if (!ReceiveXlogStream(param->bgconn, &stream))
 
 		/*
@@ -395,6 +399,14 @@ LogStreamerMain(logstreamer_param *param)
 		 */
 		return 1;
 
+	if (!stream.walmethod->finish())
+	{
+		fprintf(stderr,
+				_("%s: could not finish writing WAL files: %s\n"),
+				progname, strerror(errno));
+		return 1;
+	}
+
 	PQfinish(param->bgconn);
 	return 0;
 }
@@ -445,22 +457,25 @@ StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier)
 		/* Error message already written in GetConnection() */
 		exit(1);
 
-	snprintf(param->xlogdir, sizeof(param->xlogdir), "%s/pg_xlog", basedir);
-
-	/*
-	 * Create pg_xlog/archive_status (and thus pg_xlog) so we can write to
-	 * basedir/pg_xlog as the directory entry in the tar file may arrive
-	 * later.
-	 */
-	snprintf(statusdir, sizeof(statusdir), "%s/pg_xlog/archive_status",
-			 basedir);
+	snprintf(param->xlog, sizeof(param->xlog), "%s/pg_xlog", basedir);
 
-	if (pg_mkdir_p(statusdir, S_IRWXU) != 0 && errno != EEXIST)
+	if (format == 'p')
 	{
-		fprintf(stderr,
-				_("%s: could not create directory \"%s\": %s\n"),
-				progname, statusdir, strerror(errno));
-		disconnect_and_exit(1);
+		/*
+		 * Create pg_xlog/archive_status (and thus pg_xlog) so we can write to
+		 * basedir/pg_xlog as the directory entry in the tar file may arrive
+		 * later.
+		 */
+		snprintf(statusdir, sizeof(statusdir), "%s/pg_xlog/archive_status",
+				 basedir);
+
+		if (pg_mkdir_p(statusdir, S_IRWXU) != 0 && errno != EEXIST)
+		{
+			fprintf(stderr,
+					_("%s: could not create directory \"%s\": %s\n"),
+					progname, statusdir, strerror(errno));
+			disconnect_and_exit(1);
+		}
 	}
 
 	/*
@@ -2110,16 +2125,6 @@ main(int argc, char **argv)
 		exit(1);
 	}
 
-	if (format != 'p' && streamwal)
-	{
-		fprintf(stderr,
-				_("%s: WAL streaming can only be used in plain mode\n"),
-				progname);
-		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
-				progname);
-		exit(1);
-	}
-
 	if (replication_slot && !streamwal)
 	{
 		fprintf(stderr,
diff --git a/src/bin/pg_basebackup/pg_receivexlog.c b/src/bin/pg_basebackup/pg_receivexlog.c
index 7f7ee9d..9b4c101 100644
--- a/src/bin/pg_basebackup/pg_receivexlog.c
+++ b/src/bin/pg_basebackup/pg_receivexlog.c
@@ -337,11 +337,19 @@ StreamLog(void)
 	stream.standby_message_timeout = standby_message_timeout;
 	stream.synchronous = synchronous;
 	stream.mark_done = false;
-	stream.basedir = basedir;
+	stream.walmethod = CreateWalDirectoryMethod(basedir);
 	stream.partial_suffix = ".partial";
 
 	ReceiveXlogStream(conn, &stream);
 
+	if (!stream.walmethod->finish())
+	{
+		fprintf(stderr,
+				_("%s: could not finish writing WAL files: %s\n"),
+				progname, strerror(errno));
+		return;
+	}
+
 	PQfinish(conn);
 	conn = NULL;
 }
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c
index 062730b..9197eeb 100644
--- a/src/bin/pg_basebackup/receivelog.c
+++ b/src/bin/pg_basebackup/receivelog.c
@@ -26,7 +26,7 @@
 
 
 /* fd and filename for currently open WAL file */
-static int	walfile = -1;
+static Walfile *walfile = NULL;
 static char current_walfile_name[MAXPGPATH] = "";
 static bool reportFlushPosition = false;
 static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
@@ -37,7 +37,7 @@ static PGresult *HandleCopyStream(PGconn *conn, StreamCtl *stream,
 				 XLogRecPtr *stoppos);
 static int	CopyStreamPoll(PGconn *conn, long timeout_ms);
 static int	CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
-static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
+static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
 					XLogRecPtr blockpos, int64 *last_status);
 static bool ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
 				   XLogRecPtr *blockpos);
@@ -52,33 +52,33 @@ static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
 						 uint32 *timeline);
 
 static bool
-mark_file_as_archived(const char *basedir, const char *fname)
+mark_file_as_archived(StreamCtl *stream, const char *fname)
 {
-	int			fd;
+	Walfile    *f;
 	static char tmppath[MAXPGPATH];
 
-	snprintf(tmppath, sizeof(tmppath), "%s/archive_status/%s.done",
-			 basedir, fname);
+	snprintf(tmppath, sizeof(tmppath), "archive_status/%s.done",
+			 fname);
 
-	fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
-	if (fd < 0)
+	f = stream->walmethod->open_for_write(tmppath, NULL, 0);
+	if (f == NULL)
 	{
 		fprintf(stderr, _("%s: could not create archive status file \"%s\": %s\n"),
-				progname, tmppath, strerror(errno));
+				progname, tmppath, stream->walmethod->getlasterror());
 		return false;
 	}
 
-	if (fsync(fd) != 0)
+	if (stream->walmethod->fsync(f) != 0)
 	{
 		fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
-				progname, tmppath, strerror(errno));
+				progname, tmppath, stream->walmethod->getlasterror());
 
-		close(fd);
+		stream->walmethod->close(f, CLOSE_UNLINK);
 
 		return false;
 	}
 
-	close(fd);
+	stream->walmethod->close(f, CLOSE_NORMAL);
 
 	return true;
 }
@@ -92,79 +92,65 @@ mark_file_as_archived(const char *basedir, const char *fname)
 static bool
 open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
 {
-	int			f;
+	Walfile    *f;
 	char		fn[MAXPGPATH];
-	struct stat statbuf;
-	char	   *zerobuf;
-	int			bytes;
+	ssize_t		size;
 	XLogSegNo	segno;
 
 	XLByteToSeg(startpoint, segno);
 	XLogFileName(current_walfile_name, stream->timeline, segno);
 
-	snprintf(fn, sizeof(fn), "%s/%s%s", stream->basedir, current_walfile_name,
+	snprintf(fn, sizeof(fn), "%s%s", current_walfile_name,
 			 stream->partial_suffix ? stream->partial_suffix : "");
-	f = open(fn, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
-	if (f == -1)
-	{
-		fprintf(stderr,
-				_("%s: could not open transaction log file \"%s\": %s\n"),
-				progname, fn, strerror(errno));
-		return false;
-	}
-
-	/*
-	 * Verify that the file is either empty (just created), or a complete
-	 * XLogSegSize segment. Anything in between indicates a corrupt file.
-	 */
-	if (fstat(f, &statbuf) != 0)
-	{
-		fprintf(stderr,
-				_("%s: could not stat transaction log file \"%s\": %s\n"),
-				progname, fn, strerror(errno));
-		close(f);
-		return false;
-	}
-	if (statbuf.st_size == XLogSegSize)
-	{
-		/* File is open and ready to use */
-		walfile = f;
-		return true;
-	}
-	if (statbuf.st_size != 0)
-	{
-		fprintf(stderr,
-				_("%s: transaction log file \"%s\" has %d bytes, should be 0 or %d\n"),
-				progname, fn, (int) statbuf.st_size, XLogSegSize);
-		close(f);
-		return false;
-	}
 
-	/* New, empty, file. So pad it to 16Mb with zeroes */
-	zerobuf = pg_malloc0(XLOG_BLCKSZ);
-	for (bytes = 0; bytes < XLogSegSize; bytes += XLOG_BLCKSZ)
+	if (stream->walmethod->existsfile(fn))
 	{
-		if (write(f, zerobuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
+		/*
+		 * Verify that the file is either empty (just created), or a complete
+		 * XLogSegSize segment. Anything in between indicates a corrupt file.
+		 */
+		size = stream->walmethod->get_file_size(fn);
+		if (size < 0)
+		{
+			fprintf(stderr,
+			_("%s: could not get size of transaction log file \"%s\": %s\n"),
+					progname, fn, stream->walmethod->getlasterror());
+			return false;
+		}
+		if (size == XLogSegSize)
+		{
+			/* Already padded file. Open it for use */
+			f = stream->walmethod->open_for_write(current_walfile_name, stream->partial_suffix, 0);
+			if (f == NULL)
+			{
+				fprintf(stderr,
+						_("%s: could not open existing transaction log file \"%s\": %s\n"),
+						progname, fn, stream->walmethod->getlasterror());
+				return false;
+			}
+			walfile = f;
+			return true;
+		}
+		if (size != 0)
 		{
 			fprintf(stderr,
-					_("%s: could not pad transaction log file \"%s\": %s\n"),
-					progname, fn, strerror(errno));
-			free(zerobuf);
-			close(f);
-			unlink(fn);
+					_("%s: transaction log file \"%s\" has %d bytes, should be 0 or %d\n"),
+					progname, fn, (int) size, XLogSegSize);
 			return false;
 		}
 	}
-	free(zerobuf);
 
-	if (lseek(f, SEEK_SET, 0) != 0)
+	/* No file existed, so create one */
+
+	f = stream->walmethod->open_for_write(current_walfile_name, stream->partial_suffix, XLogSegSize);
+	if (f == NULL)
 	{
 		fprintf(stderr,
-				_("%s: could not seek to beginning of transaction log file \"%s\": %s\n"),
-				progname, fn, strerror(errno));
-		close(f);
+				_("%s: could not open transaction log file \"%s\": %s\n"),
+				progname, fn, stream->walmethod->getlasterror());
 		return false;
 	}
+
 	walfile = f;
 	return true;
 }
@@ -178,56 +164,50 @@ static bool
 close_walfile(StreamCtl *stream, XLogRecPtr pos)
 {
 	off_t		currpos;
+	int			r;
 
-	if (walfile == -1)
+	if (walfile == NULL)
 		return true;
 
-	currpos = lseek(walfile, 0, SEEK_CUR);
+	currpos = stream->walmethod->get_current_pos(walfile);
 	if (currpos == -1)
 	{
 		fprintf(stderr,
 			 _("%s: could not determine seek position in file \"%s\": %s\n"),
-				progname, current_walfile_name, strerror(errno));
+		  progname, current_walfile_name, stream->walmethod->getlasterror());
 		return false;
 	}
 
-	if (fsync(walfile) != 0)
+	if (stream->walmethod->fsync(walfile) != 0)
 	{
 		fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
-				progname, current_walfile_name, strerror(errno));
+		  progname, current_walfile_name, stream->walmethod->getlasterror());
 		return false;
 	}
 
-	if (close(walfile) != 0)
+	if (stream->partial_suffix)
 	{
-		fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
-				progname, current_walfile_name, strerror(errno));
-		walfile = -1;
-		return false;
+		if (currpos == XLOG_SEG_SIZE)
+			r = stream->walmethod->close(walfile, CLOSE_NORMAL);
+		else
+		{
+			fprintf(stderr,
+					_("%s: not renaming \"%s%s\", segment is not complete\n"),
+					progname, current_walfile_name, stream->partial_suffix);
+			r = stream->walmethod->close(walfile, CLOSE_NO_RENAME);
+		}
 	}
-	walfile = -1;
+	else
+		r = stream->walmethod->close(walfile, CLOSE_NORMAL);
 
-	/*
-	 * If we finished writing a .partial file, rename it into place.
-	 */
-	if (currpos == XLOG_SEG_SIZE && stream->partial_suffix)
-	{
-		char		oldfn[MAXPGPATH];
-		char		newfn[MAXPGPATH];
+	walfile = NULL;
 
-		snprintf(oldfn, sizeof(oldfn), "%s/%s%s", stream->basedir, current_walfile_name, stream->partial_suffix);
-		snprintf(newfn, sizeof(newfn), "%s/%s", stream->basedir, current_walfile_name);
-		if (rename(oldfn, newfn) != 0)
-		{
-			fprintf(stderr, _("%s: could not rename file \"%s\": %s\n"),
-					progname, current_walfile_name, strerror(errno));
-			return false;
-		}
+	if (r != 0)
+	{
+		fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
+		  progname, current_walfile_name, stream->walmethod->getlasterror());
+		return false;
 	}
-	else if (stream->partial_suffix)
-		fprintf(stderr,
-				_("%s: not renaming \"%s%s\", segment is not complete\n"),
-				progname, current_walfile_name, stream->partial_suffix);
 
 	/*
 	 * Mark file as archived if requested by the caller - pg_basebackup needs
@@ -238,7 +218,7 @@ close_walfile(StreamCtl *stream, XLogRecPtr pos)
 	if (currpos == XLOG_SEG_SIZE && stream->mark_done)
 	{
 		/* writes error message if failed */
-		if (!mark_file_as_archived(stream->basedir, current_walfile_name))
+		if (!mark_file_as_archived(stream, current_walfile_name))
 			return false;
 	}
 
@@ -253,9 +233,7 @@ close_walfile(StreamCtl *stream, XLogRecPtr pos)
 static bool
 existsTimeLineHistoryFile(StreamCtl *stream)
 {
-	char		path[MAXPGPATH];
 	char		histfname[MAXFNAMELEN];
-	int			fd;
 
 	/*
 	 * Timeline 1 never has a history file. We treat that as if it existed,
@@ -266,31 +244,16 @@ existsTimeLineHistoryFile(StreamCtl *stream)
 
 	TLHistoryFileName(histfname, stream->timeline);
 
-	snprintf(path, sizeof(path), "%s/%s", stream->basedir, histfname);
-
-	fd = open(path, O_RDONLY | PG_BINARY, 0);
-	if (fd < 0)
-	{
-		if (errno != ENOENT)
-			fprintf(stderr, _("%s: could not open timeline history file \"%s\": %s\n"),
-					progname, path, strerror(errno));
-		return false;
-	}
-	else
-	{
-		close(fd);
-		return true;
-	}
+	return stream->walmethod->existsfile(histfname);
 }
 
 static bool
 writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
 {
 	int			size = strlen(content);
-	char		path[MAXPGPATH];
 	char		tmppath[MAXPGPATH];
 	char		histfname[MAXFNAMELEN];
-	int			fd;
+	Walfile    *f;
 
 	/*
 	 * Check that the server's idea of how timeline history files should be
@@ -304,62 +267,39 @@ writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
 		return false;
 	}
 
-	snprintf(path, sizeof(path), "%s/%s", stream->basedir, histfname);
-
-	/*
-	 * Write into a temp file name.
-	 */
-	snprintf(tmppath, MAXPGPATH, "%s.tmp", path);
-
-	unlink(tmppath);
-
-	fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
-	if (fd < 0)
+	f = stream->walmethod->open_for_write(histfname, ".tmp", 0);
+	if (f == NULL)
 	{
 		fprintf(stderr, _("%s: could not create timeline history file \"%s\": %s\n"),
-				progname, tmppath, strerror(errno));
+				progname, histfname, stream->walmethod->getlasterror());
 		return false;
 	}
 
-	errno = 0;
-	if ((int) write(fd, content, size) != size)
+	if ((int) stream->walmethod->write(f, content, size) != size)
 	{
-		int			save_errno = errno;
+		fprintf(stderr, _("%s: could not write timeline history file \"%s\": %s\n"),
+				progname, histfname, stream->walmethod->getlasterror());
 
 		/*
 		 * If we fail to make the file, delete it to release disk space
 		 */
-		close(fd);
-		unlink(tmppath);
-		errno = save_errno;
+		stream->walmethod->close(f, CLOSE_UNLINK);
 
-		fprintf(stderr, _("%s: could not write timeline history file \"%s\": %s\n"),
-				progname, tmppath, strerror(errno));
 		return false;
 	}
 
-	if (fsync(fd) != 0)
+	if (stream->walmethod->fsync(f) != 0)
 	{
-		close(fd);
 		fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
-				progname, tmppath, strerror(errno));
+				progname, tmppath, stream->walmethod->getlasterror());
+		stream->walmethod->close(f, CLOSE_NORMAL);
 		return false;
 	}
 
-	if (close(fd) != 0)
+	if (stream->walmethod->close(f, CLOSE_NORMAL) != 0)
 	{
 		fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
-				progname, tmppath, strerror(errno));
-		return false;
-	}
-
-	/*
-	 * Now move the completed history file into place with its final name.
-	 */
-	if (rename(tmppath, path) < 0)
-	{
-		fprintf(stderr, _("%s: could not rename file \"%s\" to \"%s\": %s\n"),
-				progname, tmppath, path, strerror(errno));
+				progname, histfname, stream->walmethod->getlasterror());
 		return false;
 	}
 
@@ -367,7 +307,7 @@ writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
 	if (stream->mark_done)
 	{
 		/* writes error message if failed */
-		if (!mark_file_as_archived(stream->basedir, histfname))
+		if (!mark_file_as_archived(stream, histfname))
 			return false;
 	}
 
@@ -736,10 +676,10 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
 	}
 
 error:
-	if (walfile != -1 && close(walfile) != 0)
+	if (walfile != NULL && stream->walmethod->close(walfile, CLOSE_NORMAL) != 0)
 		fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
-				progname, current_walfile_name, strerror(errno));
-	walfile = -1;
+		  progname, current_walfile_name, stream->walmethod->getlasterror());
+	walfile = NULL;
 	return false;
 }
 
@@ -823,12 +763,12 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream,
 		 * If synchronous option is true, issue sync command as soon as there
 		 * are WAL data which has not been flushed yet.
 		 */
-		if (stream->synchronous && lastFlushPosition < blockpos && walfile != -1)
+		if (stream->synchronous && lastFlushPosition < blockpos && walfile != NULL)
 		{
-			if (fsync(walfile) != 0)
+			if (stream->walmethod->fsync(walfile) != 0)
 			{
 				fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
-						progname, current_walfile_name, strerror(errno));
+						progname, current_walfile_name, stream->walmethod->getlasterror());
 				goto error;
 			}
 			lastFlushPosition = blockpos;
@@ -879,7 +819,7 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream,
 			/* Check the message type. */
 			if (copybuf[0] == 'k')
 			{
-				if (!ProcessKeepaliveMsg(conn, copybuf, r, blockpos,
+				if (!ProcessKeepaliveMsg(conn, stream, copybuf, r, blockpos,
 										 &last_status))
 					goto error;
 			}
@@ -1032,7 +972,7 @@ CopyStreamReceive(PGconn *conn, long timeout, char **buffer)
  * Process the keepalive message.
  */
 static bool
-ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
+ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
 					XLogRecPtr blockpos, int64 *last_status)
 {
 	int			pos;
@@ -1059,7 +999,7 @@ ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
 	if (replyRequested && still_sending)
 	{
 		if (reportFlushPosition && lastFlushPosition < blockpos &&
-			walfile != -1)
+			walfile != NULL)
 		{
 			/*
 			 * If a valid flush location needs to be reported, flush the
@@ -1068,10 +1008,10 @@ ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
 			 * data has been successfully replicated or not, at the normal
 			 * shutdown of the server.
 			 */
-			if (fsync(walfile) != 0)
+			if (stream->walmethod->fsync(walfile) != 0)
 			{
 				fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
-						progname, current_walfile_name, strerror(errno));
+						progname, current_walfile_name, stream->walmethod->getlasterror());
 				return false;
 			}
 			lastFlushPosition = blockpos;
@@ -1129,7 +1069,7 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
 	 * Verify that the initial location in the stream matches where we think
 	 * we are.
 	 */
-	if (walfile == -1)
+	if (walfile == NULL)
 	{
 		/* No file open yet */
 		if (xlogoff != 0)
@@ -1143,12 +1083,11 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
 	else
 	{
 		/* More data in existing segment */
-		/* XXX: store seek value don't reseek all the time */
-		if (lseek(walfile, 0, SEEK_CUR) != xlogoff)
+		if (stream->walmethod->get_current_pos(walfile) != xlogoff)
 		{
 			fprintf(stderr,
 					_("%s: got WAL data offset %08x, expected %08x\n"),
-					progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
+					progname, xlogoff, (int) stream->walmethod->get_current_pos(walfile));
 			return false;
 		}
 	}
@@ -1169,7 +1108,7 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
 		else
 			bytes_to_write = bytes_left;
 
-		if (walfile == -1)
+		if (walfile == NULL)
 		{
 			if (!open_walfile(stream, *blockpos))
 			{
@@ -1178,14 +1117,13 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
 			}
 		}
 
-		if (write(walfile,
-				  copybuf + hdr_len + bytes_written,
-				  bytes_to_write) != bytes_to_write)
+		if (stream->walmethod->write(walfile, copybuf + hdr_len + bytes_written,
+									 bytes_to_write) != bytes_to_write)
 		{
 			fprintf(stderr,
 				  _("%s: could not write %u bytes to WAL file \"%s\": %s\n"),
 					progname, bytes_to_write, current_walfile_name,
-					strerror(errno));
+					stream->walmethod->getlasterror());
 			return false;
 		}
 
diff --git a/src/bin/pg_basebackup/receivelog.h b/src/bin/pg_basebackup/receivelog.h
index 554ff8b..e6db14a 100644
--- a/src/bin/pg_basebackup/receivelog.h
+++ b/src/bin/pg_basebackup/receivelog.h
@@ -13,6 +13,7 @@
 #define RECEIVELOG_H
 
 #include "libpq-fe.h"
+#include "walmethods.h"
 
 #include "access/xlogdefs.h"
 
@@ -39,7 +40,7 @@ typedef struct StreamCtl
 
 	stream_stop_callback stream_stop;	/* Stop streaming when returns true */
 
-	char	   *basedir;		/* Received segments written to this dir */
+	WalWriteMethod *walmethod;	/* How to write the WAL */
 	char	   *partial_suffix; /* Suffix appended to partially received files */
 } StreamCtl;
 
diff --git a/src/bin/pg_basebackup/walmethods.c b/src/bin/pg_basebackup/walmethods.c
new file mode 100644
index 0000000..53fce28
--- /dev/null
+++ b/src/bin/pg_basebackup/walmethods.c
@@ -0,0 +1,822 @@
+/*-------------------------------------------------------------------------
+ *
+ * walmethods.c - implementations of different ways to write received wal
+ *
+ * NOTE! The caller must ensure that only one method is instantiated in
+ *		 any given program, and that it's only instantiated once!
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		  src/bin/pg_basebackup/walmethods.c
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres_fe.h"
+
+#include <sys/stat.h>
+#include <time.h>
+#include <unistd.h>
+#ifdef HAVE_LIBZ
+#include <zlib.h>
+#endif
+
+#include "pgtar.h"
+
+#include "receivelog.h"
+
+/* Size of zlib buffer for .tar.gz */
+#define ZLIB_OUT_SIZE 4096
+
+/*-------------------------------------------------------------------------
+ * WalDirectoryMethod - write wal to a directory looking like pg_xlog
+ *-------------------------------------------------------------------------
+ */
+
+/*
+ * Global static data for this method
+ */
+typedef struct DirectoryMethodData
+{
+	char	   *basedir;
+}	DirectoryMethodData;
+static DirectoryMethodData *dir_data = NULL;
+
+/*
+ * Local file handle
+ */
+typedef struct DirectoryMethodFile
+{
+	int			fd;
+	off_t		currpos;
+	char	   *pathname;
+	char	   *temp_suffix;
+}	DirectoryMethodFile;
+
+static char *
+dir_getlasterror(void)
+{
+	/* Directory method always sets errno, so just use strerror */
+	return strerror(errno);
+}
+
+static Walfile
+dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size)
+{
+	static char tmppath[MAXPGPATH];
+	int			fd;
+	DirectoryMethodFile *f;
+
+	snprintf(tmppath, sizeof(tmppath), "%s/%s%s",
+			 dir_data->basedir, pathname, temp_suffix ? temp_suffix : "");
+
+	fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
+	if (fd < 0)
+		return NULL;
+
+	if (pad_to_size)
+	{
+		/* Always pre-pad on regular files */
+		char	   *zerobuf;
+		int			bytes;
+
+		zerobuf = pg_malloc0(XLOG_BLCKSZ);
+		for (bytes = 0; bytes < pad_to_size; bytes += XLOG_BLCKSZ)
+		{
+			if (write(fd, zerobuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
+			{
+				int			save_errno = errno;
+
+				pg_free(zerobuf);
+				close(fd);
+				errno = save_errno;
+				return NULL;
+			}
+		}
+		pg_free(zerobuf);
+
+		if (lseek(fd, 0, SEEK_SET) != 0)
+			return NULL;
+	}
+
+	f = pg_malloc0(sizeof(DirectoryMethodFile));
+	f->fd = fd;
+	f->currpos = 0;
+	f->pathname = pg_strdup(pathname);
+	if (temp_suffix)
+		f->temp_suffix = pg_strdup(temp_suffix);
+	return f;
+}
+
+static ssize_t
+dir_write(Walfile f, const void *buf, size_t count)
+{
+	ssize_t		r;
+	DirectoryMethodFile *df = (DirectoryMethodFile *) f;
+
+	Assert(f != NULL);
+
+	r = write(df->fd, buf, count);
+	if (r > 0)
+		df->currpos += r;
+	return r;
+}
+
+static off_t
+dir_get_current_pos(Walfile f)
+{
+	Assert(f != NULL);
+
+	/* Use a cached value to prevent lots of reseeks */
+	return ((DirectoryMethodFile *) f)->currpos;
+}
+
+static int
+dir_close(Walfile f, WalCloseMethod method)
+{
+	int			r;
+	DirectoryMethodFile *df = (DirectoryMethodFile *) f;
+	static char tmppath[MAXPGPATH];
+	static char tmppath2[MAXPGPATH];
+
+	Assert(f != NULL);
+
+	r = close(df->fd);
+
+	if (r == 0)
+	{
+		/* Build path to the current version of the file */
+		if (method == CLOSE_NORMAL && df->temp_suffix)
+		{
+			/* If we have a temp prefix, normal is we rename the file */
+			snprintf(tmppath, sizeof(tmppath), "%s/%s%s",
+					 dir_data->basedir, df->pathname, df->temp_suffix);
+			snprintf(tmppath2, sizeof(tmppath2), "%s/%s",
+					 dir_data->basedir, df->pathname);
+			r = rename(tmppath, tmppath2);
+		}
+		else if (method == CLOSE_UNLINK)
+		{
+			/* Unlink the file once it's closed */
+			snprintf(tmppath, sizeof(tmppath), "%s/%s%s",
+					 dir_data->basedir, df->pathname, df->temp_suffix ? df->temp_suffix : "");
+			r = unlink(tmppath);
+		}
+		/* else either CLOSE_NORMAL and no temp suffix, or CLOSE_NO_RENAME */
+	}
+
+	pg_free(df->pathname);
+	if (df->temp_suffix)
+		pg_free(df->temp_suffix);
+	pg_free(df);
+
+	return r;
+}
+
+static int
+dir_fsync(Walfile f)
+{
+	Assert(f != NULL);
+
+	return fsync(((DirectoryMethodFile *) f)->fd);
+}
+
+static ssize_t
+dir_get_file_size(const char *pathname)
+{
+	struct stat statbuf;
+	static char tmppath[MAXPGPATH];
+
+	snprintf(tmppath, sizeof(tmppath), "%s/%s",
+			 dir_data->basedir, pathname);
+
+	if (stat(tmppath, &statbuf) != 0)
+		return -1;
+
+	return statbuf.st_size;
+}
+
+static int
+dir_unlink(const char *pathname)
+{
+	static char tmppath[MAXPGPATH];
+
+	snprintf(tmppath, sizeof(tmppath), "%s/%s",
+			 dir_data->basedir, pathname);
+
+	return unlink(tmppath);
+}
+
+static bool
+dir_existsfile(const char *pathname)
+{
+	static char tmppath[MAXPGPATH];
+	int			fd;
+
+	snprintf(tmppath, sizeof(tmppath), "%s/%s",
+			 dir_data->basedir, pathname);
+
+	fd = open(tmppath, O_RDONLY | PG_BINARY, 0);
+	if (fd < 0)
+		return false;
+	close(fd);
+	return true;
+}
+
+static bool
+dir_finish(void)
+{
+	/* No cleanup necessary */
+	return true;
+}
+
+
+WalWriteMethod *
+CreateWalDirectoryMethod(const char *basedir)
+{
+	WalWriteMethod *method;
+
+	method = pg_malloc0(sizeof(WalWriteMethod));
+	method->open_for_write = dir_open_for_write;
+	method->write = dir_write;
+	method->get_current_pos = dir_get_current_pos;
+	method->get_file_size = dir_get_file_size;
+	method->close = dir_close;
+	method->fsync = dir_fsync;
+	method->unlink = dir_unlink;
+	method->existsfile = dir_existsfile;
+	method->finish = dir_finish;
+	method->getlasterror = dir_getlasterror;
+
+	dir_data = pg_malloc0(sizeof(DirectoryMethodData));
+	dir_data->basedir = pg_strdup(basedir);
+
+	return method;
+}
+
+
+/*-------------------------------------------------------------------------
+ * WalTarMethod - write wal to a tar file containing pg_xlog contents
+ *-------------------------------------------------------------------------
+ */
+
+typedef struct TarMethodFile
+{
+	off_t		ofs_start;		/* Where does the *header* for this file start */
+	off_t		currpos;
+	char		header[512];
+	char	   *pathname;
+	size_t		pad_to_size;
+}	TarMethodFile;
+
+typedef struct TarMethodData
+{
+	char	   *tarfilename;
+	int			fd;
+	int			compression;
+	TarMethodFile *currentfile;
+	char		lasterror[1024];
+#ifdef HAVE_LIBZ
+	z_streamp	zp;
+	void	   *zlibOut;
+#endif
+}	TarMethodData;
+static TarMethodData *tar_data = NULL;
+
+#define tar_clear_error() tar_data->lasterror[0] = '\0'
+#define tar_set_error(msg) strlcpy(tar_data->lasterror, msg, sizeof(tar_data->lasterror))
+
+static char *
+tar_getlasterror(void)
+{
+	/*
+	 * If a custom error is set, return that one. Otherwise, assume errno is
+	 * set and return that one.
+	 */
+	if (tar_data->lasterror[0])
+		return tar_data->lasterror;
+	return strerror(errno);
+}
+
+#ifdef HAVE_LIBZ
+static bool
+tar_write_compressed_data(void *buf, size_t count, bool flush)
+{
+	tar_data->zp->next_in = buf;
+	tar_data->zp->avail_in = count;
+
+	while (tar_data->zp->avail_in || flush)
+	{
+		int			r;
+
+		r = deflate(tar_data->zp, flush ? Z_FINISH : Z_NO_FLUSH);
+		if (r == Z_STREAM_ERROR)
+		{
+			tar_set_error("deflate failed");
+			return false;
+		}
+
+		if (tar_data->zp->avail_out < ZLIB_OUT_SIZE)
+		{
+			size_t		len = ZLIB_OUT_SIZE - tar_data->zp->avail_out;
+
+			if (write(tar_data->fd, tar_data->zlibOut, len) != len)
+				return false;
+
+			tar_data->zp->next_out = tar_data->zlibOut;
+			tar_data->zp->avail_out = ZLIB_OUT_SIZE;
+		}
+
+		if (r == Z_STREAM_END)
+			break;
+	}
+
+	if (flush)
+	{
+		/* Reset the stream for writing */
+		if (deflateReset(tar_data->zp) != Z_OK)
+		{
+			tar_set_error("deflateReset failed");
+			return false;
+		}
+	}
+
+	return true;
+}
+#endif
+
+static ssize_t
+tar_write(Walfile f, const void *buf, size_t count)
+{
+	ssize_t		r;
+
+	Assert(f != NULL);
+	tar_clear_error();
+
+	/* Tarfile will always be positioned at the end */
+	if (!tar_data->compression)
+	{
+		r = write(tar_data->fd, buf, count);
+		if (r > 0)
+			((TarMethodFile *) f)->currpos += r;
+		return r;
+	}
+#ifdef HAVE_LIBZ
+	else
+	{
+		if (!tar_write_compressed_data((void *) buf, count, false))
+			return -1;
+		((TarMethodFile *) f)->currpos += count;
+		return count;
+	}
+#endif
+}
+
+static bool
+tar_write_padding_data(TarMethodFile * f, size_t bytes)
+{
+	char	   *zerobuf = pg_malloc0(XLOG_BLCKSZ);
+	size_t		bytesleft = bytes;
+
+	while (bytesleft)
+	{
+		size_t		bytestowrite = bytesleft > XLOG_BLCKSZ ? XLOG_BLCKSZ : bytesleft;
+
+		size_t		r = tar_write(f, zerobuf, bytestowrite);
+
+		if (r < 0)
+			return false;
+		bytesleft -= r;
+	}
+	return true;
+}
+
+static Walfile
+tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size)
+{
+	int			save_errno;
+	static char tmppath[MAXPGPATH];
+
+	tar_clear_error();
+
+	if (tar_data->fd < 0)
+	{
+		/*
+		 * We open the tar file only when we first try to write to it.
+		 */
+		tar_data->fd = open(tar_data->tarfilename,
+						  O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
+		if (tar_data->fd < 0)
+			return NULL;
+
+#ifdef HAVE_LIBZ
+		if (tar_data->compression)
+		{
+			tar_data->zp = (z_streamp) pg_malloc(sizeof(z_stream));
+			tar_data->zp->zalloc = Z_NULL;
+			tar_data->zp->zfree = Z_NULL;
+			tar_data->zp->opaque = Z_NULL;
+			tar_data->zp->next_out = tar_data->zlibOut;
+			tar_data->zp->avail_out = ZLIB_OUT_SIZE;
+
+			if (deflateInit2(tar_data->zp, tar_data->compression, Z_DEFLATED, 15 + 16, 8, Z_DEFAULT_STRATEGY) != Z_OK)
+			{
+				tar_set_error("deflateInit2 failed");
+				return NULL;
+			}
+		}
+#endif
+
+		/* There's no tar header itself, the file starts with regular files */
+	}
+
+	Assert(tar_data->currentfile == NULL);
+	if (tar_data->currentfile != NULL)
+	{
+		tar_set_error("implementation error: tar files can't have more than one open file\n");
+		return NULL;
+	}
+
+	tar_data->currentfile = pg_malloc0(sizeof(TarMethodFile));
+
+	snprintf(tmppath, sizeof(tmppath), "%s%s",
+			 pathname, temp_suffix ? temp_suffix : "");
+
+	/* Create a header with size set to 0 - we will fill out the size on close */
+	if (tarCreateHeader(tar_data->currentfile->header, tmppath, NULL, 0, S_IRUSR | S_IWUSR, 0, 0, time(NULL)) != TAR_OK)
+	{
+		pg_free(tar_data->currentfile);
+		tar_data->currentfile = NULL;
+		tar_set_error("could not create tar header");
+		return NULL;
+	}
+
+#ifdef HAVE_LIBZ
+	if (tar_data->compression)
+	{
+		/* Flush existing data */
+		if (!tar_write_compressed_data(NULL, 0, true))
+			return NULL;
+
+		/* Turn off compression for header */
+		if (deflateParams(tar_data->zp, 0, 0) != Z_OK)
+		{
+			tar_set_error("deflateParams failed");
+			return NULL;
+		}
+	}
+#endif
+
+	tar_data->currentfile->ofs_start = lseek(tar_data->fd, 0, SEEK_CUR);
+	if (tar_data->currentfile->ofs_start == -1)
+	{
+		save_errno = errno;
+		pg_free(tar_data->currentfile);
+		tar_data->currentfile = NULL;
+		errno = save_errno;
+		return NULL;
+	}
+	tar_data->currentfile->currpos = 0;
+
+	if (!tar_data->compression)
+	{
+		if (write(tar_data->fd, tar_data->currentfile->header, 512) != 512)
+		{
+			save_errno = errno;
+			pg_free(tar_data->currentfile);
+			tar_data->currentfile = NULL;
+			errno = save_errno;
+			return NULL;
+		}
+	}
+#ifdef HAVE_LIBZ
+	else
+	{
+		/* Write header through the zlib APIs but with no compression */
+		if (!tar_write_compressed_data(tar_data->currentfile->header, 512, true))
+			return NULL;
+
+		/* Re-enable compression for the rest of the file */
+		if (deflateParams(tar_data->zp, tar_data->compression, 0) != Z_OK)
+		{
+			tar_set_error("deflateParams failed");
+			return NULL;
+		}
+	}
+#endif
+
+	tar_data->currentfile->pathname = pg_strdup(pathname);
+
+	/*
+	 * Uncompressed files are padded on creation, but for compression we can't
+	 * do that
+	 */
+	if (pad_to_size)
+	{
+		if (tar_data->compression)
+		{
+			tar_data->currentfile->pad_to_size = pad_to_size;
+		}
+		else
+		{
+			/* Uncompressed, so pad now */
+			tar_data->currentfile->pad_to_size = 0;
+			tar_write_padding_data(tar_data->currentfile, pad_to_size);
+			/* Seek back to start */
+			if (lseek(tar_data->fd, tar_data->currentfile->ofs_start, SEEK_SET) != tar_data->currentfile->ofs_start)
+				return NULL;
+
+			tar_data->currentfile->currpos = 0;
+		}
+	}
+
+	return tar_data->currentfile;
+}
+
+static ssize_t
+tar_get_file_size(const char *pathname)
+{
+	tar_clear_error();
+
+	/* Currently not used, so not supported */
+	errno = ENOSYS;
+	return -1;
+}
+
+static off_t
+tar_get_current_pos(Walfile f)
+{
+	Assert(f != NULL);
+	tar_clear_error();
+
+	return ((TarMethodFile *) f)->currpos;
+}
+
+static int
+tar_fsync(Walfile f)
+{
+	Assert(f != NULL);
+	tar_clear_error();
+
+	/*
+	 * Always sync the whole tarfile, because that's all we can do. This makes
+	 * no sense on compressed files, so just ignore those.
+	 */
+	if (tar_data->compression)
+		return 0;
+
+	return fsync(tar_data->fd);
+}
+
+static int
+tar_close(Walfile f, WalCloseMethod method)
+{
+	ssize_t		filesize;
+	int			padding;
+	TarMethodFile *tf = (TarMethodFile *) f;
+
+	Assert(f != NULL);
+	tar_clear_error();
+
+	if (method == CLOSE_UNLINK)
+	{
+		if (tar_data->compression)
+		{
+			tar_set_error("unlink not supported with compression");
+			return -1;
+		}
+
+		/*
+		 * Unlink the file that we just wrote to the tar. We do this by
+		 * truncating it to the start of the header. This is safe as we only
+		 * allow writing of the very last file.
+		 */
+		if (ftruncate(tar_data->fd, tf->ofs_start) != 0)
+			return -1;
+
+		pg_free(tf->pathname);
+		pg_free(tf);
+		tar_data->currentfile = NULL;
+
+		return 0;
+	}
+
+	/*
+	 * Pad the file itself with zeroes if necessary. Note that this is
+	 * different from the tar format padding -- this is the padding we asked
+	 * for when the file was opened.
+	 */
+	if (tf->pad_to_size)
+	{
+		size_t		sizeleft = tf->pad_to_size - tf->currpos;
+
+		if (sizeleft)
+		{
+			if (!tar_write_padding_data(tf, sizeleft))
+				return -1;
+		}
+	}
+
+	/*
+	 * Get the size of the file, and pad the current data up to the nearest
+	 * 512 byte boundary.
+	 */
+	filesize = tar_get_current_pos(f);
+	padding = ((filesize + 511) & ~511) - filesize;
+	if (padding)
+	{
+		char		zerobuf[512];
+
+		MemSet(zerobuf, 0, padding);
+		if (tar_write(f, zerobuf, padding) != padding)
+			return -1;
+	}
+
+
+#ifdef HAVE_LIBZ
+	if (tar_data->compression)
+	{
+		/* Flush the current buffer */
+		if (!tar_write_compressed_data(NULL, 0, true))
+		{
+			errno = EINVAL;
+			return -1;
+		}
+	}
+#endif
+
+	/*
+	 * Now go back and update the header with the correct filesize and
+	 * possibly also renaming the file. We overwrite the entire current header
+	 * when done, including the checksum.
+	 */
+	print_tar_number(&(tf->header[124]), 12, filesize);
+
+	if (method == CLOSE_NORMAL)
+
+		/*
+		 * We overwrite it with what it was before if we have no tempname,
+		 * since we're going to write the buffer anyway.
+		 */
+		strlcpy(&(tf->header[0]), tf->pathname, 100);
+
+	print_tar_number(&(tf->header[148]), 8, tarChecksum(((TarMethodFile *) f)->header));
+	if (lseek(tar_data->fd, tf->ofs_start, SEEK_SET) != ((TarMethodFile *) f)->ofs_start)
+		return -1;
+	if (!tar_data->compression)
+	{
+		if (write(tar_data->fd, tf->header, 512) != 512)
+			return -1;
+	}
+#ifdef HAVE_LIBZ
+	else
+	{
+		/* Turn off compression */
+		if (deflateParams(tar_data->zp, 0, 0) != Z_OK)
+		{
+			tar_set_error("deflateParams failed");
+			return -1;
+		}
+
+		/* Overwrite the header, assuming the size will be the same */
+		if (!tar_write_compressed_data(tar_data->currentfile->header, 512, true))
+			return -1;
+
+		/* Turn compression back on */
+		if (deflateParams(tar_data->zp, tar_data->compression, 0) != Z_OK)
+		{
+			tar_set_error("deflateParams failed");
+			return -1;
+		}
+	}
+#endif
+
+	/* Move file pointer back down to end, so we can write the next file */
+	if (lseek(tar_data->fd, 0, SEEK_END) < 0)
+		return -1;
+
+	/* Always fsync on close, so the padding gets fsynced */
+	tar_fsync(f);
+
+	/* Clean up and done */
+	pg_free(tf->pathname);
+	pg_free(tf);
+	tar_data->currentfile = NULL;
+
+	return 0;
+}
+
+static int
+tar_unlink(const char *pathname)
+{
+	tar_clear_error();
+	errno = ENOSYS;
+	return -1;
+}
+
+static bool
+tar_existsfile(const char *pathname)
+{
+	tar_clear_error();
+	/* We only deal with new tarfiles, so nothing externally created exists */
+	return false;
+}
+
+static bool
+tar_finish(void)
+{
+	char		zerobuf[1024];
+
+	tar_clear_error();
+
+	if (tar_data->currentfile)
+	{
+		if (tar_close(tar_data->currentfile, CLOSE_NORMAL) != 0)
+			return false;
+	}
+
+	/* A tarfile always ends with two empty  blocks */
+	MemSet(zerobuf, 0, sizeof(zerobuf));
+	if (!tar_data->compression)
+	{
+		if (write(tar_data->fd, zerobuf, sizeof(zerobuf)) != sizeof(zerobuf))
+			return false;
+	}
+#ifdef HAVE_LIBZ
+	else
+	{
+		if (!tar_write_compressed_data(zerobuf, sizeof(zerobuf), false))
+			return false;
+
+		/* Also flush all data to make sure the gzip stream is finished */
+		tar_data->zp->next_in = NULL;
+		tar_data->zp->avail_in = 0;
+		while (true)
+		{
+			int			r;
+
+			r = deflate(tar_data->zp, Z_FINISH);
+
+			if (r == Z_STREAM_ERROR)
+			{
+				tar_set_error("deflate failed");
+				return false;
+			}
+			if (tar_data->zp->avail_out < ZLIB_OUT_SIZE)
+			{
+				size_t		len = ZLIB_OUT_SIZE - tar_data->zp->avail_out;
+
+				if (write(tar_data->fd, tar_data->zlibOut, len) != len)
+					return false;
+			}
+			if (r == Z_STREAM_END)
+				break;
+		}
+
+		if (deflateEnd(tar_data->zp) != Z_OK)
+		{
+			tar_set_error("deflateEnd failed");
+			return false;
+		}
+	}
+#endif
+
+	/* sync the empty blocks as well, since they're after the last file */
+	fsync(tar_data->fd);
+
+	if (close(tar_data->fd) != 0)
+		return false;
+
+	tar_data->fd = -1;
+
+	return true;
+}
+
+WalWriteMethod *
+CreateWalTarMethod(const char *tarbase, int compression)
+{
+	WalWriteMethod *method;
+	const char *suffix = (compression > 0) ? ".tar.gz" : ".tar";
+
+	method = pg_malloc0(sizeof(WalWriteMethod));
+	method->open_for_write = tar_open_for_write;
+	method->write = tar_write;
+	method->get_current_pos = tar_get_current_pos;
+	method->get_file_size = tar_get_file_size;
+	method->close = tar_close;
+	method->fsync = tar_fsync;
+	method->unlink = tar_unlink;
+	method->existsfile = tar_existsfile;
+	method->finish = tar_finish;
+	method->getlasterror = tar_getlasterror;
+
+	tar_data = pg_malloc0(sizeof(TarMethodData));
+	tar_data->tarfilename = pg_malloc0(strlen(tarbase) + strlen(suffix) + 1);
+	sprintf(tar_data->tarfilename, "%s%s", tarbase, suffix);
+	tar_data->fd = -1;
+	tar_data->compression = compression;
+	if (compression)
+		tar_data->zlibOut = (char *) pg_malloc(ZLIB_OUT_SIZE + 1);
+
+	return method;
+}
diff --git a/src/bin/pg_basebackup/walmethods.h b/src/bin/pg_basebackup/walmethods.h
new file mode 100644
index 0000000..9922cfd
--- /dev/null
+++ b/src/bin/pg_basebackup/walmethods.h
@@ -0,0 +1,46 @@
+/*-------------------------------------------------------------------------
+ *
+ * walmethods.h
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		  src/bin/pg_basebackup/walmethods.h
+ *-------------------------------------------------------------------------
+ */
+
+
+typedef void *Walfile;
+
+typedef enum
+{
+	CLOSE_NORMAL,
+	CLOSE_UNLINK,
+	CLOSE_NO_RENAME,
+}	WalCloseMethod;
+
+typedef struct WalWriteMethod WalWriteMethod;
+struct WalWriteMethod
+{
+	Walfile(*open_for_write) (const char *pathname, const char *temp_suffix, size_t pad_to_size);
+	int			(*close) (Walfile f, WalCloseMethod method);
+	int			(*unlink) (const char *pathname);
+	bool		(*existsfile) (const char *pathname);
+	ssize_t		(*get_file_size) (const char *pathname);
+
+	ssize_t		(*write) (Walfile f, const void *buf, size_t count);
+	off_t		(*get_current_pos) (Walfile f);
+	int			(*fsync) (Walfile f);
+	bool		(*finish) (void);
+	char	   *(*getlasterror) (void);
+};
+
+/*
+ * Available WAL methods:
+ *	- WalDirectoryMethod - write WAL to regular files in a standard pg_xlog
+ *	- TarDirectoryMethod - write WAL to a tarfile corresponding to pg_xlog
+ *						   (only implements the methods required for pg_basebackup,
+ *						   not all those required for pg_receivexlog)
+ */
+WalWriteMethod *CreateWalDirectoryMethod(const char *basedir);
+WalWriteMethod *CreateWalTarMethod(const char *tarbase, int compression);
diff --git a/src/include/pgtar.h b/src/include/pgtar.h
index 45ca400..1d179f0 100644
--- a/src/include/pgtar.h
+++ b/src/include/pgtar.h
@@ -22,4 +22,5 @@ enum tarError
 extern enum tarError tarCreateHeader(char *h, const char *filename, const char *linktarget,
 			  pgoff_t size, mode_t mode, uid_t uid, gid_t gid, time_t mtime);
 extern uint64 read_tar_number(const char *s, int len);
+extern void print_tar_number(char *s, int len, uint64 val);
 extern int	tarChecksum(char *header);
diff --git a/src/port/tar.c b/src/port/tar.c
index 52a2113..f1da959 100644
--- a/src/port/tar.c
+++ b/src/port/tar.c
@@ -16,7 +16,7 @@
  * support only non-negative numbers, so we don't worry about the GNU rules
  * for handling negative numbers.)
  */
-static void
+void
 print_tar_number(char *s, int len, uint64 val)
 {
 	if (val < (((uint64) 1) << ((len - 1) * 3)))
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to