Better late than never (or is it?) - here's the final cleanup of
pg_streamrecv for moving into the main distribution, per the discussion
back in late December or early January. It also includes the "stream logs
in parallel to backup" part that was not completed in pg_basebackup.

Other than that, the only changes to pg_basebackup are the moving of a
couple of functions into streamutil.c to make them usable from both,
and the progress format output fix Fujii-san mentioned recently.

This should be complete except for Win32 support (which needs a thread/fork
solution for the background streaming feature; that shouldn't be too hard,
and I guess it falls on me anyway) and the reference documentation.

And with no feedback to my question here
(http://archives.postgresql.org/pgsql-hackers/2011-02/msg00805.php), I
went with the "duplicate the macros that are needed to avoid loading
postgres.h" path.

Yes, I realize that this is far too late in the CF process really, but
I wanted to post it anyway... If it's too late to be acceptable it
should be possible to maintain this outside the main repository until
9.2, since it only changes frontend binaries. So I'm not actually
going to put it on the CF page unless someone else says that's a good
idea, to at least share the blame from Robert ;)

-- 
 Magnus Hagander
 Me: http://www.hagander.net/
 Work: http://www.redpill-linpro.com/
diff --git a/src/bin/pg_basebackup/Makefile b/src/bin/pg_basebackup/Makefile
index ccb1502..5bbe52d 100644
--- a/src/bin/pg_basebackup/Makefile
+++ b/src/bin/pg_basebackup/Makefile
@@ -18,21 +18,26 @@ include $(top_builddir)/src/Makefile.global
 
 override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS)
 
-OBJS=	pg_basebackup.o $(WIN32RES)
+OBJS=receivelog.o streamutil.o $(WIN32RES)
 
-all: pg_basebackup
+all: pg_basebackup pg_receivexlog
 
-pg_basebackup: $(OBJS) | submake-libpq submake-libpgport
-	$(CC) $(CFLAGS) $(OBJS) $(libpq_pgport) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
+pg_basebackup: pg_basebackup.o $(OBJS) | submake-libpq submake-libpgport
+	$(CC) $(CFLAGS) pg_basebackup.o $(OBJS) $(libpq_pgport) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
+
+pg_receivexlog: pg_receivexlog.o $(OBJS) | submake-libpq submake-libpgport
+	$(CC) $(CFLAGS) pg_receivexlog.o $(OBJS) $(libpq_pgport) $(LDFLAGS) $(LDFLAGS_EX) $(LIBS) -o $@$(X)
 
 install: all installdirs
 	$(INSTALL_PROGRAM) pg_basebackup$(X) '$(DESTDIR)$(bindir)/pg_basebackup$(X)'
+	$(INSTALL_PROGRAM) pg_receivexlog$(X) '$(DESTDIR)$(bindir)/pg_receivexlog$(X)'
 
 installdirs:
 	$(MKDIR_P) '$(DESTDIR)$(bindir)'
 
 uninstall:
 	rm -f '$(DESTDIR)$(bindir)/pg_basebackup$(X)'
+	rm -f '$(DESTDIR)$(bindir)/pg_receivexlog$(X)'
 
 clean distclean maintainer-clean:
-	rm -f pg_basebackup$(X) $(OBJS)
+	rm -f pg_basebackup$(X) pg_receivexlog$(X) $(OBJS) pg_basebackup.o pg_receivexlog.o
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index 61aa1d3..7442bbe 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -17,6 +17,8 @@
 #include <unistd.h>
 #include <dirent.h>
 #include <sys/stat.h>
+#include <sys/types.h>
+#include <sys/wait.h>
 
 #ifdef HAVE_LIBZ
 #include <zlib.h>
@@ -24,9 +26,11 @@
 
 #include "getopt_long.h"
 
+#include "receivelog.h"
+#include "streamutil.h"
+
 
 /* Global options */
-static const char *progname;
 char	   *basedir = NULL;
 char		format = 'p';		/* p(lain)/t(ar) */
 char	   *label = "pg_basebackup base backup";
@@ -34,38 +38,35 @@ bool		showprogress = false;
 int			verbose = 0;
 int			compresslevel = 0;
 bool		includewal = false;
+bool		streamwal = false;
 bool		fastcheckpoint = false;
-char	   *dbhost = NULL;
-char	   *dbuser = NULL;
-char	   *dbport = NULL;
-int			dbgetpassword = 0;	/* 0=auto, -1=never, 1=always */
 
 /* Progress counters */
 static uint64 totalsize;
 static uint64 totaldone;
 static int	tablespacecount;
 
-/* Connection kept global so we can disconnect easily */
-static PGconn *conn = NULL;
+/* Pipe to communicate with background wal receiver process */
+static int	bgpipe[2] = {-1, -1};
 
-#define disconnect_and_exit(code)				\
-	{											\
-	if (conn != NULL) PQfinish(conn);			\
-	exit(code);									\
-	}
+/* Handle to child process */
+static pid_t bgchild = -1;
+
+/* End position for xlog streaming, valid only once has_xlogendptr is set */
+static XLogRecPtr xlogendptr;
+static bool has_xlogendptr = false;
 
 /* Function headers */
-static char *xstrdup(const char *s);
-static void *xmalloc0(int size);
 static void usage(void);
 static void verify_dir_is_empty_or_create(char *dirname);
 static void progress_report(int tablespacenum, char *fn);
-static PGconn *GetConnection(void);
 
 static void ReceiveTarFile(PGconn *conn, PGresult *res, int rownum);
 static void ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum);
 static void BaseBackup();
 
+static bool segment_callback(XLogRecPtr segendpos, uint32 timeline);
+
 #ifdef HAVE_LIBZ
 static const char *
 get_gz_error(gzFile *gzf)
@@ -81,39 +82,6 @@ get_gz_error(gzFile *gzf)
 }
 #endif
 
-/*
- * strdup() and malloc() replacements that prints an error and exits
- * if something goes wrong. Can never return NULL.
- */
-static char *
-xstrdup(const char *s)
-{
-	char	   *result;
-
-	result = strdup(s);
-	if (!result)
-	{
-		fprintf(stderr, _("%s: out of memory\n"), progname);
-		exit(1);
-	}
-	return result;
-}
-
-static void *
-xmalloc0(int size)
-{
-	void	   *result;
-
-	result = malloc(size);
-	if (!result)
-	{
-		fprintf(stderr, _("%s: out of memory\n"), progname);
-		exit(1);
-	}
-	MemSet(result, 0, size);
-	return result;
-}
-
 
 static void
 usage(void)
@@ -125,7 +93,7 @@ usage(void)
 	printf(_("\nOptions controlling the output:\n"));
 	printf(_("  -D, --pgdata=directory    receive base backup into directory\n"));
 	printf(_("  -F, --format=p|t          output format (plain, tar)\n"));
-	printf(_("  -x, --xlog                include required WAL files in backup\n"));
+	printf(_("  -x, --xlog[=stream]       include required WAL files in backup\n"));
 	printf(_("  -Z, --compress=0-9        compress tar output\n"));
 	printf(_("\nGeneral options:\n"));
 	printf(_("  -c, --checkpoint=fast|spread\n"
@@ -146,6 +114,140 @@ usage(void)
 
 
 /*
+ * Called in the background process whenever a complete segment of WAL
+ * has been received. Check to see if there is any data on our pipe
+ * (which would mean we have a stop position), and if it is, check if
+ * it is time to stop.
+ */
+static bool
+segment_callback(XLogRecPtr segendpos, uint32 timeline)
+{
+	fd_set		fds;
+	struct timeval tv;
+	int			r;
+
+	if (has_xlogendptr)
+	{
+		/* Already know when to stop, compare to the position we got */
+		if (segendpos.xlogid > xlogendptr.xlogid ||
+			(segendpos.xlogid == xlogendptr.xlogid &&
+			 segendpos.xrecoff >= xlogendptr.xrecoff))
+			return true;
+	}
+
+	/*
+	 * Don't have the end pointer yet - check our pipe to see if it has been
+	 * sent now.
+	 */
+	FD_ZERO(&fds);
+	FD_SET(bgpipe[0], &fds);
+
+	MemSet(&tv, 0, sizeof(tv));
+
+	r = select(bgpipe[0] + 1, &fds, NULL, NULL, &tv);
+	if (r == 1)
+	{
+		char		xlogend[64];
+
+		MemSet(xlogend, 0, sizeof(xlogend));
+		r = piperead(bgpipe[0], xlogend, sizeof(xlogend));
+		if (r < 0)
+		{
+			fprintf(stderr, _("%s: could not read from ready pipe: %s\n"),
+					progname, strerror(errno));
+			exit(1);
+		}
+
+		if (sscanf(xlogend, "%X/%X", &xlogendptr.xlogid, &xlogendptr.xrecoff) != 2)
+		{
+			fprintf(stderr, _("%s: could not parse xlog end position \"%s\"\n"),
+					progname, xlogend);
+			exit(1);
+		}
+		has_xlogendptr = true;
+
+		/* since we have a value now, call ourselves to make the comparison */
+		return segment_callback(segendpos, timeline);
+	}
+
+	/* Else nothing happened, so don't exit */
+	return false;
+}
+
+/*
+ * Fork a background process that streams xlog from startpos (given as
+ * "%X/%X" text, rounded down to a segment start) into basedir/pg_xlog,
+ * over its own database connection, in parallel with the backup.
+ */
+static void
+StartLogStreamer(char *startpos, uint32 timeline)
+{
+	PGconn	   *bgconn;
+	XLogRecPtr	startptr;
+	char		xlogdir[MAXPGPATH];
+
+	/* Convert the starting position */
+	if (sscanf(startpos, "%X/%X", &startptr.xlogid, &startptr.xrecoff) != 2)
+	{
+		fprintf(stderr, _("%s: invalid format of xlog location: %s\n"),
+				progname, startpos);
+		disconnect_and_exit(1);
+	}
+	/* Round off to even segment position */
+	startptr.xrecoff -= startptr.xrecoff % XLOG_SEG_SIZE;
+
+	/* Create our background pipe */
+	if (pgpipe(bgpipe) < 0)
+	{
+		fprintf(stderr, _("%s: could not create pipe for background process: %s\n"),
+				progname, strerror(errno));
+		disconnect_and_exit(1);
+	}
+
+	/* Get a second connection */
+	bgconn = GetConnection();
+
+	/*
+	 * Always in plain format, so we can write to basedir/pg_xlog. But the
+	 * directory entry in the tar file may arrive later, so make sure it's
+	 * created before we start.
+	 */
+	snprintf(xlogdir, sizeof(xlogdir), "%s/pg_xlog", basedir);
+	verify_dir_is_empty_or_create(xlogdir);
+
+	/* Fork off the child process and tell it to go about its business */
+	/* XXX: win32 */
+	bgchild = fork();
+	if (bgchild == 0)
+	{
+		/* in child process */
+
+		if (!ReceiveXlogStream(bgconn, startptr, timeline, xlogdir, segment_callback))
+
+			/*
+			 * Any errors will already have been reported in the function
+			 * process, but we need to tell the parent that we didn't shutdown
+			 * in a nice way. Do this by exiting with an error code and expect
+			 * it to be picked up.
+			 */
+			exit(1);
+
+		PQfinish(bgconn);
+		exit(0);
+	}
+	else if (bgchild < 0)
+	{
+		fprintf(stderr, _("%s: could not create background process: %s\n"),
+				progname, strerror(errno));
+		disconnect_and_exit(1);
+	}
+
+	/*
+	 * Else we are in the parent process and all is well.
+	 */
+}
+
+/*
  * Verify that the given directory exists and is empty. If it does not
  * exist, it is created. If it exists but is not empty, an error will
  * be give and the process ended.
@@ -202,13 +304,19 @@ verify_dir_is_empty_or_create(char *dirname)
 static void
 progress_report(int tablespacenum, char *fn)
 {
-	int percent = (int) ((totaldone / 1024) * 100 / totalsize);
+	int			percent = (int) ((totaldone / 1024) * 100 / totalsize);
+
 	if (percent > 100)
 		percent = 100;
 
-	if (verbose)
+	if (!fn)					/* final report: no file name, always 100% */
+		fprintf(stderr,
+				INT64_FORMAT "/" INT64_FORMAT " kB (100%%) %i/%i tablespaces %35s\r",
+				totaldone / 1024, totalsize,
+				tablespacenum, tablespacecount, "");
+	else if (verbose)
 		fprintf(stderr,
-				INT64_FORMAT "/" INT64_FORMAT " kB (%i%%) %i/%i tablespaces (%-30s)\r",
+				INT64_FORMAT "/" INT64_FORMAT " kB (%i%%) %i/%i tablespaces (%-30.30s)\r",
 				totaldone / 1024, totalsize,
 				percent,
 				tablespacenum, tablespacecount, fn);
@@ -443,11 +551,6 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
 		strcpy(current_path, PQgetvalue(res, rownum, 1));
 
 	/*
-	 * Make sure we're unpacking into an empty directory
-	 */
-	verify_dir_is_empty_or_create(current_path);
-
-	/*
 	 * Get the COPY data
 	 */
 	res = PQgetResult(conn);
@@ -540,10 +643,18 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
 					fn[strlen(fn) - 1] = '\0';	/* Remove trailing slash */
 					if (mkdir(fn, S_IRWXU) != 0)
 					{
-						fprintf(stderr,
-							_("%s: could not create directory \"%s\": %s\n"),
-								progname, fn, strerror(errno));
-						disconnect_and_exit(1);
+						/*
+						 * When streaming WAL, pg_xlog already exists, so ignore
+						 * failure on it (length check keeps the compare in fn).
+						 */
+						if (!streamwal || strlen(fn) < 8 ||
+							strcmp(fn + strlen(fn) - 8, "/pg_xlog") != 0)
+						{
+							fprintf(stderr,
+									_("%s: could not create directory \"%s\": %s\n"),
+									progname, fn, strerror(errno));
+							disconnect_and_exit(1);
+						}
 					}
 #ifndef WIN32
 					if (chmod(fn, (mode_t) filemode))
@@ -654,90 +765,6 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
 }
 
 
-static PGconn *
-GetConnection(void)
-{
-	PGconn	   *tmpconn;
-	int			argcount = 4;	/* dbname, replication, fallback_app_name,
-								 * password */
-	int			i;
-	const char **keywords;
-	const char **values;
-	char	   *password = NULL;
-
-	if (dbhost)
-		argcount++;
-	if (dbuser)
-		argcount++;
-	if (dbport)
-		argcount++;
-
-	keywords = xmalloc0((argcount + 1) * sizeof(*keywords));
-	values = xmalloc0((argcount + 1) * sizeof(*values));
-
-	keywords[0] = "dbname";
-	values[0] = "replication";
-	keywords[1] = "replication";
-	values[1] = "true";
-	keywords[2] = "fallback_application_name";
-	values[2] = progname;
-	i = 3;
-	if (dbhost)
-	{
-		keywords[i] = "host";
-		values[i] = dbhost;
-		i++;
-	}
-	if (dbuser)
-	{
-		keywords[i] = "user";
-		values[i] = dbuser;
-		i++;
-	}
-	if (dbport)
-	{
-		keywords[i] = "port";
-		values[i] = dbport;
-		i++;
-	}
-
-	while (true)
-	{
-		if (dbgetpassword == 1)
-		{
-			/* Prompt for a password */
-			password = simple_prompt(_("Password: "), 100, false);
-			keywords[argcount - 1] = "password";
-			values[argcount - 1] = password;
-		}
-
-		tmpconn = PQconnectdbParams(keywords, values, true);
-		if (password)
-			free(password);
-
-		if (PQstatus(tmpconn) == CONNECTION_BAD &&
-			PQconnectionNeedsPassword(tmpconn) &&
-			dbgetpassword != -1)
-		{
-			dbgetpassword = 1;	/* ask for password next time */
-			PQfinish(tmpconn);
-			continue;
-		}
-
-		if (PQstatus(tmpconn) != CONNECTION_OK)
-		{
-			fprintf(stderr, _("%s: could not connect to server: %s\n"),
-					progname, PQerrorMessage(tmpconn));
-			exit(1);
-		}
-
-		/* Connection ok! */
-		free(values);
-		free(keywords);
-		return tmpconn;
-	}
-}
-
 static void
 BaseBackup()
 {
@@ -780,7 +807,7 @@ BaseBackup()
 	snprintf(current_path, sizeof(current_path), "BASE_BACKUP LABEL '%s' %s %s %s %s",
 			 escaped_label,
 			 showprogress ? "PROGRESS" : "",
-			 includewal ? "WAL" : "",
+			 includewal && !streamwal ? "WAL" : "",
 			 fastcheckpoint ? "FAST" : "",
 	         includewal ? "NOWAIT" : "");
 
@@ -859,6 +886,18 @@ BaseBackup()
 	}
 
 	/*
+	 * If we're streaming WAL, start the streaming session before we start
+	 * receiving the actual data chunks.
+	 */
+	if (streamwal)
+	{
+		if (verbose)
+			fprintf(stderr, _("%s: starting background WAL receiver\n"),
+					progname);
+		StartLogStreamer(xlogstart, timeline);
+	}
+
+	/*
 	 * Start receiving chunks
 	 */
 	for (i = 0; i < PQntuples(res); i++)
@@ -871,7 +910,7 @@ BaseBackup()
 
 	if (showprogress)
 	{
-		progress_report(PQntuples(res), "");
+		progress_report(PQntuples(res), NULL);
 		fprintf(stderr, "\n");	/* Need to move to next line */
 	}
 	PQclear(res);
@@ -905,6 +944,49 @@ BaseBackup()
 		disconnect_and_exit(1);
 	}
 
+	if (bgchild != -1)
+	{
+		int			status;
+		int			r;
+
+		if (verbose)
+			fprintf(stderr, _("%s: waiting for background process to finish streaming...\n"), progname);
+		if (pipewrite(bgpipe[1], xlogend, strlen(xlogend)) != strlen(xlogend))
+		{
+			fprintf(stderr, _("%s: could not send command to background pipe: %s\n"),
+					progname, strerror(errno));
+			disconnect_and_exit(1);
+		}
+
+		/* Just wait for the background process to exit */
+		r = waitpid(bgchild, &status, 0);
+		if (r == -1)
+		{
+			fprintf(stderr, _("%s: could not wait for child process: %s\n"),
+					progname, strerror(errno));
+			disconnect_and_exit(1);
+		}
+		if (r != bgchild)
+		{
+			fprintf(stderr, "%s: child %i died, expected %i\n",
+					progname, r, bgchild);
+			disconnect_and_exit(1);
+		}
+		if (!WIFEXITED(status))
+		{
+			fprintf(stderr, "%s: child process did not exit normally\n",
+					progname);
+			disconnect_and_exit(1);
+		}
+		if (WEXITSTATUS(status) != 0)
+		{
+			fprintf(stderr, "%s: child process exited with error %i\n",
+					progname, WEXITSTATUS(status));
+			disconnect_and_exit(1);
+		}
+		/* Exited normally, we're happy! */
+	}
+
 	/*
 	 * End of copy data. Final result is already checked inside the loop.
 	 */
@@ -924,7 +1006,7 @@ main(int argc, char **argv)
 		{"pgdata", required_argument, NULL, 'D'},
 		{"format", required_argument, NULL, 'F'},
 		{"checkpoint", required_argument, NULL, 'c'},
-		{"xlog", no_argument, NULL, 'x'},
+		{"xlog", optional_argument, NULL, 'x'},
 		{"compress", required_argument, NULL, 'Z'},
 		{"label", required_argument, NULL, 'l'},
 		{"host", required_argument, NULL, 'h'},
@@ -980,6 +1062,18 @@ main(int argc, char **argv)
 				break;
 			case 'x':
 				includewal = true;
+				if (optarg)
+				{
+					if (strcmp(optarg, "s") == 0 ||
+						strcmp(optarg, "stream") == 0)
+						streamwal = true;
+					else
+					{
+						fprintf(stderr, _("%s: invalid xlog option \"%s\", must be empty or \"stream\"\n"),
+								progname, optarg);
+						exit(1);
+					}
+				}
 				break;
 			case 'l':
 				label = xstrdup(optarg);
@@ -1080,6 +1174,16 @@ main(int argc, char **argv)
 		exit(1);
 	}
 
+	if (format != 'p' && streamwal)
+	{
+		fprintf(stderr,
+				_("%s: wal streaming can only be used in plain mode\n"),
+				progname);
+		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+				progname);
+		exit(1);
+	}
+
 #ifndef HAVE_LIBZ
 	if (compresslevel > 0)
 	{
diff --git a/src/bin/pg_basebackup/pg_receivexlog.c b/src/bin/pg_basebackup/pg_receivexlog.c
new file mode 100644
index 0000000..41b5bb7
--- /dev/null
+++ b/src/bin/pg_basebackup/pg_receivexlog.c
@@ -0,0 +1,407 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_receivexlog.c - receive streaming transaction log data and write it
+ *					  to a local file.
+ *
+ * Author: Magnus Hagander <mag...@hagander.net>
+ *
+ * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		  src/bin/pg_basebackup/pg_receivexlog.c
+ *-------------------------------------------------------------------------
+ */
+
+
+#include "postgres_fe.h"
+#include "libpq-fe.h"
+
+#include <dirent.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+#include "getopt_long.h"
+
+#include "receivelog.h"
+#include "streamutil.h"
+
+
+/* Global options */
+char	   *basedir = NULL;
+int			verbose = 0;
+
+
+static void usage(void);
+static XLogRecPtr FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline);
+static void StreamLog();
+static bool segment_callback(XLogRecPtr segendpos, uint32 timeline);
+
+/*
+ * XXX: duplicated from xlog_internal.h, to avoid loading postgres.h
+ */
+#define XLogSegsPerFile (((uint32) 0xffffffff) / XLOG_SEG_SIZE)
+#define PrevLogSeg(logId, logSeg)       \
+        do { \
+                if (logSeg) \
+                        (logSeg)--; \
+                else \
+                { \
+                        (logId)--; \
+                        (logSeg) = XLogSegsPerFile-1; \
+                } \
+        } while (0)
+
+/* Print the command-line help text for pg_receivexlog on stdout. */
+static void
+usage(void)
+{
+	printf(_("%s receives PostgreSQL streaming transaction logs\n\n"),
+		   progname);
+	printf(_("Usage:\n"));
+	printf(_("  %s [OPTION]...\n"), progname);
+	printf(_("\nOptions controlling the output:\n"));
+	printf(_("  -D, --dir=directory       receive xlog files into this directory\n"));
+	printf(_("\nGeneral options:\n"));
+	printf(_("  -v, --verbose             output verbose messages\n"));
+	printf(_("  -?, --help                show this help, then exit\n"));
+	printf(_("  -V, --version             output version information, then exit\n"));
+	printf(_("\nConnection options:\n"));
+	printf(_("  -h, --host=HOSTNAME      database server host or socket directory\n"));
+	printf(_("  -p, --port=PORT          database server port number\n"));
+	printf(_("  -U, --username=NAME      connect as specified database user\n"));
+	printf(_("  -w, --no-password        never prompt for password\n"));
+	printf(_("  -W, --password           force password prompt (should happen automatically)\n"));
+	printf(_("\nReport bugs to <pgsql-b...@postgresql.org>.\n"));
+}
+
+static bool
+segment_callback(XLogRecPtr segendpos, uint32 timeline)
+{
+	char		fn[MAXPGPATH];
+	struct stat statbuf;
+
+	if (verbose)
+		fprintf(stderr, _("%s: finished segment at %X/%X (timeline %u)\n"),
+				progname, segendpos.xlogid, segendpos.xrecoff, timeline);
+
+	/*
+	 * Step back into the segment just finished and remove any ".partial"
+	 * file of that name, assuming we now have all of its data. NOTE(review):
+	 * PrevLogSeg gets a byte offset, not a segment number - check xrecoff 0.
+	 */
+	PrevLogSeg(segendpos.xlogid, segendpos.xrecoff);
+	snprintf(fn, sizeof(fn), "%s/%08X%08X%08X.partial",
+			 basedir, timeline,
+			 segendpos.xlogid,
+			 segendpos.xrecoff / XLOG_SEG_SIZE);
+	if (stat(fn, &statbuf) == 0)
+	{
+		/* File existed, get rid of it */
+		if (verbose)
+			fprintf(stderr, _("%s: removing file \"%s\"\n"),
+					progname, fn);
+		unlink(fn);
+	}
+
+	/* Never ask the caller to stop streaming */
+	return false;
+}
+
+/*
+ * Determine starting location for streaming, based on:
+ * 1. If there are existing xlog segments, start at the end of the last one
+ * 2. If the last one is a partial segment, rename it and start over, since
+ *	  we don't sync after every write.
+ * 3. If no existing xlog exists, start from the beginning of the current
+ *	  WAL segment.
+ */
+static XLogRecPtr
+FindStreamingStart(XLogRecPtr currentpos, uint32 currenttimeline)
+{
+	DIR		   *dir;
+	struct dirent *dirent;
+	XLogRecPtr	high = {0, 0};
+
+	dir = opendir(basedir);
+	if (dir == NULL)
+	{
+		fprintf(stderr, _("%s: could not open directory \"%s\": %s\n"),
+				progname, basedir, strerror(errno));
+		disconnect_and_exit(1);
+	}
+
+	while ((dirent = readdir(dir)) != NULL)
+	{
+		char		fullpath[MAXPGPATH];
+		struct stat statbuf;
+		uint32		tli, log, seg;
+
+		/* xlog files are always 24 characters (this also skips "." and "..") */
+		if (strlen(dirent->d_name) != 24)
+			continue;
+
+		/* Filenames are always made out of 0-9 and A-F */
+		if (strspn(dirent->d_name, "0123456789ABCDEF") != 24)
+			continue;
+
+		/* Looks like an xlog file. Parse its position. */
+		if (sscanf(dirent->d_name, "%08X%08X%08X", &tli, &log, &seg) != 3)
+		{
+			fprintf(stderr, _("%s: could not parse xlog filename \"%s\"\n"),
+					progname, dirent->d_name);
+			disconnect_and_exit(1);
+		}
+
+		/*
+		 * The filename carries a segment number, but XLogRecPtr.xrecoff is a
+		 * byte offset, so scale the segment (not the log id) to bytes.
+		 */
+		seg *= XLOG_SEG_SIZE;
+
+		/* Ignore any files that are for another timeline */
+		if (tli != currenttimeline)
+			continue;
+
+		/* Check if this is a completed segment or not */
+		snprintf(fullpath, sizeof(fullpath), "%s/%s", basedir, dirent->d_name);
+		if (stat(fullpath, &statbuf) != 0)
+		{
+			fprintf(stderr, _("%s: could not stat file \"%s\": %s\n"),
+					progname, fullpath, strerror(errno));
+			disconnect_and_exit(1);
+		}
+
+		if (statbuf.st_size == XLOG_SEG_SIZE)
+		{
+			/* Completed segment, remember the highest one seen */
+			if (log > high.xlogid ||
+				(log == high.xlogid && seg > high.xrecoff))
+			{
+				high.xlogid = log;
+				high.xrecoff = seg;
+			}
+		}
+		else
+		{
+			/*
+			 * This is a partial file. Rename it out of the way.
+			 */
+			char		newfn[MAXPGPATH];
+
+			fprintf(stderr, _("%s: renaming partial file \"%s\" to \"%s.partial\"\n"),
+					progname, dirent->d_name, dirent->d_name);
+
+			snprintf(newfn, sizeof(newfn), "%s/%s.partial",
+					 basedir, dirent->d_name);
+
+			if (stat(newfn, &statbuf) == 0)
+			{
+				fprintf(stderr, _("%s: file \"%s\" already exists. Check and clean up manually.\n"),
+						progname, newfn);
+				disconnect_and_exit(1);
+			}
+			if (rename(fullpath, newfn) != 0)
+			{
+				fprintf(stderr, _("%s: could not rename \"%s\" to \"%s\": %s\n"),
+						progname, fullpath, newfn, strerror(errno));
+				disconnect_and_exit(1);
+			}
+
+			/*
+			 * Keep scanning: readdir() returns entries in no particular
+			 * order, so completed segments may still follow this one.
+			 */
+		}
+	}
+
+	closedir(dir);
+
+	if (high.xlogid == 0 && high.xrecoff == 0)
+		return currentpos;
+
+	/*
+	 * The highest segment found is complete, so resume at the start of the
+	 * segment after it, moving on to the next log file once the last usable
+	 * segment number (XLogSegsPerFile - 1) has been passed.
+	 */
+	if (high.xrecoff / XLOG_SEG_SIZE >= XLogSegsPerFile - 1)
+	{
+		high.xlogid++;
+		high.xrecoff = 0;
+	}
+	else
+		high.xrecoff += XLOG_SEG_SIZE;
+
+	return high;
+}
+
+/*
+ * Connect to the server, find out where to start, and stream xlog into
+ * basedir until the stream ends. Exits with status 1 on any failure.
+ */
+static void
+StreamLog(void)
+{
+	PGresult   *res;
+	uint32		timeline;
+	XLogRecPtr	startpos;
+
+	/* Connect in replication mode to the server */
+	conn = GetConnection();
+
+	/*
+	 * Run IDENTIFY_SYSTEM so we can get the timeline and current xlog
+	 * position.
+	 */
+	res = PQexec(conn, "IDENTIFY_SYSTEM");
+	if (PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		fprintf(stderr, _("%s: could not identify system: %s\n"),
+				progname, PQerrorMessage(conn));
+		disconnect_and_exit(1);
+	}
+	if (PQntuples(res) != 1)
+	{
+		fprintf(stderr, _("%s: could not identify system, got %i rows\n"),
+				progname, PQntuples(res));
+		disconnect_and_exit(1);
+	}
+	timeline = atoi(PQgetvalue(res, 0, 1));
+	if (sscanf(PQgetvalue(res, 0, 2), "%X/%X", &startpos.xlogid, &startpos.xrecoff) != 2)
+	{
+		fprintf(stderr, _("%s: could not parse log start position from value \"%s\"\n"),
+				progname, PQgetvalue(res, 0, 2));
+		disconnect_and_exit(1);
+	}
+	PQclear(res);
+
+	/*
+	 * Figure out where to start streaming.
+	 */
+	startpos = FindStreamingStart(startpos, timeline);
+
+	/*
+	 * Always start streaming at the beginning of a segment
+	 */
+	startpos.xrecoff -= startpos.xrecoff % XLOG_SEG_SIZE;
+
+	/*
+	 * Start the replication; a failure has already been reported by then
+	 */
+	if (verbose)
+		fprintf(stderr, _("%s: starting log streaming at %X/%X (timeline %u)\n"),
+				progname, startpos.xlogid, startpos.xrecoff, timeline);
+
+	if (!ReceiveXlogStream(conn, startpos, timeline, basedir, segment_callback))
+		disconnect_and_exit(1);
+}
+
+int
+main(int argc, char **argv)
+{
+	static struct option long_options[] = {
+		{"help", no_argument, NULL, '?'},
+		{"version", no_argument, NULL, 'V'},
+		{"dir", required_argument, NULL, 'D'},
+		{"host", required_argument, NULL, 'h'},
+		{"port", required_argument, NULL, 'p'},
+		{"username", required_argument, NULL, 'U'},
+		{"no-password", no_argument, NULL, 'w'},
+		{"password", no_argument, NULL, 'W'},
+		{"verbose", no_argument, NULL, 'v'},
+		{NULL, 0, NULL, 0}
+	};
+	int			c;
+
+	int			option_index;
+
+	progname = get_progname(argv[0]);
+	set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_receivexlog"));
+
+	if (argc > 1)				/* help/version only count as first argument */
+	{
+		if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
+		{
+			usage();
+			exit(0);
+		}
+		else if (strcmp(argv[1], "-V") == 0
+				 || strcmp(argv[1], "--version") == 0)
+		{
+			puts("pg_receivexlog (PostgreSQL) " PG_VERSION);
+			exit(0);
+		}
+	}
+
+	while ((c = getopt_long(argc, argv, "D:h:p:U:wWv",
+							long_options, &option_index)) != -1)
+	{
+		switch (c)
+		{
+			case 'D':
+				basedir = xstrdup(optarg);
+				break;
+			case 'h':
+				dbhost = xstrdup(optarg);
+				break;
+			case 'p':
+				if (atoi(optarg) <= 0)
+				{
+					fprintf(stderr, _("%s: invalid port number \"%s\"\n"),
+							progname, optarg);
+					exit(1);
+				}
+				dbport = xstrdup(optarg);
+				break;
+			case 'U':
+				dbuser = xstrdup(optarg);
+				break;
+			case 'w':
+				dbgetpassword = -1;
+				break;
+			case 'W':
+				dbgetpassword = 1;
+				break;
+			case 'v':
+				verbose++;
+				break;
+			default:
+
+				/*
+				 * getopt_long already emitted a complaint
+				 */
+				fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+						progname);
+				exit(1);
+		}
+	}
+
+	/*
+	 * Any non-option arguments? None are accepted.
+	 */
+	if (optind < argc)
+	{
+		fprintf(stderr,
+				_("%s: too many command-line arguments (first is \"%s\")\n"),
+				progname, argv[optind]);
+		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+				progname);
+		exit(1);
+	}
+
+	/*
+	 * Required arguments: the target directory (-D)
+	 */
+	if (basedir == NULL)
+	{
+		fprintf(stderr, _("%s: no target directory specified\n"), progname);
+		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+				progname);
+		exit(1);
+	}
+
+	StreamLog();
+
+	exit(0);
+}
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c
new file mode 100644
index 0000000..3be9692
--- /dev/null
+++ b/src/bin/pg_basebackup/receivelog.c
@@ -0,0 +1,207 @@
+/*-------------------------------------------------------------------------
+ *
+ * receivelog.c - receive transaction log files using the streaming
+ *				  replication protocol.
+ *
+ * Author: Magnus Hagander <mag...@hagander.net>
+ *
+ * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		  src/bin/pg_basebackup/receivelog.c
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres_fe.h"
+
+#include "libpq-fe.h"
+
+#include <sys/types.h>
+#include <unistd.h>
+
+#include "receivelog.h"
+#include "streamutil.h"
+
+/* XXX: duplicated from xlog_internal.h, to avoid loading postgres.h */
+#define MAXFNAMELEN		64
+#define XLogFileName(fname, tli, log, seg)	\
+	snprintf(fname, MAXFNAMELEN, "%08X%08X%08X", tli, log, seg)
+
+/* Streaming header size: type byte, then the 8-byte WAL start, then 8+8 */
+#define STREAMING_HEADER_SIZE (1+8+8+8)
+
+/*
+ * Create a new WAL file in basedir and return its descriptor, or -1 (after
+ * printing a message) on failure; O_EXCL makes an existing file an error.
+ * The bare file name is stored in namebuf, which needs MAXFNAMELEN bytes.
+ */
+static int
+open_walfile(XLogRecPtr startpoint, uint32 timeline, char *basedir, char *namebuf)
+{
+	int			f;
+	char		fn[MAXPGPATH];
+
+	XLogFileName(namebuf, timeline, startpoint.xlogid,
+				 startpoint.xrecoff / XLOG_SEG_SIZE);
+
+	snprintf(fn, sizeof(fn), "%s/%s", basedir, namebuf);
+	f = open(fn, O_WRONLY | O_CREAT | O_EXCL, 0666);
+	if (f == -1)
+		fprintf(stderr, _("%s: Could not open WAL segment %s: %s\n"),
+				progname, namebuf, strerror(errno));
+	return f;
+}
+
+/*
+ * Receive a log stream starting at the specified position. Returns true if
+ * segment_finish asked us to stop or the server ended the stream with a
+ * successful result, false (after printing a message) on any error.
+ *
+ * Note: The log position *must* be at a log segment change, or we will
+ * end up streaming an incomplete file.
+ */
+bool
+ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline, char *basedir, segment_finish_callback segment_finish)
+{
+	char		query[128];
+	char		current_walfile_name[MAXPGPATH];
+	PGresult   *res;
+	char	   *copybuf = NULL;
+	int			walfile = -1;
+	bool		ok = false;
+
+	/* Initiate the replication stream at specified location */
+	snprintf(query, sizeof(query), "START_REPLICATION %X/%X", startpos.xlogid, startpos.xrecoff);
+	res = PQexec(conn, query);
+	if (PQresultStatus(res) != PGRES_COPY_BOTH)
+	{
+		fprintf(stderr, _("%s: could not start replication: %s\n"),
+				progname, PQresultErrorMessage(res));
+		PQclear(res);
+		return false;
+	}
+	PQclear(res);
+
+	/* Receive the actual xlog data */
+	while (1)
+	{
+		XLogRecPtr	blockstart;
+		int			r;
+		int			xlogoff;
+
+		if (copybuf != NULL)
+			PQfreemem(copybuf);
+		copybuf = NULL;
+
+		r = PQgetCopyData(conn, &copybuf, 0);
+		if (r == -1)
+			break;				/* End of copy stream */
+		if (r == -2)
+		{
+			fprintf(stderr, _("%s: could not read copy data: %s\n"),
+					progname, PQerrorMessage(conn));
+			goto done;
+		}
+		if (r < STREAMING_HEADER_SIZE + 1)
+		{
+			fprintf(stderr, _("%s: streaming header too small: %i\n"),
+					progname, r);
+			goto done;
+		}
+		if (copybuf[0] != 'w')
+		{
+			fprintf(stderr, _("%s: streaming header corrupt: \"%c\"\n"),
+					progname, copybuf[0]);
+			goto done;
+		}
+
+		/* Extract WAL location for this block */
+		memcpy(&blockstart, copybuf + 1, 8);
+		xlogoff = blockstart.xrecoff % XLOG_SEG_SIZE;
+
+		if (walfile == -1)
+		{
+			/* No file open yet */
+			if (xlogoff != 0)
+			{
+				fprintf(stderr, _("%s: received xlog record for offset %u with no file open\n"),
+						progname, xlogoff);
+				goto done;
+			}
+			walfile = open_walfile(blockstart, timeline,
+								   basedir, current_walfile_name);
+			if (walfile == -1)
+				goto done;
+		}
+		else
+		{
+			/* More data in existing segment */
+			/* XXX: store seek value don't reseek all the time */
+			if (lseek(walfile, 0, SEEK_CUR) != xlogoff)
+			{
+				fprintf(stderr, _("%s: got WAL data offset %i, expected %i\n"),
+						progname, xlogoff, (int) lseek(walfile, 0, SEEK_CUR));
+				goto done;
+			}
+			/* Position matches, write happens lower down */
+		}
+
+		/* We have a file open in the correct position */
+		if (write(walfile, copybuf + STREAMING_HEADER_SIZE,
+				  r - STREAMING_HEADER_SIZE) != r - STREAMING_HEADER_SIZE)
+		{
+			fprintf(stderr, _("%s: could not write %u bytes to WAL file %s: %s\n"),
+					progname, r - STREAMING_HEADER_SIZE,
+					current_walfile_name, strerror(errno));
+			goto done;
+		}
+
+		/* XXX: callback after each write */
+
+		/* Check if we are at the end of a segment */
+		if (lseek(walfile, 0, SEEK_CUR) == XLOG_SEG_SIZE)
+		{
+			/* Segment is full: sync and close it before reporting it */
+			if (fsync(walfile) != 0)
+			{
+				fprintf(stderr, _("%s: could not fsync file %s: %s\n"),
+						progname, current_walfile_name, strerror(errno));
+				goto done;
+			}
+			close(walfile);
+			walfile = -1;
+
+			if (segment_finish != NULL)
+			{
+				/*
+				 * Report the end of the finished segment and stop if the
+				 * callback says so. A block in the wal stream can never cross
+				 * a segment boundary, so start + size is what we have written.
+				 */
+				blockstart.xrecoff += r - STREAMING_HEADER_SIZE;
+				ok = segment_finish(blockstart, timeline);
+				if (ok)
+					goto done;
+			}
+		}
+	}
+
+	/*
+	 * We only leave the loop when the server ends the replication stream;
+	 * a controlled shutdown is followed by a successful command result.
+	 */
+	res = PQgetResult(conn);
+	if (PQresultStatus(res) == PGRES_COMMAND_OK)
+		ok = true;
+	else
+		fprintf(stderr, _("%s: unexpected termination of replication stream: %s\n"),
+				progname, PQresultErrorMessage(res));
+	PQclear(res);
+
+done:
+	if (copybuf != NULL)
+		PQfreemem(copybuf);
+	if (walfile != -1)
+		close(walfile);
+	return ok;
+}
diff --git a/src/bin/pg_basebackup/receivelog.h b/src/bin/pg_basebackup/receivelog.h
new file mode 100644
index 0000000..ae34dd6
--- /dev/null
+++ b/src/bin/pg_basebackup/receivelog.h
@@ -0,0 +1,36 @@
+/*
+ * receivelog.h - interface for receiving a WAL stream from the server
+ */
+#ifndef RECEIVELOG_H
+#define RECEIVELOG_H
+
+#include "access/xlogdefs.h"
+#include "libpq-fe.h"
+
+/*
+ * Called whenever a segment is finished, with the xlog position up to
+ * which data has been written and the timeline being streamed. Return
+ * true to stop the streaming at this point.
+ */
+typedef bool (*segment_finish_callback)(XLogRecPtr segendpos, uint32 timeline);
+
+/*
+ * Receive the WAL stream on conn and write it to segment files.
+ *
+ * segment_finish may be NULL. Otherwise it is called every time a segment
+ * has been completely written, and streaming ends with a true result as
+ * soon as it returns true. The function also returns true when the server
+ * ends the stream normally, and false (after printing a message to stderr)
+ * if an error occurs.
+ *
+ * NOTE(review): startpos is presumably the position to start streaming at
+ * and basedir the directory the segment files go into -- confirm against
+ * receivelog.c.
+ */
+bool ReceiveXlogStream(PGconn *conn,
+					   XLogRecPtr startpos,
+					   uint32 timeline,
+					   char *basedir,
+					   segment_finish_callback segment_finish);
+
+#endif   /* RECEIVELOG_H */
diff --git a/src/bin/pg_basebackup/streamutil.c b/src/bin/pg_basebackup/streamutil.c
new file mode 100644
index 0000000..9f5c36f
--- /dev/null
+++ b/src/bin/pg_basebackup/streamutil.c
@@ -0,0 +1,160 @@
+/*-------------------------------------------------------------------------
+ *
+ * streamutil.c - utility functions for pg_basebackup and pg_receivexlog
+ *
+ * Author: Magnus Hagander <mag...@hagander.net>
+ *
+ * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		  src/bin/pg_basebackup/streamutil.c
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres_fe.h"
+
+#include <stdio.h>
+#include <string.h>
+
+#include "streamutil.h"
+
+const char *progname;			/* program name, prefixes our stderr messages */
+char	   *dbhost = NULL;		/* sent as "host" when connecting, if set */
+char	   *dbuser = NULL;		/* sent as "user" when connecting, if set */
+char	   *dbport = NULL;		/* sent as "port" when connecting, if set */
+int			dbgetpassword = 0;	/* 0=auto, -1=never, 1=always */
+static char *dbpassword = NULL; /* prompted password that worked, for reuse */
+PGconn	   *conn = NULL;		/* closed by disconnect_and_exit() */
+
+/*
+ * Replacements for strdup() and malloc() that report the failure and exit
+ * the program if memory cannot be obtained. They never return NULL.
+ */
+char *
+xstrdup(const char *s)
+{
+	char	   *copy = strdup(s);
+
+	/* Running out of memory is fatal, so the caller never sees NULL */
+	if (copy == NULL)
+	{
+		fprintf(stderr, _("%s: out of memory\n"), progname);
+		exit(1);
+	}
+	return copy;
+}
+
+/* Returns size bytes of zero-filled memory; exits instead of returning NULL */
+void *
+xmalloc0(int size)
+{
+	/* calloc() already delivers zeroed memory, no separate clearing needed */
+	void	   *zeroed = calloc(1, size);
+
+	if (zeroed == NULL)
+	{
+		fprintf(stderr, _("%s: out of memory\n"), progname);
+		exit(1);
+	}
+	return zeroed;
+}
+
+
+/*
+ * Open a new replication connection using the global dbhost, dbuser and
+ * dbport settings (those that are set), prompting for a password if needed.
+ * Never returns on failure: an error is printed and the program exits.
+ */
+PGconn *
+GetConnection(void)
+{
+	PGconn	   *newconn;
+	const char **keywords;
+	const char **values;
+	char	   *password = NULL;
+	int			argcount;
+	int			pwslot;			/* array slot reserved for the password */
+	int			next = 0;		/* next unused array slot */
+
+	/* dbname, replication, fallback_app_name, password + optional settings */
+	argcount = 4 + (dbhost ? 1 : 0) + (dbuser ? 1 : 0) + (dbport ? 1 : 0);
+	pwslot = argcount - 1;
+
+	/* The extra, zeroed, entry at the end terminates the arrays */
+	keywords = xmalloc0((argcount + 1) * sizeof(*keywords));
+	values = xmalloc0((argcount + 1) * sizeof(*values));
+
+	keywords[next] = "dbname";
+	values[next++] = "replication";
+	keywords[next] = "replication";
+	values[next++] = "true";
+	keywords[next] = "fallback_application_name";
+	values[next++] = progname;
+	if (dbhost != NULL)
+	{
+		keywords[next] = "host";
+		values[next++] = dbhost;
+	}
+	if (dbuser != NULL)
+	{
+		keywords[next] = "user";
+		values[next++] = dbuser;
+	}
+	if (dbport != NULL)
+	{
+		keywords[next] = "port";
+		values[next++] = dbport;
+	}
+
+	for (;;)
+	{
+		if (dbpassword != NULL)
+		{
+			/*
+			 * A password was saved when an earlier connection succeeded, so
+			 * this call is for another session to the same database: reuse
+			 * that password unconditionally.
+			 */
+			keywords[pwslot] = "password";
+			values[pwslot] = dbpassword;
+			dbgetpassword = -1; /* Don't try again if this fails */
+		}
+		else if (dbgetpassword == 1)
+		{
+			password = simple_prompt(_("Password: "), 100, false);
+			keywords[pwslot] = "password";
+			values[pwslot] = password;
+		}
+
+		newconn = PQconnectdbParams(keywords, values, true);
+		if (PQstatus(newconn) == CONNECTION_OK)
+			break;
+
+		/*
+		 * Give up unless the failure was caused by a missing password and
+		 * we are still allowed to ask for one.
+		 */
+		if (PQstatus(newconn) != CONNECTION_BAD ||
+			!PQconnectionNeedsPassword(newconn) ||
+			dbgetpassword == -1)
+		{
+			fprintf(stderr, _("%s: could not connect to server: %s\n"),
+					progname, PQerrorMessage(newconn));
+			exit(1);
+		}
+
+		/* Drop this attempt and go around again, this time prompting */
+		dbgetpassword = 1;
+		PQfinish(newconn);
+		free(password);
+	}
+
+	/* Connection ok! */
+	free(values);
+	free(keywords);
+
+	/* Store the password for next run */
+	if (password)
+		dbpassword = password;
+	return newconn;
+}
diff --git a/src/bin/pg_basebackup/streamutil.h b/src/bin/pg_basebackup/streamutil.h
new file mode 100644
index 0000000..cef529a
--- /dev/null
+++ b/src/bin/pg_basebackup/streamutil.h
@@ -0,0 +1,23 @@
+#include "access/xlogdefs.h"
+#include "libpq-fe.h"
+
+extern const char *progname;
+extern char *dbhost;
+extern char *dbuser;
+extern char *dbport;
+extern int	dbgetpassword;
+
+/* Connection kept global so we can disconnect easily */
+extern PGconn *conn;
+
+#define disconnect_and_exit(code)				\
+	{											\
+	if (conn != NULL) PQfinish(conn);			\
+	exit(code);									\
+	}
+
+
+char	   *xstrdup(const char *s);
+void	   *xmalloc0(int size);
+
+PGconn	   *GetConnection(void);
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to