On Mon, Sep 5, 2016 at 10:01 AM, Michael Paquier <michael.paqu...@gmail.com>
wrote:

> On Sat, Sep 3, 2016 at 10:35 PM, Magnus Hagander <mag...@hagander.net>
> wrote:
> > Ugh. That would be nice to have, but I think that's outside the scope of
> > this patch.
>
> A test for this patch that could have value would be to use
> pg_basebackup -X stream -Ft, then untar pg_xlog.tar and look at the
> size of the segments. If you have an idea to untar something without
> the in-core perl support because we need to have the TAP stuff able to
> run on at least 5.8.8, I'd welcome an idea. Honestly I still have
> none, and that's why the recovery tests do not use tarballs in their
> tests when using pg_basebackup. In short let's not add something more
> for this patch.
>
> > PFA is an updated version of this patch that:
> > * documents a magic value passed to zlib (which is in their documentation
> > as being a magic value, but has no define)
> > * fixes the padding of tar files
> > * adds a most basic test that the -X stream -Ft does produce a tarfile
>
> So I had a more serious look at this patch, and it basically makes
> more generic the operations done for the plain mode by adding a set of
> routines that can be used by both tar and plain mode to work on the
> WAL files streamed. Elegant.
>
> +           <para>
> +            The transaction log files will be written to
> +             the <filename>base.tar</filename> file.
> +           </para>
> Nit: number of spaces here.
>

Fixed.



> -mark_file_as_archived(const char *basedir, const char *fname)
> +mark_file_as_archived(StreamCtl *stream, const char *fname)
> Just passing WalMethod as argument would be enough, but... My patch
> adding the fsync calls to pg_basebackup could just make use of
> StreamCtl, so let's keep it as you suggest.
>

Yeah, I think it's cleaner to pass the whole structure around really. If
not now, we'd need it eventually. That makes all callers more consistent.
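To illustrate the pattern (shared code that only talks to a table of
function pointers carried in the stream control struct), here is a minimal,
self-contained sketch. SketchWalMethod, SketchStreamCtl and the two toy
backends are hypothetical and much smaller than the real WalWriteMethod; the
point is only that a caller like the timeline history check never needs to
know which backend is in use, and that a tar backend can simply answer "does
not exist".

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

/* Hypothetical, trimmed-down method table; each backend fills in the pointers */
typedef struct SketchWalMethod
{
	bool		(*existsfile) (const char *pathname);
	bool		(*finish) (void);
} SketchWalMethod;

/* Hypothetical stream control: callers pass the whole struct around */
typedef struct SketchStreamCtl
{
	SketchWalMethod *walmethod;
	const char *partial_suffix;
} SketchStreamCtl;

static bool
sketch_dir_existsfile(const char *pathname)
{
	/* Stand-in for a stat()/open() check: pretend one history file is on disk */
	return strcmp(pathname, "00000002.history") == 0;
}

static bool
sketch_tar_existsfile(const char *pathname)
{
	/* The archive is always written from scratch, so nothing pre-exists */
	(void) pathname;
	return false;
}

static bool
sketch_finish(void)
{
	return true;
}

static SketchWalMethod sketch_dir_method = {sketch_dir_existsfile, sketch_finish};
static SketchWalMethod sketch_tar_method = {sketch_tar_existsfile, sketch_finish};

/* Shared caller code: dispatches through the table, never on the mode */
static bool
sketch_history_file_exists(const SketchStreamCtl *stream, const char *histfname)
{
	assert(stream != NULL && stream->walmethod != NULL);
	return stream->walmethod->existsfile(histfname);
}
```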



>  static bool
>  existsTimeLineHistoryFile(StreamCtl *stream)
>  {
> [...]
> +   return stream->walmethod->existsfile(histfname);
>  }
> existsfile returns always false for the tar method. This does not
> matter much because pg_basebackup exits immediately in case of a
> failure, but I think that this deserves a comment in ReceiveXlogStream
> where existsTimeLineHistoryFile is called.
>

OK, added. As you say, the behaviour is expected, but it makes sense to
mention it clearly there.



> I find the use of existsfile() in open_walfile() rather confusing
> because this relies on the fact  that existsfile() returns always
> false for the tar mode. We could add an additional field in WalMethod
> to store the method type and use that more, but that may make the code
> more confusing than what you propose. What do you think?
>

Yeah, I'm not sure that helps. The point is that the abstraction is
supposed to take care of that. But if it's confusing, then clearly a
comment is warranted there, so I've added that. Do you think that makes it
clear enough?
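For what it's worth, the decision open_walfile() makes can be sketched as a
pure function, which makes the tar/directory difference visible in one place.
The enum and function names below are hypothetical, and the 16MB segment size
is assumed (the default XLogSegSize). Since existsfile() always reports false
for the tar method, that method always ends up on the create branch.

```c
#include <assert.h>
#include <stdbool.h>
#include <sys/types.h>

/* Assumed segment size: the default 16MB XLogSegSize */
#define SKETCH_SEG_SIZE ((ssize_t) 16 * 1024 * 1024)

typedef enum
{
	SEG_CREATE,					/* create a new file, padded to a full segment */
	SEG_REUSE,					/* existing, already padded segment: reopen it */
	SEG_CORRUPT,				/* partial size: refuse to touch it */
	SEG_ERROR					/* size could not be determined */
} SketchSegAction;

static SketchSegAction
sketch_decide_segment(bool exists, ssize_t size)
{
	if (!exists)
		return SEG_CREATE;		/* always taken by the tar method */
	if (size < 0)
		return SEG_ERROR;
	if (size == SKETCH_SEG_SIZE)
		return SEG_REUSE;
	if (size == 0)
		return SEG_CREATE;		/* empty leftover: create (and pad) it again */
	return SEG_CORRUPT;
}
```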



>
> +   int         (*unlink) (const char *pathname);
> The unlink method is used nowhere. This could just be removed.
>

That's clearly a missed cleanup. Removed, thanks.



> -static void
> +void
>  print_tar_number(char *s, int len, uint64 val)
> This could be an independent patch. Or not.
>

Could be, but we don't really have any other uses for it.
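For reference, tar header numeric fields (size, mode, mtime and so on) are
stored as zero-padded octal ASCII in fixed-width fields. Below is a simplified
sketch of that encoding, assuming a NUL terminator in the last byte.
sketch_tar_octal is a hypothetical name; this is not the in-tree
print_tar_number, which may differ in details such as the terminator and the
handling of values too large for octal.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

/*
 * Write val into a len-byte tar header field as zero-padded octal digits,
 * with a NUL in the last byte. Returns 0 on success, -1 if val does not fit.
 */
static int
sketch_tar_octal(char *s, int len, uint64_t val)
{
	int			i;

	if (len < 2)
		return -1;
	s[len - 1] = '\0';
	/* Fill digits right to left, three bits per octal digit */
	for (i = len - 2; i >= 0; i--)
	{
		s[i] = (char) ('0' + (val & 7));
		val >>= 3;
	}
	return val == 0 ? 0 : -1;
}
```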



> I think that I found another bug regarding the contents of the
> segments. I did pg_basebackup -F t -X stream, then untared pg_xlog.tar
> which contained segment 1/0/2, then:
> $ pg_xlogdump 000000010000000000000002
> pg_xlogdump: FATAL:  could not find a valid record after 0/2000000
> I'd expect this segment to have records, up to a XLOG_SWITCH record.
>

Ugh. Yes, that's definitely broken. It seeked back and overwrote the tar
header with the data, instead of starting where the file contents were
supposed to begin. It worked fine for compressed files, and it was while
implementing compression that it broke.
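The layout rule that was violated can be sketched like this: each tar member
is a 512-byte header followed by the file contents, zero-padded up to the next
512-byte boundary. A writer that seeks back to fill in the header afterwards
must still have started the contents at header offset + 512. The struct and
function names are hypothetical bookkeeping for illustration, not the code in
walmethods.c.

```c
#include <assert.h>
#include <stdint.h>

#define SKETCH_TAR_BLOCK 512

/* Hypothetical bookkeeping for one member inside a tar being written */
typedef struct SketchTarMember
{
	uint64_t	header_offset;	/* where the 512-byte header starts */
	uint64_t	data_offset;	/* where the file contents start */
} SketchTarMember;

static SketchTarMember
sketch_begin_member(uint64_t archive_pos)
{
	SketchTarMember m;

	/* Members always start on a block boundary */
	assert(archive_pos % SKETCH_TAR_BLOCK == 0);
	m.header_offset = archive_pos;
	/* Contents follow the header; writing them at header_offset is the bug */
	m.data_offset = archive_pos + SKETCH_TAR_BLOCK;
	return m;
}

/* Zero bytes needed after size bytes of contents to reach a block boundary */
static uint64_t
sketch_tar_padding(uint64_t size)
{
	return (SKETCH_TAR_BLOCK - size % SKETCH_TAR_BLOCK) % SKETCH_TAR_BLOCK;
}

/* Archive position right after the member's contents and padding */
static uint64_t
sketch_end_of_member(SketchTarMember m, uint64_t size)
{
	return m.data_offset + size + sketch_tar_padding(size);
}
```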

So what's our basic rule for these Perl tests: are we allowed to use
pg_xlogdump from within a pg_basebackup test? If so, that could actually be
a useful test: do the backup, extract the xlog and verify that it contains
the XLOG_SWITCH record.


I also noticed that -Z5 created a .tar.gz, while -z created a .tar (which
was nevertheless compressed). That's because -z sets compresslevel to -1,
meaning the default level.
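Since zlib's Z_DEFAULT_COMPRESSION is -1, the symptom is consistent with the
file name being chosen by a check for a positive level. A minimal sketch of
the naming decision, assuming 0 is the only uncompressed level;
sketch_tar_suffix is a hypothetical helper, not an existing function.

```c
#include <assert.h>
#include <string.h>

/*
 * Hypothetical helper: pick the archive suffix from the compression level.
 * 0 means "no compression"; -1 (zlib's Z_DEFAULT_COMPRESSION, what -z sets)
 * is still compressed, so any non-zero level gets a .tar.gz name.
 */
static const char *
sketch_tar_suffix(int compresslevel)
{
	assert(compresslevel >= -1 && compresslevel <= 9);
	return compresslevel != 0 ? ".tar.gz" : ".tar";
}
```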


Again, apologies for getting back into the game so late here.

//Magnus
diff --git a/doc/src/sgml/ref/pg_basebackup.sgml b/doc/src/sgml/ref/pg_basebackup.sgml
index 9f1eae1..a4236a5 100644
--- a/doc/src/sgml/ref/pg_basebackup.sgml
+++ b/doc/src/sgml/ref/pg_basebackup.sgml
@@ -180,7 +180,8 @@ PostgreSQL documentation
             target directory, the tar contents will be written to
             standard output, suitable for piping to for example
             <productname>gzip</productname>. This is only possible if
-            the cluster has no additional tablespaces.
+            the cluster has no additional tablespaces and transaction
+            log streaming is not used.
            </para>
            </listitem>
          </varlistentry>
@@ -323,6 +324,10 @@ PostgreSQL documentation
              If the log has been rotated when it's time to transfer it, the
              backup will fail and be unusable.
            </para>
+           <para>
+            When the tar format is used, the transaction log files will be
+            written to the <filename>base.tar</filename> file.
+           </para>
           </listitem>
          </varlistentry>
 
@@ -339,6 +344,9 @@ PostgreSQL documentation
              client can keep up with transaction log received, using this mode
              requires no extra transaction logs to be saved on the master.
            </para>
+           <para>When the tar format is used, the transaction log files are
+            written to a separate file called <filename>pg_xlog.tar</filename>.
+           </para>
           </listitem>
          </varlistentry>
         </variablelist>
diff --git a/src/bin/pg_basebackup/Makefile b/src/bin/pg_basebackup/Makefile
index fa1ce8b..52ac9e9 100644
--- a/src/bin/pg_basebackup/Makefile
+++ b/src/bin/pg_basebackup/Makefile
@@ -19,7 +19,7 @@ include $(top_builddir)/src/Makefile.global
 override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS)
 LDFLAGS += -L$(top_builddir)/src/fe_utils -lpgfeutils -lpq
 
-OBJS=receivelog.o streamutil.o $(WIN32RES)
+OBJS=receivelog.o streamutil.o walmethods.o $(WIN32RES)
 
 all: pg_basebackup pg_receivexlog pg_recvlogical
 
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index 42f3b27..467d4fe 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -438,7 +438,7 @@ typedef struct
 {
 	PGconn	   *bgconn;
 	XLogRecPtr	startptr;
-	char		xlogdir[MAXPGPATH];
+	char		xlog[MAXPGPATH];	/* directory or tarfile depending on mode */
 	char	   *sysidentifier;
 	int			timeline;
 } logstreamer_param;
@@ -458,9 +458,13 @@ LogStreamerMain(logstreamer_param *param)
 	stream.standby_message_timeout = standby_message_timeout;
 	stream.synchronous = false;
 	stream.mark_done = true;
-	stream.basedir = param->xlogdir;
 	stream.partial_suffix = NULL;
 
+	if (format == 'p')
+		stream.walmethod = CreateWalDirectoryMethod(param->xlog);
+	else
+		stream.walmethod = CreateWalTarMethod(param->xlog, compresslevel);
+
 	if (!ReceiveXlogStream(param->bgconn, &stream))
 
 		/*
@@ -470,6 +474,14 @@ LogStreamerMain(logstreamer_param *param)
 		 */
 		return 1;
 
+	if (!stream.walmethod->finish())
+	{
+		fprintf(stderr,
+				_("%s: could not finish writing WAL files: %s\n"),
+				progname, stream.walmethod->getlasterror());
+		return 1;
+	}
+
 	PQfinish(param->bgconn);
 	return 0;
 }
@@ -520,22 +532,25 @@ StartLogStreamer(char *startpos, uint32 timeline, char *sysidentifier)
 		/* Error message already written in GetConnection() */
 		exit(1);
 
-	snprintf(param->xlogdir, sizeof(param->xlogdir), "%s/pg_xlog", basedir);
-
-	/*
-	 * Create pg_xlog/archive_status (and thus pg_xlog) so we can write to
-	 * basedir/pg_xlog as the directory entry in the tar file may arrive
-	 * later.
-	 */
-	snprintf(statusdir, sizeof(statusdir), "%s/pg_xlog/archive_status",
-			 basedir);
+	snprintf(param->xlog, sizeof(param->xlog), "%s/pg_xlog", basedir);
 
-	if (pg_mkdir_p(statusdir, S_IRWXU) != 0 && errno != EEXIST)
+	if (format == 'p')
 	{
-		fprintf(stderr,
-				_("%s: could not create directory \"%s\": %s\n"),
-				progname, statusdir, strerror(errno));
-		disconnect_and_exit(1);
+		/*
+		 * Create pg_xlog/archive_status (and thus pg_xlog) so we can write to
+		 * basedir/pg_xlog as the directory entry in the tar file may arrive
+		 * later.
+		 */
+		snprintf(statusdir, sizeof(statusdir), "%s/pg_xlog/archive_status",
+				 basedir);
+
+		if (pg_mkdir_p(statusdir, S_IRWXU) != 0 && errno != EEXIST)
+		{
+			fprintf(stderr,
+					_("%s: could not create directory \"%s\": %s\n"),
+					progname, statusdir, strerror(errno));
+			disconnect_and_exit(1);
+		}
 	}
 
 	/*
@@ -2195,16 +2210,6 @@ main(int argc, char **argv)
 		exit(1);
 	}
 
-	if (format != 'p' && streamwal)
-	{
-		fprintf(stderr,
-				_("%s: WAL streaming can only be used in plain mode\n"),
-				progname);
-		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
-				progname);
-		exit(1);
-	}
-
 	if (replication_slot && !streamwal)
 	{
 		fprintf(stderr,
diff --git a/src/bin/pg_basebackup/pg_receivexlog.c b/src/bin/pg_basebackup/pg_receivexlog.c
index 7f7ee9d..9b4c101 100644
--- a/src/bin/pg_basebackup/pg_receivexlog.c
+++ b/src/bin/pg_basebackup/pg_receivexlog.c
@@ -337,11 +337,19 @@ StreamLog(void)
 	stream.standby_message_timeout = standby_message_timeout;
 	stream.synchronous = synchronous;
 	stream.mark_done = false;
-	stream.basedir = basedir;
+	stream.walmethod = CreateWalDirectoryMethod(basedir);
 	stream.partial_suffix = ".partial";
 
 	ReceiveXlogStream(conn, &stream);
 
+	if (!stream.walmethod->finish())
+	{
+		fprintf(stderr,
+				_("%s: could not finish writing WAL files: %s\n"),
+				progname, stream.walmethod->getlasterror());
+		return;
+	}
+
 	PQfinish(conn);
 	conn = NULL;
 }
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c
index 062730b..481944f 100644
--- a/src/bin/pg_basebackup/receivelog.c
+++ b/src/bin/pg_basebackup/receivelog.c
@@ -26,7 +26,7 @@
 
 
 /* fd and filename for currently open WAL file */
-static int	walfile = -1;
+static Walfile *walfile = NULL;
 static char current_walfile_name[MAXPGPATH] = "";
 static bool reportFlushPosition = false;
 static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
@@ -37,7 +37,7 @@ static PGresult *HandleCopyStream(PGconn *conn, StreamCtl *stream,
 				 XLogRecPtr *stoppos);
 static int	CopyStreamPoll(PGconn *conn, long timeout_ms);
 static int	CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
-static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
+static bool ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
 					XLogRecPtr blockpos, int64 *last_status);
 static bool ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
 				   XLogRecPtr *blockpos);
@@ -52,33 +52,33 @@ static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
 						 uint32 *timeline);
 
 static bool
-mark_file_as_archived(const char *basedir, const char *fname)
+mark_file_as_archived(StreamCtl *stream, const char *fname)
 {
-	int			fd;
+	Walfile    *f;
 	static char tmppath[MAXPGPATH];
 
-	snprintf(tmppath, sizeof(tmppath), "%s/archive_status/%s.done",
-			 basedir, fname);
+	snprintf(tmppath, sizeof(tmppath), "archive_status/%s.done",
+			 fname);
 
-	fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
-	if (fd < 0)
+	f = stream->walmethod->open_for_write(tmppath, NULL, 0);
+	if (f == NULL)
 	{
 		fprintf(stderr, _("%s: could not create archive status file \"%s\": %s\n"),
-				progname, tmppath, strerror(errno));
+				progname, tmppath, stream->walmethod->getlasterror());
 		return false;
 	}
 
-	if (fsync(fd) != 0)
+	if (stream->walmethod->fsync(f) != 0)
 	{
 		fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
-				progname, tmppath, strerror(errno));
+				progname, tmppath, stream->walmethod->getlasterror());
 
-		close(fd);
+		stream->walmethod->close(f, CLOSE_UNLINK);
 
 		return false;
 	}
 
-	close(fd);
+	stream->walmethod->close(f, CLOSE_NORMAL);
 
 	return true;
 }
@@ -92,79 +92,70 @@ mark_file_as_archived(const char *basedir, const char *fname)
 static bool
 open_walfile(StreamCtl *stream, XLogRecPtr startpoint)
 {
-	int			f;
+	Walfile    *f;
 	char		fn[MAXPGPATH];
-	struct stat statbuf;
-	char	   *zerobuf;
-	int			bytes;
+	ssize_t		size;
 	XLogSegNo	segno;
 
 	XLByteToSeg(startpoint, segno);
 	XLogFileName(current_walfile_name, stream->timeline, segno);
 
-	snprintf(fn, sizeof(fn), "%s/%s%s", stream->basedir, current_walfile_name,
+	snprintf(fn, sizeof(fn), "%s%s", current_walfile_name,
 			 stream->partial_suffix ? stream->partial_suffix : "");
-	f = open(fn, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
-	if (f == -1)
-	{
-		fprintf(stderr,
-				_("%s: could not open transaction log file \"%s\": %s\n"),
-				progname, fn, strerror(errno));
-		return false;
-	}
 
 	/*
-	 * Verify that the file is either empty (just created), or a complete
-	 * XLogSegSize segment. Anything in between indicates a corrupt file.
+	 * When streaming to files, if a file with this name already exists we
+	 * verify that it's either empty (just created), or a complete XLogSegSize
+	 * segment (in which case it has been created and padded). Anything else
+	 * indicates a corrupt file.
+	 *
+	 * When streaming to tar, no file with this name can already exist, so we
+	 * never have to verify a size.
 	 */
-	if (fstat(f, &statbuf) != 0)
-	{
-		fprintf(stderr,
-				_("%s: could not stat transaction log file \"%s\": %s\n"),
-				progname, fn, strerror(errno));
-		close(f);
-		return false;
-	}
-	if (statbuf.st_size == XLogSegSize)
-	{
-		/* File is open and ready to use */
-		walfile = f;
-		return true;
-	}
-	if (statbuf.st_size != 0)
-	{
-		fprintf(stderr,
-				_("%s: transaction log file \"%s\" has %d bytes, should be 0 or %d\n"),
-				progname, fn, (int) statbuf.st_size, XLogSegSize);
-		close(f);
-		return false;
-	}
-
-	/* New, empty, file. So pad it to 16Mb with zeroes */
-	zerobuf = pg_malloc0(XLOG_BLCKSZ);
-	for (bytes = 0; bytes < XLogSegSize; bytes += XLOG_BLCKSZ)
+	if (stream->walmethod->existsfile(fn))
 	{
-		if (write(f, zerobuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
+		size = stream->walmethod->get_file_size(fn);
+		if (size < 0)
 		{
 			fprintf(stderr,
-					_("%s: could not pad transaction log file \"%s\": %s\n"),
-					progname, fn, strerror(errno));
-			free(zerobuf);
-			close(f);
-			unlink(fn);
+			_("%s: could not get size of transaction log file \"%s\": %s\n"),
+					progname, fn, stream->walmethod->getlasterror());
+			return false;
+		}
+		if (size == XLogSegSize)
+		{
+			/* Already padded file. Open it for use */
+			f = stream->walmethod->open_for_write(current_walfile_name, stream->partial_suffix, 0);
+			if (f == NULL)
+			{
+				fprintf(stderr,
+						_("%s: could not open existing transaction log file \"%s\": %s\n"),
+						progname, fn, stream->walmethod->getlasterror());
+				return false;
+			}
+			walfile = f;
+			return true;
+		}
+		if (size != 0)
+		{
+			fprintf(stderr,
+					_("%s: transaction log file \"%s\" has %d bytes, should be 0 or %d\n"),
+					progname, fn, (int) size, XLogSegSize);
 			return false;
 		}
 	}
-	free(zerobuf);
 
-	if (lseek(f, SEEK_SET, 0) != 0)
+	/* No file existed, so create one */
+
+	f = stream->walmethod->open_for_write(current_walfile_name, stream->partial_suffix, XLogSegSize);
+	if (f == NULL)
 	{
 		fprintf(stderr,
-				_("%s: could not seek to beginning of transaction log file \"%s\": %s\n"),
-				progname, fn, strerror(errno));
-		close(f);
+				_("%s: could not open transaction log file \"%s\": %s\n"),
+				progname, fn, stream->walmethod->getlasterror());
 		return false;
 	}
+
 	walfile = f;
 	return true;
 }
@@ -178,56 +169,50 @@ static bool
 close_walfile(StreamCtl *stream, XLogRecPtr pos)
 {
 	off_t		currpos;
+	int			r;
 
-	if (walfile == -1)
+	if (walfile == NULL)
 		return true;
 
-	currpos = lseek(walfile, 0, SEEK_CUR);
+	currpos = stream->walmethod->get_current_pos(walfile);
 	if (currpos == -1)
 	{
 		fprintf(stderr,
 			 _("%s: could not determine seek position in file \"%s\": %s\n"),
-				progname, current_walfile_name, strerror(errno));
+		  progname, current_walfile_name, stream->walmethod->getlasterror());
 		return false;
 	}
 
-	if (fsync(walfile) != 0)
+	if (stream->walmethod->fsync(walfile) != 0)
 	{
 		fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
-				progname, current_walfile_name, strerror(errno));
+		  progname, current_walfile_name, stream->walmethod->getlasterror());
 		return false;
 	}
 
-	if (close(walfile) != 0)
+	if (stream->partial_suffix)
 	{
-		fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
-				progname, current_walfile_name, strerror(errno));
-		walfile = -1;
-		return false;
+		if (currpos == XLOG_SEG_SIZE)
+			r = stream->walmethod->close(walfile, CLOSE_NORMAL);
+		else
+		{
+			fprintf(stderr,
+					_("%s: not renaming \"%s%s\", segment is not complete\n"),
+					progname, current_walfile_name, stream->partial_suffix);
+			r = stream->walmethod->close(walfile, CLOSE_NO_RENAME);
+		}
 	}
-	walfile = -1;
+	else
+		r = stream->walmethod->close(walfile, CLOSE_NORMAL);
 
-	/*
-	 * If we finished writing a .partial file, rename it into place.
-	 */
-	if (currpos == XLOG_SEG_SIZE && stream->partial_suffix)
-	{
-		char		oldfn[MAXPGPATH];
-		char		newfn[MAXPGPATH];
+	walfile = NULL;
 
-		snprintf(oldfn, sizeof(oldfn), "%s/%s%s", stream->basedir, current_walfile_name, stream->partial_suffix);
-		snprintf(newfn, sizeof(newfn), "%s/%s", stream->basedir, current_walfile_name);
-		if (rename(oldfn, newfn) != 0)
-		{
-			fprintf(stderr, _("%s: could not rename file \"%s\": %s\n"),
-					progname, current_walfile_name, strerror(errno));
-			return false;
-		}
+	if (r != 0)
+	{
+		fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
+		  progname, current_walfile_name, stream->walmethod->getlasterror());
+		return false;
 	}
-	else if (stream->partial_suffix)
-		fprintf(stderr,
-				_("%s: not renaming \"%s%s\", segment is not complete\n"),
-				progname, current_walfile_name, stream->partial_suffix);
 
 	/*
 	 * Mark file as archived if requested by the caller - pg_basebackup needs
@@ -238,7 +223,7 @@ close_walfile(StreamCtl *stream, XLogRecPtr pos)
 	if (currpos == XLOG_SEG_SIZE && stream->mark_done)
 	{
 		/* writes error message if failed */
-		if (!mark_file_as_archived(stream->basedir, current_walfile_name))
+		if (!mark_file_as_archived(stream, current_walfile_name))
 			return false;
 	}
 
@@ -253,9 +238,7 @@ close_walfile(StreamCtl *stream, XLogRecPtr pos)
 static bool
 existsTimeLineHistoryFile(StreamCtl *stream)
 {
-	char		path[MAXPGPATH];
 	char		histfname[MAXFNAMELEN];
-	int			fd;
 
 	/*
 	 * Timeline 1 never has a history file. We treat that as if it existed,
@@ -266,31 +249,16 @@ existsTimeLineHistoryFile(StreamCtl *stream)
 
 	TLHistoryFileName(histfname, stream->timeline);
 
-	snprintf(path, sizeof(path), "%s/%s", stream->basedir, histfname);
-
-	fd = open(path, O_RDONLY | PG_BINARY, 0);
-	if (fd < 0)
-	{
-		if (errno != ENOENT)
-			fprintf(stderr, _("%s: could not open timeline history file \"%s\": %s\n"),
-					progname, path, strerror(errno));
-		return false;
-	}
-	else
-	{
-		close(fd);
-		return true;
-	}
+	return stream->walmethod->existsfile(histfname);
 }
 
 static bool
 writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
 {
 	int			size = strlen(content);
-	char		path[MAXPGPATH];
 	char		tmppath[MAXPGPATH];
 	char		histfname[MAXFNAMELEN];
-	int			fd;
+	Walfile    *f;
 
 	/*
 	 * Check that the server's idea of how timeline history files should be
@@ -304,62 +272,39 @@ writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
 		return false;
 	}
 
-	snprintf(path, sizeof(path), "%s/%s", stream->basedir, histfname);
-
-	/*
-	 * Write into a temp file name.
-	 */
-	snprintf(tmppath, MAXPGPATH, "%s.tmp", path);
-
-	unlink(tmppath);
-
-	fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
-	if (fd < 0)
+	f = stream->walmethod->open_for_write(histfname, ".tmp", 0);
+	if (f == NULL)
 	{
 		fprintf(stderr, _("%s: could not create timeline history file \"%s\": %s\n"),
-				progname, tmppath, strerror(errno));
+				progname, histfname, stream->walmethod->getlasterror());
 		return false;
 	}
 
-	errno = 0;
-	if ((int) write(fd, content, size) != size)
+	if ((int) stream->walmethod->write(f, content, size) != size)
 	{
-		int			save_errno = errno;
+		fprintf(stderr, _("%s: could not write timeline history file \"%s\": %s\n"),
+				progname, histfname, stream->walmethod->getlasterror());
 
 		/*
 		 * If we fail to make the file, delete it to release disk space
 		 */
-		close(fd);
-		unlink(tmppath);
-		errno = save_errno;
+		stream->walmethod->close(f, CLOSE_UNLINK);
 
-		fprintf(stderr, _("%s: could not write timeline history file \"%s\": %s\n"),
-				progname, tmppath, strerror(errno));
 		return false;
 	}
 
-	if (fsync(fd) != 0)
+	if (stream->walmethod->fsync(f) != 0)
 	{
-		close(fd);
 		fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
-				progname, tmppath, strerror(errno));
+				progname, histfname, stream->walmethod->getlasterror());
+		stream->walmethod->close(f, CLOSE_NO_RENAME);
 		return false;
 	}
 
-	if (close(fd) != 0)
+	if (stream->walmethod->close(f, CLOSE_NORMAL) != 0)
 	{
 		fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
-				progname, tmppath, strerror(errno));
-		return false;
-	}
-
-	/*
-	 * Now move the completed history file into place with its final name.
-	 */
-	if (rename(tmppath, path) < 0)
-	{
-		fprintf(stderr, _("%s: could not rename file \"%s\" to \"%s\": %s\n"),
-				progname, tmppath, path, strerror(errno));
+				progname, histfname, stream->walmethod->getlasterror());
 		return false;
 	}
 
@@ -367,7 +312,7 @@ writeTimeLineHistoryFile(StreamCtl *stream, char *filename, char *content)
 	if (stream->mark_done)
 	{
 		/* writes error message if failed */
-		if (!mark_file_as_archived(stream->basedir, histfname))
+		if (!mark_file_as_archived(stream, histfname))
 			return false;
 	}
 
@@ -577,7 +522,9 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
 	{
 		/*
 		 * Fetch the timeline history file for this timeline, if we don't have
-		 * it already.
+		 * it already. When streaming log to tar, this will always return
+		 * false, as we are never streaming into an existing file and therefore
+		 * there can be no pre-existing timeline history file.
 		 */
 		if (!existsTimeLineHistoryFile(stream))
 		{
@@ -736,10 +683,10 @@ ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
 	}
 
 error:
-	if (walfile != -1 && close(walfile) != 0)
+	if (walfile != NULL && stream->walmethod->close(walfile, CLOSE_NORMAL) != 0)
 		fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
-				progname, current_walfile_name, strerror(errno));
-	walfile = -1;
+		  progname, current_walfile_name, stream->walmethod->getlasterror());
+	walfile = NULL;
 	return false;
 }
 
@@ -823,12 +770,12 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream,
 		 * If synchronous option is true, issue sync command as soon as there
 		 * are WAL data which has not been flushed yet.
 		 */
-		if (stream->synchronous && lastFlushPosition < blockpos && walfile != -1)
+		if (stream->synchronous && lastFlushPosition < blockpos && walfile != NULL)
 		{
-			if (fsync(walfile) != 0)
+			if (stream->walmethod->fsync(walfile) != 0)
 			{
 				fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
-						progname, current_walfile_name, strerror(errno));
+						progname, current_walfile_name, stream->walmethod->getlasterror());
 				goto error;
 			}
 			lastFlushPosition = blockpos;
@@ -879,7 +826,7 @@ HandleCopyStream(PGconn *conn, StreamCtl *stream,
 			/* Check the message type. */
 			if (copybuf[0] == 'k')
 			{
-				if (!ProcessKeepaliveMsg(conn, copybuf, r, blockpos,
+				if (!ProcessKeepaliveMsg(conn, stream, copybuf, r, blockpos,
 										 &last_status))
 					goto error;
 			}
@@ -1032,7 +979,7 @@ CopyStreamReceive(PGconn *conn, long timeout, char **buffer)
  * Process the keepalive message.
  */
 static bool
-ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
+ProcessKeepaliveMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
 					XLogRecPtr blockpos, int64 *last_status)
 {
 	int			pos;
@@ -1059,7 +1006,7 @@ ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
 	if (replyRequested && still_sending)
 	{
 		if (reportFlushPosition && lastFlushPosition < blockpos &&
-			walfile != -1)
+			walfile != NULL)
 		{
 			/*
 			 * If a valid flush location needs to be reported, flush the
@@ -1068,10 +1015,10 @@ ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
 			 * data has been successfully replicated or not, at the normal
 			 * shutdown of the server.
 			 */
-			if (fsync(walfile) != 0)
+			if (stream->walmethod->fsync(walfile) != 0)
 			{
 				fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
-						progname, current_walfile_name, strerror(errno));
+						progname, current_walfile_name, stream->walmethod->getlasterror());
 				return false;
 			}
 			lastFlushPosition = blockpos;
@@ -1129,7 +1076,7 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
 	 * Verify that the initial location in the stream matches where we think
 	 * we are.
 	 */
-	if (walfile == -1)
+	if (walfile == NULL)
 	{
 		/* No file open yet */
 		if (xlogoff != 0)
@@ -1143,12 +1090,11 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
 	else
 	{
 		/* More data in existing segment */
-		/* XXX: store seek value don't reseek all the time */
-		if (lseek(walfile, 0, SEEK_CUR) != xlogoff)
+		if (stream->walmethod->get_current_pos(walfile) != xlogoff)
 		{
 			fprintf(stderr,
 					_("%s: got WAL data offset %08x, expected %08x\n"),
-					progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
+					progname, xlogoff, (int) stream->walmethod->get_current_pos(walfile));
 			return false;
 		}
 	}
@@ -1169,7 +1115,7 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
 		else
 			bytes_to_write = bytes_left;
 
-		if (walfile == -1)
+		if (walfile == NULL)
 		{
 			if (!open_walfile(stream, *blockpos))
 			{
@@ -1178,14 +1124,13 @@ ProcessXLogDataMsg(PGconn *conn, StreamCtl *stream, char *copybuf, int len,
 			}
 		}
 
-		if (write(walfile,
-				  copybuf + hdr_len + bytes_written,
-				  bytes_to_write) != bytes_to_write)
+		if (stream->walmethod->write(walfile, copybuf + hdr_len + bytes_written,
+									 bytes_to_write) != bytes_to_write)
 		{
 			fprintf(stderr,
 				  _("%s: could not write %u bytes to WAL file \"%s\": %s\n"),
 					progname, bytes_to_write, current_walfile_name,
-					strerror(errno));
+					stream->walmethod->getlasterror());
 			return false;
 		}
 
diff --git a/src/bin/pg_basebackup/receivelog.h b/src/bin/pg_basebackup/receivelog.h
index 554ff8b..e6db14a 100644
--- a/src/bin/pg_basebackup/receivelog.h
+++ b/src/bin/pg_basebackup/receivelog.h
@@ -13,6 +13,7 @@
 #define RECEIVELOG_H
 
 #include "libpq-fe.h"
+#include "walmethods.h"
 
 #include "access/xlogdefs.h"
 
@@ -39,7 +40,7 @@ typedef struct StreamCtl
 
 	stream_stop_callback stream_stop;	/* Stop streaming when returns true */
 
-	char	   *basedir;		/* Received segments written to this dir */
+	WalWriteMethod *walmethod;	/* How to write the WAL */
 	char	   *partial_suffix; /* Suffix appended to partially received files */
 } StreamCtl;
 
diff --git a/src/bin/pg_basebackup/t/010_pg_basebackup.pl b/src/bin/pg_basebackup/t/010_pg_basebackup.pl
index fd9857d..de4631e 100644
--- a/src/bin/pg_basebackup/t/010_pg_basebackup.pl
+++ b/src/bin/pg_basebackup/t/010_pg_basebackup.pl
@@ -4,7 +4,7 @@ use Cwd;
 use Config;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 54;
+use Test::More tests => 56;
 
 program_help_ok('pg_basebackup');
 program_version_ok('pg_basebackup');
@@ -197,6 +197,10 @@ $node->command_ok(
 	'pg_basebackup -X stream runs');
 ok(grep(/^[0-9A-F]{24}$/, slurp_dir("$tempdir/backupxf/pg_xlog")),
 	'WAL files copied');
+$node->command_ok(
+	[ 'pg_basebackup', '-D', "$tempdir/backupxst", '-X', 'stream', '-Ft' ],
+	'pg_basebackup -X stream runs in tar mode');
+ok(-f "$tempdir/backupxst/pg_xlog.tar", 'pg_xlog.tar file was created');
 
 $node->command_fails(
 	[ 'pg_basebackup', '-D', "$tempdir/fail", '-S', 'slot1' ],
diff --git a/src/bin/pg_basebackup/walmethods.c b/src/bin/pg_basebackup/walmethods.c
new file mode 100644
index 0000000..c1c574b
--- /dev/null
+++ b/src/bin/pg_basebackup/walmethods.c
@@ -0,0 +1,818 @@
+/*-------------------------------------------------------------------------
+ *
+ * walmethods.c - implementations of different ways to write received wal
+ *
+ * NOTE! The caller must ensure that only one method is instantiated in
+ *		 any given program, and that it's only instantiated once!
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		  src/bin/pg_basebackup/walmethods.c
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres_fe.h"
+
+#include <sys/stat.h>
+#include <time.h>
+#include <unistd.h>
+#ifdef HAVE_LIBZ
+#include <zlib.h>
+#endif
+
+#include "pgtar.h"
+
+#include "receivelog.h"
+
+/* Size of zlib buffer for .tar.gz */
+#define ZLIB_OUT_SIZE 4096
+
+/*-------------------------------------------------------------------------
+ * WalDirectoryMethod - write wal to a directory looking like pg_xlog
+ *-------------------------------------------------------------------------
+ */
+
+/*
+ * Global static data for this method
+ */
+typedef struct DirectoryMethodData
+{
+	char	   *basedir;
+}	DirectoryMethodData;
+static DirectoryMethodData *dir_data = NULL;
+
+/*
+ * Local file handle
+ */
+typedef struct DirectoryMethodFile
+{
+	int			fd;
+	off_t		currpos;
+	char	   *pathname;
+	char	   *temp_suffix;
+}	DirectoryMethodFile;
+
+static char *
+dir_getlasterror(void)
+{
+	/* Directory method always sets errno, so just use strerror */
+	return strerror(errno);
+}
+
+static Walfile
+dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size)
+{
+	static char tmppath[MAXPGPATH];
+	int			fd;
+	DirectoryMethodFile *f;
+
+	snprintf(tmppath, sizeof(tmppath), "%s/%s%s",
+			 dir_data->basedir, pathname, temp_suffix ? temp_suffix : "");
+
+	fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
+	if (fd < 0)
+		return NULL;
+
+	if (pad_to_size)
+	{
+		/* Always pre-pad on regular files */
+		char	   *zerobuf;
+		int			bytes;
+
+		zerobuf = pg_malloc0(XLOG_BLCKSZ);
+		for (bytes = 0; bytes < pad_to_size; bytes += XLOG_BLCKSZ)
+		{
+			if (write(fd, zerobuf, XLOG_BLCKSZ) != XLOG_BLCKSZ)
+			{
+				int			save_errno = errno;
+
+				pg_free(zerobuf);
+				close(fd);
+				errno = save_errno;
+				return NULL;
+			}
+		}
+		pg_free(zerobuf);
+
+		if (lseek(fd, 0, SEEK_SET) != 0)
+			return NULL;
+	}
+
+	f = pg_malloc0(sizeof(DirectoryMethodFile));
+	f->fd = fd;
+	f->currpos = 0;
+	f->pathname = pg_strdup(pathname);
+	if (temp_suffix)
+		f->temp_suffix = pg_strdup(temp_suffix);
+	return f;
+}
+
+static ssize_t
+dir_write(Walfile f, const void *buf, size_t count)
+{
+	ssize_t		r;
+	DirectoryMethodFile *df = (DirectoryMethodFile *) f;
+
+	Assert(f != NULL);
+
+	r = write(df->fd, buf, count);
+	if (r > 0)
+		df->currpos += r;
+	return r;
+}
+
+static off_t
+dir_get_current_pos(Walfile f)
+{
+	Assert(f != NULL);
+
+	/* Use a cached value to prevent lots of reseeks */
+	return ((DirectoryMethodFile *) f)->currpos;
+}
+
+static int
+dir_close(Walfile f, WalCloseMethod method)
+{
+	int			r;
+	DirectoryMethodFile *df = (DirectoryMethodFile *) f;
+	static char tmppath[MAXPGPATH];
+	static char tmppath2[MAXPGPATH];
+
+	Assert(f != NULL);
+
+	r = close(df->fd);
+
+	if (r == 0)
+	{
+		/* Build path to the current version of the file */
+		if (method == CLOSE_NORMAL && df->temp_suffix)
+		{
+			/* With a temp suffix, a normal close renames the file */
+			snprintf(tmppath, sizeof(tmppath), "%s/%s%s",
+					 dir_data->basedir, df->pathname, df->temp_suffix);
+			snprintf(tmppath2, sizeof(tmppath2), "%s/%s",
+					 dir_data->basedir, df->pathname);
+			r = rename(tmppath, tmppath2);
+		}
+		else if (method == CLOSE_UNLINK)
+		{
+			/* Unlink the file once it's closed */
+			snprintf(tmppath, sizeof(tmppath), "%s/%s%s",
+					 dir_data->basedir, df->pathname,
+					 df->temp_suffix ? df->temp_suffix : "");
+			r = unlink(tmppath);
+		}
+		/* else either CLOSE_NORMAL and no temp suffix, or CLOSE_NO_RENAME */
+	}
+
+	pg_free(df->pathname);
+	if (df->temp_suffix)
+		pg_free(df->temp_suffix);
+	pg_free(df);
+
+	return r;
+}
+
+static int
+dir_fsync(Walfile f)
+{
+	Assert(f != NULL);
+
+	return fsync(((DirectoryMethodFile *) f)->fd);
+}
+
+static ssize_t
+dir_get_file_size(const char *pathname)
+{
+	struct stat statbuf;
+	static char tmppath[MAXPGPATH];
+
+	snprintf(tmppath, sizeof(tmppath), "%s/%s",
+			 dir_data->basedir, pathname);
+
+	if (stat(tmppath, &statbuf) != 0)
+		return -1;
+
+	return statbuf.st_size;
+}
+
+static bool
+dir_existsfile(const char *pathname)
+{
+	static char tmppath[MAXPGPATH];
+	int			fd;
+
+	snprintf(tmppath, sizeof(tmppath), "%s/%s",
+			 dir_data->basedir, pathname);
+
+	fd = open(tmppath, O_RDONLY | PG_BINARY, 0);
+	if (fd < 0)
+		return false;
+	close(fd);
+	return true;
+}
+
+static bool
+dir_finish(void)
+{
+	/* No cleanup necessary */
+	return true;
+}
+
+
+WalWriteMethod *
+CreateWalDirectoryMethod(const char *basedir)
+{
+	WalWriteMethod *method;
+
+	method = pg_malloc0(sizeof(WalWriteMethod));
+	method->open_for_write = dir_open_for_write;
+	method->write = dir_write;
+	method->get_current_pos = dir_get_current_pos;
+	method->get_file_size = dir_get_file_size;
+	method->close = dir_close;
+	method->fsync = dir_fsync;
+	method->existsfile = dir_existsfile;
+	method->finish = dir_finish;
+	method->getlasterror = dir_getlasterror;
+
+	dir_data = pg_malloc0(sizeof(DirectoryMethodData));
+	dir_data->basedir = pg_strdup(basedir);
+
+	return method;
+}
+
+
+/*-------------------------------------------------------------------------
+ * WalTarMethod - write wal to a tar file containing pg_xlog contents
+ *-------------------------------------------------------------------------
+ */
+
+typedef struct TarMethodFile
+{
+	off_t		ofs_start;		/* Where does the *header* for this file start */
+	off_t		currpos;
+	char		header[512];
+	char	   *pathname;
+	size_t		pad_to_size;
+}	TarMethodFile;
+
+typedef struct TarMethodData
+{
+	char	   *tarfilename;
+	int			fd;
+	int			compression;
+	TarMethodFile *currentfile;
+	char		lasterror[1024];
+#ifdef HAVE_LIBZ
+	z_streamp	zp;
+	void	   *zlibOut;
+#endif
+}	TarMethodData;
+static TarMethodData *tar_data = NULL;
+
+#define tar_clear_error() tar_data->lasterror[0] = '\0'
+#define tar_set_error(msg) strlcpy(tar_data->lasterror, msg, sizeof(tar_data->lasterror))
+
+static char *
+tar_getlasterror(void)
+{
+	/*
+	 * If a custom error is set, return that one. Otherwise, assume errno is
+	 * set and return that one.
+	 */
+	if (tar_data->lasterror[0])
+		return tar_data->lasterror;
+	return strerror(errno);
+}
+
+#ifdef HAVE_LIBZ
+static bool
+tar_write_compressed_data(void *buf, size_t count, bool flush)
+{
+	tar_data->zp->next_in = buf;
+	tar_data->zp->avail_in = count;
+
+	while (tar_data->zp->avail_in || flush)
+	{
+		int			r;
+
+		r = deflate(tar_data->zp, flush ? Z_FINISH : Z_NO_FLUSH);
+		if (r == Z_STREAM_ERROR)
+		{
+			tar_set_error("deflate failed");
+			return false;
+		}
+
+		if (tar_data->zp->avail_out < ZLIB_OUT_SIZE)
+		{
+			size_t		len = ZLIB_OUT_SIZE - tar_data->zp->avail_out;
+
+			if (write(tar_data->fd, tar_data->zlibOut, len) != len)
+				return false;
+
+			tar_data->zp->next_out = tar_data->zlibOut;
+			tar_data->zp->avail_out = ZLIB_OUT_SIZE;
+		}
+
+		if (r == Z_STREAM_END)
+			break;
+	}
+
+	if (flush)
+	{
+		/* Reset the stream for writing */
+		if (deflateReset(tar_data->zp) != Z_OK)
+		{
+			tar_set_error("deflateReset failed");
+			return false;
+		}
+	}
+
+	return true;
+}
+#endif
+
+static ssize_t
+tar_write(Walfile f, const void *buf, size_t count)
+{
+	ssize_t		r;
+
+	Assert(f != NULL);
+	tar_clear_error();
+
+	/* Tarfile will always be positioned at the end */
+	if (!tar_data->compression)
+	{
+		r = write(tar_data->fd, buf, count);
+		if (r > 0)
+			((TarMethodFile *) f)->currpos += r;
+		return r;
+	}
+#ifdef HAVE_LIBZ
+	if (!tar_write_compressed_data((void *) buf, count, false))
+		return -1;
+	((TarMethodFile *) f)->currpos += count;
+	return count;
+#else
+	tar_set_error("compression requires zlib support");
+	return -1;
+#endif
+}
+
+static bool
+tar_write_padding_data(TarMethodFile * f, size_t bytes)
+{
+	char	   *zerobuf = pg_malloc0(XLOG_BLCKSZ);
+	size_t		bytesleft = bytes;
+
+	while (bytesleft)
+	{
+		size_t		bytestowrite = bytesleft > XLOG_BLCKSZ ? XLOG_BLCKSZ : bytesleft;
+		ssize_t		r = tar_write(f, zerobuf, bytestowrite);
+
+		if (r <= 0)
+			break;
+		bytesleft -= r;
+	}
+	pg_free(zerobuf);
+	return bytesleft == 0;
+}
+
+static Walfile
+tar_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_size)
+{
+	int			save_errno;
+	static char tmppath[MAXPGPATH];
+
+	tar_clear_error();
+
+	if (tar_data->fd < 0)
+	{
+		/*
+		 * We open the tar file only when we first try to write to it.
+		 */
+		tar_data->fd = open(tar_data->tarfilename,
+						  O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
+		if (tar_data->fd < 0)
+			return NULL;
+
+#ifdef HAVE_LIBZ
+		if (tar_data->compression)
+		{
+			tar_data->zp = (z_streamp) pg_malloc(sizeof(z_stream));
+			tar_data->zp->zalloc = Z_NULL;
+			tar_data->zp->zfree = Z_NULL;
+			tar_data->zp->opaque = Z_NULL;
+			tar_data->zp->next_out = tar_data->zlibOut;
+			tar_data->zp->avail_out = ZLIB_OUT_SIZE;
+
+			/*
+			 * Initialize deflation library. Adding the magic value 16 to the
+			 * default 15 for the windowBits parameter makes the output be
+			 * gzip instead of zlib.
+			 */
+			if (deflateInit2(tar_data->zp, tar_data->compression, Z_DEFLATED, 15 + 16, 8, Z_DEFAULT_STRATEGY) != Z_OK)
+			{
+				tar_set_error("deflateInit2 failed");
+				return NULL;
+			}
+		}
+#endif
+
+		/* No archive-wide header; the file starts with the first member */
+	}
+
+	Assert(tar_data->currentfile == NULL);
+	if (tar_data->currentfile != NULL)
+	{
+		tar_set_error("implementation error: tar files can't have more than one open file");
+		return NULL;
+	}
+
+	tar_data->currentfile = pg_malloc0(sizeof(TarMethodFile));
+
+	snprintf(tmppath, sizeof(tmppath), "%s%s",
+			 pathname, temp_suffix ? temp_suffix : "");
+
+	/* Create a header with size set to 0 - we will fill out the size on close */
+	if (tarCreateHeader(tar_data->currentfile->header, tmppath, NULL, 0, S_IRUSR | S_IWUSR, 0, 0, time(NULL)) != TAR_OK)
+	{
+		pg_free(tar_data->currentfile);
+		tar_data->currentfile = NULL;
+		tar_set_error("could not create tar header");
+		return NULL;
+	}
+
+#ifdef HAVE_LIBZ
+	if (tar_data->compression)
+	{
+		/* Flush existing data */
+		if (!tar_write_compressed_data(NULL, 0, true))
+			return NULL;
+
+		/* Turn off compression for header */
+		if (deflateParams(tar_data->zp, 0, 0) != Z_OK)
+		{
+			tar_set_error("deflateParams failed");
+			return NULL;
+		}
+	}
+#endif
+
+	tar_data->currentfile->ofs_start = lseek(tar_data->fd, 0, SEEK_CUR);
+	if (tar_data->currentfile->ofs_start == -1)
+	{
+		save_errno = errno;
+		pg_free(tar_data->currentfile);
+		tar_data->currentfile = NULL;
+		errno = save_errno;
+		return NULL;
+	}
+	tar_data->currentfile->currpos = 0;
+
+	if (!tar_data->compression)
+	{
+		if (write(tar_data->fd, tar_data->currentfile->header, 512) != 512)
+		{
+			save_errno = errno;
+			pg_free(tar_data->currentfile);
+			tar_data->currentfile = NULL;
+			errno = save_errno;
+			return NULL;
+		}
+	}
+#ifdef HAVE_LIBZ
+	else
+	{
+		/* Write header through the zlib APIs but with no compression */
+		if (!tar_write_compressed_data(tar_data->currentfile->header, 512, true))
+			return NULL;
+
+		/* Re-enable compression for the rest of the file */
+		if (deflateParams(tar_data->zp, tar_data->compression, 0) != Z_OK)
+		{
+			tar_set_error("deflateParams failed");
+			return NULL;
+		}
+	}
+#endif
+
+	tar_data->currentfile->pathname = pg_strdup(pathname);
+
+	/*
+	 * Uncompressed files are padded on creation, but for compression we can't
+	 * do that
+	 */
+	if (pad_to_size)
+	{
+		tar_data->currentfile->pad_to_size = pad_to_size;
+		if (!tar_data->compression)
+		{
+			/* Uncompressed: pad now, then seek back to start */
+			if (!tar_write_padding_data(tar_data->currentfile, pad_to_size))
+				return NULL;
+			if (lseek(tar_data->fd, tar_data->currentfile->ofs_start + 512, SEEK_SET) != tar_data->currentfile->ofs_start + 512)
+				return NULL;
+
+			tar_data->currentfile->currpos = 0;
+		}
+	}
+
+	return tar_data->currentfile;
+}
+
+static ssize_t
+tar_get_file_size(const char *pathname)
+{
+	tar_clear_error();
+
+	/* Currently not used, so not supported */
+	errno = ENOSYS;
+	return -1;
+}
+
+static off_t
+tar_get_current_pos(Walfile f)
+{
+	Assert(f != NULL);
+	tar_clear_error();
+
+	return ((TarMethodFile *) f)->currpos;
+}
+
+static int
+tar_fsync(Walfile f)
+{
+	Assert(f != NULL);
+	tar_clear_error();
+
+	/*
+	 * Always sync the whole tarfile, because that's all we can do. This makes
+	 * no sense on compressed files, so just ignore those.
+	 */
+	if (tar_data->compression)
+		return 0;
+
+	return fsync(tar_data->fd);
+}
+
+static int
+tar_close(Walfile f, WalCloseMethod method)
+{
+	ssize_t		filesize;
+	int			padding;
+	TarMethodFile *tf = (TarMethodFile *) f;
+
+	Assert(f != NULL);
+	tar_clear_error();
+
+	if (method == CLOSE_UNLINK)
+	{
+		if (tar_data->compression)
+		{
+			tar_set_error("unlink not supported with compression");
+			return -1;
+		}
+
+		/*
+		 * Unlink the file that we just wrote to the tar. We do this by
+		 * truncating it to the start of the header. This is safe as we only
+		 * allow writing of the very last file.
+		 */
+		if (ftruncate(tar_data->fd, tf->ofs_start) != 0)
+			return -1;
+
+		pg_free(tf->pathname);
+		pg_free(tf);
+		tar_data->currentfile = NULL;
+
+		return 0;
+	}
+
+	/*
+	 * Pad the file itself with zeroes if necessary. Note that this is
+	 * different from the tar format padding -- this is the padding we asked
+	 * for when the file was opened.
+	 */
+	if (tf->pad_to_size)
+	{
+		if (tar_data->compression)
+		{
+			/*
+			 * A compressed tarfile is padded on close since we cannot know
+			 * the size of the compressed output until the end.
+			 */
+			size_t		sizeleft = tf->pad_to_size - tf->currpos;
+
+			if (sizeleft)
+			{
+				if (!tar_write_padding_data(tf, sizeleft))
+					return -1;
+			}
+		}
+		else
+		{
+			/*
+			 * An uncompressed tarfile was padded on creation, so just adjust
+			 * the current position as if we seeked to the end.
+			 */
+			tf->currpos = tf->pad_to_size;
+		}
+	}
+
+	/*
+	 * Get the size of the file, and pad the current data up to the nearest
+	 * 512 byte boundary.
+	 */
+	filesize = tar_get_current_pos(f);
+	padding = ((filesize + 511) & ~511) - filesize;
+	if (padding)
+	{
+		char		zerobuf[512];
+
+		MemSet(zerobuf, 0, padding);
+		if (tar_write(f, zerobuf, padding) != padding)
+			return -1;
+	}
+
+
+#ifdef HAVE_LIBZ
+	if (tar_data->compression)
+	{
+		/* Flush the current buffer */
+		if (!tar_write_compressed_data(NULL, 0, true))
+		{
+			errno = EINVAL;
+			return -1;
+		}
+	}
+#endif
+
+	/*
+	 * Now go back and update the header with the correct filesize and
+	 * possibly also renaming the file. We overwrite the entire current header
+	 * when done, including the checksum.
+	 */
+	print_tar_number(&(tf->header[124]), 12, filesize);
+
+	/*
+	 * On a normal close, write the final name (without any temp suffix) into
+	 * the header. Without a temp suffix this just rewrites the same name,
+	 * which is harmless since the whole header is written out anyway.
+	 */
+	if (method == CLOSE_NORMAL)
+		strlcpy(&(tf->header[0]), tf->pathname, 100);
+
+	print_tar_number(&(tf->header[148]), 8, tarChecksum(tf->header));
+	if (lseek(tar_data->fd, tf->ofs_start, SEEK_SET) != tf->ofs_start)
+		return -1;
+	if (!tar_data->compression)
+	{
+		if (write(tar_data->fd, tf->header, 512) != 512)
+			return -1;
+	}
+#ifdef HAVE_LIBZ
+	else
+	{
+		/* Turn off compression */
+		if (deflateParams(tar_data->zp, 0, 0) != Z_OK)
+		{
+			tar_set_error("deflateParams failed");
+			return -1;
+		}
+
+		/* Overwrite the header, assuming the size will be the same */
+		if (!tar_write_compressed_data(tar_data->currentfile->header, 512, true))
+			return -1;
+
+		/* Turn compression back on */
+		if (deflateParams(tar_data->zp, tar_data->compression, 0) != Z_OK)
+		{
+			tar_set_error("deflateParams failed");
+			return -1;
+		}
+	}
+#endif
+
+	/* Move file pointer back down to end, so we can write the next file */
+	if (lseek(tar_data->fd, 0, SEEK_END) < 0)
+		return -1;
+
+	/* Always fsync on close, so the padding gets fsynced */
+	tar_fsync(f);
+
+	/* Clean up and done */
+	pg_free(tf->pathname);
+	pg_free(tf);
+	tar_data->currentfile = NULL;
+
+	return 0;
+}
+
+static bool
+tar_existsfile(const char *pathname)
+{
+	tar_clear_error();
+	/* We only deal with new tarfiles, so nothing externally created exists */
+	return false;
+}
+
+static bool
+tar_finish(void)
+{
+	char		zerobuf[1024];
+
+	tar_clear_error();
+
+	if (tar_data->currentfile)
+	{
+		if (tar_close(tar_data->currentfile, CLOSE_NORMAL) != 0)
+			return false;
+	}
+
+	/* A tarfile always ends with two empty 512-byte blocks */
+	MemSet(zerobuf, 0, sizeof(zerobuf));
+	if (!tar_data->compression)
+	{
+		if (write(tar_data->fd, zerobuf, sizeof(zerobuf)) != sizeof(zerobuf))
+			return false;
+	}
+#ifdef HAVE_LIBZ
+	else
+	{
+		if (!tar_write_compressed_data(zerobuf, sizeof(zerobuf), false))
+			return false;
+
+		/* Also flush all data to make sure the gzip stream is finished */
+		tar_data->zp->next_in = NULL;
+		tar_data->zp->avail_in = 0;
+		while (true)
+		{
+			int			r;
+
+			r = deflate(tar_data->zp, Z_FINISH);
+
+			if (r == Z_STREAM_ERROR)
+			{
+				tar_set_error("deflate failed");
+				return false;
+			}
+			if (tar_data->zp->avail_out < ZLIB_OUT_SIZE)
+			{
+				size_t		len = ZLIB_OUT_SIZE - tar_data->zp->avail_out;
+
+				if (write(tar_data->fd, tar_data->zlibOut, len) != len)
+					return false;
+			}
+			if (r == Z_STREAM_END)
+				break;
+		}
+
+		if (deflateEnd(tar_data->zp) != Z_OK)
+		{
+			tar_set_error("deflateEnd failed");
+			return false;
+		}
+	}
+#endif
+
+	/* sync the empty blocks as well, since they're after the last file */
+	fsync(tar_data->fd);
+
+	if (close(tar_data->fd) != 0)
+		return false;
+
+	tar_data->fd = -1;
+
+	return true;
+}
+
+WalWriteMethod *
+CreateWalTarMethod(const char *tarbase, int compression)
+{
+	WalWriteMethod *method;
+	const char *suffix = (compression != 0) ? ".tar.gz" : ".tar";
+
+	method = pg_malloc0(sizeof(WalWriteMethod));
+	method->open_for_write = tar_open_for_write;
+	method->write = tar_write;
+	method->get_current_pos = tar_get_current_pos;
+	method->get_file_size = tar_get_file_size;
+	method->close = tar_close;
+	method->fsync = tar_fsync;
+	method->existsfile = tar_existsfile;
+	method->finish = tar_finish;
+	method->getlasterror = tar_getlasterror;
+
+	tar_data = pg_malloc0(sizeof(TarMethodData));
+	tar_data->tarfilename = psprintf("%s%s", tarbase, suffix);
+	tar_data->fd = -1;
+	tar_data->compression = compression;
+#ifdef HAVE_LIBZ
+	if (compression)
+		tar_data->zlibOut = (char *) pg_malloc(ZLIB_OUT_SIZE + 1);
+#endif
+	return method;
+}
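tar_close() rounds each member's data up to a 512-byte boundary with ((filesize + 511) & ~511) - filesize, after any pad_to_size padding, and tar_finish() appends two zero blocks. A standalone sketch of that size arithmetic for an uncompressed archive, assuming every member has exactly one 512-byte ustar header (no GNU or pax extended headers); tar_padding() and tar_archive_size() are hypothetical helpers, not part of the patch:

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define TAR_BLOCK 512

/* Zero bytes needed to round a member's data up to the next 512-byte block */
static uint64_t
tar_padding(uint64_t filesize)
{
	return ((filesize + (TAR_BLOCK - 1)) & ~(uint64_t) (TAR_BLOCK - 1)) - filesize;
}

/*
 * Size of an uncompressed archive holding members of the given sizes: one
 * header block per member, the data rounded up to a whole block, and the
 * two zero blocks that terminate the archive.
 */
static uint64_t
tar_archive_size(const uint64_t *sizes, size_t n)
{
	uint64_t	total = 0;

	assert(sizes != NULL || n == 0);
	for (size_t i = 0; i < n; i++)
		total += TAR_BLOCK + sizes[i] + tar_padding(sizes[i]);
	return total + 2 * TAR_BLOCK;
}
```

For example, two 16 MB segments plus a 42-byte history file come to 2 × (512 + 16777216) + (512 + 42 + 470) + 1024 = 33557504 bytes. Whole WAL segments are already block-aligned, so only small files such as timeline history files pick up tar padding.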
diff --git a/src/bin/pg_basebackup/walmethods.h b/src/bin/pg_basebackup/walmethods.h
new file mode 100644
index 0000000..39dd6a9
--- /dev/null
+++ b/src/bin/pg_basebackup/walmethods.h
@@ -0,0 +1,45 @@
+/*-------------------------------------------------------------------------
+ *
+ * walmethods.h
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		  src/bin/pg_basebackup/walmethods.h
+ *-------------------------------------------------------------------------
+ */
+
+
+typedef void *Walfile;
+
+typedef enum
+{
+	CLOSE_NORMAL,
+	CLOSE_UNLINK,
+	CLOSE_NO_RENAME,
+}	WalCloseMethod;
+
+typedef struct WalWriteMethod WalWriteMethod;
+struct WalWriteMethod
+{
+	Walfile(*open_for_write) (const char *pathname, const char *temp_suffix, size_t pad_to_size);
+	int			(*close) (Walfile f, WalCloseMethod method);
+	bool		(*existsfile) (const char *pathname);
+	ssize_t		(*get_file_size) (const char *pathname);
+
+	ssize_t		(*write) (Walfile f, const void *buf, size_t count);
+	off_t		(*get_current_pos) (Walfile f);
+	int			(*fsync) (Walfile f);
+	bool		(*finish) (void);
+	char	   *(*getlasterror) (void);
+};
+
+/*
+ * Available WAL methods:
+ *	- WalDirectoryMethod - write WAL to regular files in a standard pg_xlog
+ *	- WalTarMethod - write WAL to a tarfile corresponding to pg_xlog
+ *					 (only implements the methods required for pg_basebackup,
+ *					 not all those required for pg_receivexlog)
+ */
+WalWriteMethod *CreateWalDirectoryMethod(const char *basedir);
+WalWriteMethod *CreateWalTarMethod(const char *tarbase, int compression);
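The header only defines the table of callbacks; each backend fills in the function pointers, and callers never touch the underlying file directly. A minimal in-memory sketch of that pattern, assuming a hypothetical cut-down MiniWalMethod with four of the callbacks (not part of the patch) and following the close semantics in dir_close(): CLOSE_NORMAL drops the temp suffix, CLOSE_NO_RENAME keeps it, and CLOSE_UNLINK removes the file:

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>

typedef void *Walfile;

typedef enum
{
	CLOSE_NORMAL,
	CLOSE_UNLINK,
	CLOSE_NO_RENAME
} WalCloseMethod;

/* Hypothetical cut-down table with four of the WalWriteMethod callbacks */
typedef struct MiniWalMethod
{
	Walfile		(*open_for_write) (const char *pathname, const char *temp_suffix);
	ssize_t		(*write) (Walfile f, const void *buf, size_t count);
	off_t		(*get_current_pos) (Walfile f);
	int			(*close) (Walfile f, WalCloseMethod method);
} MiniWalMethod;

/* In-memory "file": only its names and write position are tracked */
typedef struct MemFile
{
	char		name[128];		/* name including any temp suffix */
	char		final[128];		/* name without the temp suffix */
	off_t		pos;
} MemFile;

/* Name the most recently closed file ended up with; "" if it was unlinked */
static char mem_last_name[128];

static Walfile
mem_open(const char *pathname, const char *temp_suffix)
{
	MemFile    *f = calloc(1, sizeof(MemFile));

	if (f == NULL)
		return NULL;
	snprintf(f->final, sizeof(f->final), "%s", pathname);
	snprintf(f->name, sizeof(f->name), "%s%s", pathname,
			 temp_suffix ? temp_suffix : "");
	return f;
}

static ssize_t
mem_write(Walfile f, const void *buf, size_t count)
{
	(void) buf;					/* contents are discarded */
	((MemFile *) f)->pos += (off_t) count;
	return (ssize_t) count;
}

static off_t
mem_get_current_pos(Walfile f)
{
	return ((MemFile *) f)->pos;
}

static int
mem_close(Walfile f, WalCloseMethod method)
{
	MemFile    *mf = f;
	const char *result = "";	/* CLOSE_UNLINK: nothing survives */

	if (method == CLOSE_NORMAL)
		result = mf->final;		/* temp suffix dropped, like a rename */
	else if (method == CLOSE_NO_RENAME)
		result = mf->name;		/* keeps its temporary name */
	snprintf(mem_last_name, sizeof(mem_last_name), "%s", result);
	free(mf);
	return 0;
}

static const MiniWalMethod mem_method = {
	mem_open, mem_write, mem_get_current_pos, mem_close
};

/* Caller side: touches the backend only through the function pointers */
static off_t
write_one_file(const MiniWalMethod *m, const char *name, const char *data,
			   size_t len, WalCloseMethod how)
{
	Walfile		f = m->open_for_write(name, ".partial");
	off_t		pos;

	assert(f != NULL);
	if (m->write(f, data, len) != (ssize_t) len)
	{
		m->close(f, CLOSE_UNLINK);
		return -1;
	}
	pos = m->get_current_pos(f);
	if (m->close(f, how) != 0)
		return -1;
	return pos;
}
```

write_one_file() would work unchanged against a directory-backed or tar-backed table, which is the point of passing the method around instead of a base directory.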
diff --git a/src/include/pgtar.h b/src/include/pgtar.h
index 45ca400..1d179f0 100644
--- a/src/include/pgtar.h
+++ b/src/include/pgtar.h
@@ -22,4 +22,5 @@ enum tarError
 extern enum tarError tarCreateHeader(char *h, const char *filename, const char *linktarget,
 			  pgoff_t size, mode_t mode, uid_t uid, gid_t gid, time_t mtime);
 extern uint64 read_tar_number(const char *s, int len);
+extern void print_tar_number(char *s, int len, uint64 val);
 extern int	tarChecksum(char *header);
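When tar_close() learns the final size, it rewrites the size field at offset 124 and then recomputes the checksum at offset 148 with tarChecksum(), which is why print_tar_number() has to be exported. Under the ustar rules, the checksum is the sum of all 512 header bytes as unsigned values, with the 8-byte checksum field itself counted as spaces. A sketch of that rule, using hypothetical names and not the code in src/port/tar.c:

```c
#include <assert.h>
#include <string.h>

#define TAR_CHKSUM_OFFSET	148
#define TAR_CHKSUM_LEN		8

/*
 * ustar header checksum: the sum of all 512 header bytes as unsigned
 * values, with the 8-byte checksum field itself counted as spaces.
 */
static unsigned int
ustar_checksum(const char *header)
{
	unsigned int sum = 0;

	assert(header != NULL);
	for (int i = 0; i < 512; i++)
	{
		if (i >= TAR_CHKSUM_OFFSET && i < TAR_CHKSUM_OFFSET + TAR_CHKSUM_LEN)
			sum += ' ';
		else
			sum += (unsigned char) header[i];
	}
	return sum;
}

/*
 * Store the checksum as 7 octal digits and a space, so that it can be
 * refreshed after any other field (such as the size at offset 124) changes.
 * The maximum possible sum, 512 * 255, always fits in 7 octal digits.
 */
static void
ustar_store_checksum(char *header)
{
	unsigned int sum = ustar_checksum(header);

	header[TAR_CHKSUM_OFFSET + 7] = ' ';
	for (int i = TAR_CHKSUM_OFFSET + 6; i >= TAR_CHKSUM_OFFSET; i--)
	{
		header[i] = (char) ('0' + (sum & 7));
		sum >>= 3;
	}
}
```

Because the checksum field is read as spaces, storing the checksum does not change the computed value, so a header can be patched and re-checksummed any number of times.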
diff --git a/src/port/tar.c b/src/port/tar.c
index 52a2113..f1da959 100644
--- a/src/port/tar.c
+++ b/src/port/tar.c
@@ -16,7 +16,7 @@
  * support only non-negative numbers, so we don't worry about the GNU rules
  * for handling negative numbers.)
  */
-static void
+void
 print_tar_number(char *s, int len, uint64 val)
 {
 	if (val < (((uint64) 1) << ((len - 1) * 3)))
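The guard in print_tar_number() checks whether val fits in len - 1 octal digits, leaving one byte for a terminator. For the 12-byte size field that limit is 8^11 - 1 = 8589934591 bytes, just under 8 GiB; the usual fallback for larger values is GNU tar's base-256 extension, in which the first byte has its high bit set and the remaining bytes hold the value in big-endian binary. A hypothetical sketch of both encodings (not the src/port/tar.c code), assuming a field of 9 to 22 bytes:

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

/*
 * Encode val into a tar numeric field of len bytes. Values that fit in
 * len - 1 octal digits use zero-padded octal plus a trailing space; larger
 * ones use base-256: a 0x80 marker byte, then len - 1 big-endian bytes.
 */
static void
encode_tar_number(unsigned char *s, int len, uint64_t val)
{
	/* >= 8 payload bytes for base-256; shift below stays under 64 bits */
	assert(len >= 9 && len <= 22);

	if (val < ((uint64_t) 1 << (3 * (len - 1))))
	{
		/* octal: len - 1 digits, least significant digit last */
		s[len - 1] = ' ';
		for (int i = len - 2; i >= 0; i--)
		{
			s[i] = (unsigned char) ('0' + (val & 7));
			val >>= 3;
		}
	}
	else
	{
		/* base-256: marker byte, then big-endian binary */
		s[0] = 0x80;
		for (int i = len - 1; i >= 1; i--)
		{
			s[i] = (unsigned char) (val & 0xff);
			val >>= 8;
		}
	}
}
```

WAL segments are far below the octal limit, so the walmethods.c size updates always take the octal branch.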
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)