On Wed, Aug 6, 2014 at 5:10 PM,  <furu...@pm.nttdata.co.jp> wrote:
>> > -                                               break;  /* ignore
>> the rest of this XLogData packet */
>> >
>> > +                               return true;    /* ignore the rest of
>> this XLogData packet */
>> >
>> > The break statement at the close of the WAL file has become "return true".
>> > That may behave like a continue statement. Is that acceptable?
>>
>> Sorry I failed to see your point.
>>
>> In the original code, when we reach the end of the WAL file and it's the
>> streaming stopping point, we break out of the inner loop in the code block
>> for processing the XLogData packet. Then we go back to the top of the outer
>> loop in HandleCopyStream. ISTM that the refactored code also works the same
>> way.
>> Anyway, could you elaborate on the problem?
>
> I'm sorry. I confused it with the patch that I intend to submit.
> In that patch the loop has to be exited, because it adds a loop that
> continuously receives messages.
> The refactoring patch, as presented, has no problem.

Okay, applied the patch.

I heavily modified your patch, basing it on master now that the refactoring
patch has been applied. Attached is the modified version. Could you
review it?

Regards,

-- 
Fujii Masao
diff --git a/doc/src/sgml/ref/pg_receivexlog.sgml b/doc/src/sgml/ref/pg_receivexlog.sgml
index 7c50b01..c15776f 100644
--- a/doc/src/sgml/ref/pg_receivexlog.sgml
+++ b/doc/src/sgml/ref/pg_receivexlog.sgml
@@ -106,6 +106,21 @@ PostgreSQL documentation
      </varlistentry>
 
      <varlistentry>
+       <term><option>-F <replaceable class="parameter">interval</replaceable></option></term>
+       <term><option>--fsync-interval=<replaceable class="parameter">interval</replaceable></option></term>
+       <listitem>
+        <para>
+        Specifies the maximum interval, in seconds, between sync commands
+        that ensure the received WAL file is safely flushed to disk. The
+        default value is zero, which disables issuing fsyncs except when the
+        WAL file is closed. If <literal>-1</literal> is specified, the WAL
+        file is flushed as soon as possible, that is, as soon as there is
+        WAL data that has not been flushed yet.
+        </para>
+       </listitem>
+      </varlistentry>
+
+     <varlistentry>
       <term><option>-v</option></term>
       <term><option>--verbose</option></term>
       <listitem>
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index 5df2eb8..0b02c4c 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -371,7 +371,7 @@ LogStreamerMain(logstreamer_param *param)
 	if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
 						   param->sysidentifier, param->xlogdir,
 						   reached_end_position, standby_message_timeout,
-						   NULL))
+						   NULL, 0))
 
 		/*
 		 * Any errors will already have been reported in the function process,
diff --git a/src/bin/pg_basebackup/pg_receivexlog.c b/src/bin/pg_basebackup/pg_receivexlog.c
index 9640838..0b7af54 100644
--- a/src/bin/pg_basebackup/pg_receivexlog.c
+++ b/src/bin/pg_basebackup/pg_receivexlog.c
@@ -36,6 +36,7 @@ static char *basedir = NULL;
 static int	verbose = 0;
 static int	noloop = 0;
 static int	standby_message_timeout = 10 * 1000;		/* 10 sec = default */
+static int	fsync_interval = 0; /* 0 = default */
 static volatile bool time_to_abort = false;
 
 
@@ -62,6 +63,8 @@ usage(void)
 	printf(_("\nOptions:\n"));
 	printf(_("  -D, --directory=DIR    receive transaction log files into this directory\n"));
 	printf(_("  -n, --no-loop          do not loop on connection lost\n"));
+	printf(_("  -F, --fsync-interval=INTERVAL\n"
+			 "                         frequency of syncs to transaction log files (in seconds)\n"));
 	printf(_("  -v, --verbose          output verbose messages\n"));
 	printf(_("  -V, --version          output version information, then exit\n"));
 	printf(_("  -?, --help             show this help, then exit\n"));
@@ -330,7 +333,8 @@ StreamLog(void)
 				starttli);
 
 	ReceiveXlogStream(conn, startpos, starttli, NULL, basedir,
-					  stop_streaming, standby_message_timeout, ".partial");
+					  stop_streaming, standby_message_timeout, ".partial",
+					  fsync_interval);
 
 	PQfinish(conn);
 }
@@ -360,6 +364,7 @@ main(int argc, char **argv)
 		{"port", required_argument, NULL, 'p'},
 		{"username", required_argument, NULL, 'U'},
 		{"no-loop", no_argument, NULL, 'n'},
+		{"fsync-interval", required_argument, NULL, 'F'},
 		{"no-password", no_argument, NULL, 'w'},
 		{"password", no_argument, NULL, 'W'},
 		{"status-interval", required_argument, NULL, 's'},
@@ -389,7 +394,7 @@ main(int argc, char **argv)
 		}
 	}
 
-	while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:nwWv",
+	while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:nF:wWv",
 							long_options, &option_index)) != -1)
 	{
 		switch (c)
@@ -436,6 +441,16 @@ main(int argc, char **argv)
 			case 'n':
 				noloop = 1;
 				break;
+			case 'F':
+				/* Convert seconds to milliseconds; -1 means fsync ASAP */
+				fsync_interval = atoi(optarg) * 1000;
+				if (fsync_interval < -1000)
+				{
+					fprintf(stderr, _("%s: invalid fsync interval \"%s\"\n"),
+							progname, optarg);
+					exit(1);
+				}
+				break;
 			case 'v':
 				verbose++;
 				break;
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c
index d28e13b..89b22f2 100644
--- a/src/bin/pg_basebackup/receivelog.c
+++ b/src/bin/pg_basebackup/receivelog.c
@@ -31,12 +31,14 @@ static char current_walfile_name[MAXPGPATH] = "";
 static bool reportFlushPosition = false;
 static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
 
+static int64 last_fsync = -1;		/* timestamp of last WAL file flush */
 static bool still_sending = true;		/* feedback still needs to be sent? */
 
 static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
 				 uint32 timeline, char *basedir,
 			   stream_stop_callback stream_stop, int standby_message_timeout,
-				 char *partial_suffix, XLogRecPtr *stoppos);
+				  char *partial_suffix, XLogRecPtr *stoppos,
+				  int fsync_interval);
 static int CopyStreamPoll(PGconn *conn, long timeout_ms);
 static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
 static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
@@ -48,6 +50,13 @@ static bool ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
 static PGresult *HandleEndOfCopyStream(PGconn *conn, char *copybuf,
 									   XLogRecPtr blockpos, char *basedir, char *partial_suffix,
 									   XLogRecPtr *stoppos);
+static bool CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos,
+								uint32 timeline, char *basedir,
+								stream_stop_callback stream_stop,
+								char *partial_suffix, XLogRecPtr *stoppos);
+static long CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout,
+										 int64 last_status, int fsync_interval,
+										 XLogRecPtr blockpos);
 
 static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
 						 uint32 *timeline);
@@ -200,6 +209,7 @@ close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos)
 				progname, current_walfile_name, partial_suffix);
 
 	lastFlushPosition = pos;
+	last_fsync = feGetCurrentTimestamp();
 	return true;
 }
 
@@ -430,13 +440,17 @@ CheckServerVersionForStreaming(PGconn *conn)
  * allows you to tell the difference between partial and completed files,
  * so that you can continue later where you left.
  *
+ * fsync_interval controls how often we flush to the received WAL file,
+ * in milliseconds.
+ *
  * Note: The log position *must* be at a log segment start!
  */
 bool
 ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
 				  char *sysidentifier, char *basedir,
 				  stream_stop_callback stream_stop,
-				  int standby_message_timeout, char *partial_suffix)
+				  int standby_message_timeout, char *partial_suffix,
+				  int fsync_interval)
 {
 	char		query[128];
 	char		slotcmd[128];
@@ -581,7 +595,7 @@ ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
 		/* Stream the WAL */
 		res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
 							   standby_message_timeout, partial_suffix,
-							   &stoppos);
+							   &stoppos, fsync_interval);
 		if (res == NULL)
 			goto error;
 
@@ -746,7 +760,7 @@ static PGresult *
 HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
 				 char *basedir, stream_stop_callback stream_stop,
 				 int standby_message_timeout, char *partial_suffix,
-				 XLogRecPtr *stoppos)
+				 XLogRecPtr *stoppos, int fsync_interval)
 {
 	char	   *copybuf = NULL;
 	int64		last_status = -1;
@@ -763,26 +777,36 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
 		/*
 		 * Check if we should continue streaming, or abort at this point.
 		 */
-		if (still_sending && stream_stop(blockpos, timeline, false))
+		if (!CheckCopyStreamStop(conn, blockpos, timeline, basedir,
+								stream_stop, partial_suffix, stoppos))
+			goto error;
+
+		now = feGetCurrentTimestamp();
+
+		/*
+		 * If fsync_interval has elapsed since last WAL flush and we've written
+		 * some WAL data, flush them to disk.
+		 */
+		if (lastFlushPosition < blockpos &&
+			walfile != -1 &&
+			((fsync_interval > 0 &&
+			  feTimestampDifferenceExceeds(last_fsync, now, fsync_interval)) ||
+			 fsync_interval < 0))
 		{
-			if (!close_walfile(basedir, partial_suffix, blockpos))
-			{
-				/* Potential error message is written by close_walfile */
-				goto error;
-			}
-			if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
+			if (fsync(walfile) != 0)
 			{
-				fprintf(stderr, _("%s: could not send copy-end packet: %s"),
-						progname, PQerrorMessage(conn));
+				fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
+						progname, current_walfile_name, strerror(errno));
 				goto error;
 			}
-			still_sending = false;
+
+			lastFlushPosition = blockpos;
+			last_fsync = now;
 		}
 
 		/*
 		 * Potentially send a status message to the master
 		 */
-		now = feGetCurrentTimestamp();
 		if (still_sending && standby_message_timeout > 0 &&
 			feTimestampDifferenceExceeds(last_status, now,
 										 standby_message_timeout))
@@ -794,64 +818,58 @@ HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
 		}
 
 		/*
-		 * Compute how long send/receive loops should sleep
+		 * Calculate how long send/receive loops should sleep
 		 */
-		if (standby_message_timeout && still_sending)
-		{
-			int64		targettime;
-			long		secs;
-			int			usecs;
-
-			targettime = last_status + (standby_message_timeout - 1) * ((int64) 1000);
-			feTimestampDifference(now,
-								  targettime,
-								  &secs,
-								  &usecs);
-			/* Always sleep at least 1 sec */
-			if (secs <= 0)
-			{
-				secs = 1;
-				usecs = 0;
-			}
-
-			sleeptime = secs * 1000 + usecs / 1000;
-		}
-		else
-			sleeptime = -1;
+		sleeptime = CalculateCopyStreamSleeptime(now, standby_message_timeout,
+												 last_status, fsync_interval, blockpos);
 
 		r = CopyStreamReceive(conn, sleeptime, &copybuf);
-		if (r == 0)
-			continue;
-		if (r == -1)
-			goto error;
-		if (r == -2)
+		while (r != 0)
 		{
-			PGresult	*res = HandleEndOfCopyStream(conn, copybuf, blockpos,
-													 basedir, partial_suffix, stoppos);
-			if (res == NULL)
+			if (r == -1)
 				goto error;
-			else
-				return res;
-		}
+			if (r == -2)
+			{
+				PGresult	*res = HandleEndOfCopyStream(conn, copybuf, blockpos,
+														 basedir, partial_suffix, stoppos);
+				if (res == NULL)
+					goto error;
+				else
+					return res;
+			}
 
-		/* Check the message type. */
-		if (copybuf[0] == 'k')
-		{
-			if (!ProcessKeepaliveMsg(conn, copybuf, r, blockpos,
-									 &last_status))
-				goto error;
-		}
-		else if (copybuf[0] == 'w')
-		{
-			if (!ProcessXLogDataMsg(conn, copybuf, r, &blockpos,
-									timeline, basedir, stream_stop, partial_suffix))
+			/* Check the message type. */
+			if (copybuf[0] == 'k')
+			{
+				if (!ProcessKeepaliveMsg(conn, copybuf, r, blockpos,
+										 &last_status))
+					goto error;
+			}
+			else if (copybuf[0] == 'w')
+			{
+				if (!ProcessXLogDataMsg(conn, copybuf, r, &blockpos,
+										timeline, basedir, stream_stop, partial_suffix))
+					goto error;
+
+				/*
+				 * Check if we should continue streaming, or abort at this point.
+				 */
+				if (!CheckCopyStreamStop(conn, blockpos, timeline, basedir,
+										 stream_stop, partial_suffix, stoppos))
+					goto error;
+			}
+			else
+			{
+				fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
+						progname, copybuf[0]);
 				goto error;
-		}
-		else
-		{
-			fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
-					progname, copybuf[0]);
-			goto error;
+			}
+
+			/*
+			 * Process the received data, and any subsequent data we
+			 * can read without blocking.
+			 */
+			r = CopyStreamReceive(conn, 0, &copybuf);
 		}
 	}
 
@@ -1193,3 +1211,94 @@ HandleEndOfCopyStream(PGconn *conn, char *copybuf,
 	*stoppos = blockpos;
 	return res;
 }
+
+/*
+ * Decide whether streaming should stop at blockpos, and stop it if so.
+ *
+ * While still_sending is set, ask stream_stop whether to stop at blockpos.
+ * If it says yes, close the current WAL file, send the copy-end packet to
+ * the server and clear still_sending so that no more feedback is sent.
+ *
+ * Returns false on error, true otherwise.  stoppos is currently unused.
+ */
+static bool
+CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos, uint32 timeline,
+					char *basedir, stream_stop_callback stream_stop,
+					char *partial_suffix, XLogRecPtr *stoppos)
+{
+	/* Nothing to do unless we are still sending and were asked to stop */
+	if (!still_sending || !stream_stop(blockpos, timeline, false))
+		return true;
+
+	if (!close_walfile(basedir, partial_suffix, blockpos))
+	{
+		/* Potential error message is written by close_walfile */
+		return false;
+	}
+
+	if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
+	{
+		fprintf(stderr, _("%s: could not send copy-end packet: %s"),
+				progname, PQerrorMessage(conn));
+		return false;
+	}
+
+	still_sending = false;
+	return true;
+}
+
+/*
+ * Calculate how long send/receive loops should sleep, in milliseconds.
+ *
+ * standby_message_timeout and fsync_interval are in milliseconds.  now and
+ * last_fsync come from feGetCurrentTimestamp() (last_status presumably
+ * too); the "* 1000" scaling below treats them as microseconds.  The
+ * target is the earlier of the next status-message deadline (only while
+ * still_sending and standby_message_timeout is nonzero) and the next fsync
+ * deadline (only when fsync_interval > 0 and WAL up to blockpos has not
+ * been flushed yet).
+ *
+ * Returns -1 if neither deadline applies (presumably "no timeout" for
+ * CopyStreamReceive); otherwise at least 1000, i.e. one second.
+ */
+static long
+CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout,
+							 int64 last_status, int fsync_interval, XLogRecPtr blockpos)
+{
+	int64		status_targettime = 0;
+	int64		fsync_targettime = 0;
+	int64		targettime;
+	long		secs;
+	int			usecs;
+
+	if (standby_message_timeout && still_sending)
+		status_targettime = last_status +
+			(standby_message_timeout - 1) * ((int64) 1000);
+
+	if (fsync_interval > 0 && lastFlushPosition < blockpos)
+		fsync_targettime = last_fsync +
+			(fsync_interval - 1) * ((int64) 1000);
+
+	/* Use the earlier of the two targets; zero means "not set" */
+	if (fsync_targettime == 0)
+		targettime = status_targettime;
+	else if (status_targettime > 0 && status_targettime < fsync_targettime)
+		targettime = status_targettime;
+	else
+		targettime = fsync_targettime;
+
+	/* Neither target is set */
+	if (targettime <= 0)
+		return -1;
+
+	feTimestampDifference(now, targettime, &secs, &usecs);
+
+	/* Always sleep at least 1 sec */
+	if (secs <= 0)
+	{
+		secs = 1;
+		usecs = 0;
+	}
+
+	return secs * 1000 + usecs / 1000;
+}
diff --git a/src/bin/pg_basebackup/receivelog.h b/src/bin/pg_basebackup/receivelog.h
index f4789a5..72f8245 100644
--- a/src/bin/pg_basebackup/receivelog.h
+++ b/src/bin/pg_basebackup/receivelog.h
@@ -16,4 +16,5 @@ extern bool ReceiveXlogStream(PGconn *conn,
 				  char *basedir,
 				  stream_stop_callback stream_stop,
 				  int standby_message_timeout,
-				  char *partial_suffix);
+				  char *partial_suffix,
+				  int fsync_interval);
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to