commit 82e22bebb30b6b1d184039131d94ab26a3d4b92e
Author: Robert Haas <rhaas@postgresql.org>
Date:   Tue Apr 12 22:50:13 2011 -0700

    Support time-delayed standbys.
    
    Review by Greg Stark, Jaime Casanova, Fujii Masao.

diff --git a/doc/src/sgml/recovery-config.sgml b/doc/src/sgml/recovery-config.sgml
index de60905..375f43b 100644
--- a/doc/src/sgml/recovery-config.sgml
+++ b/doc/src/sgml/recovery-config.sgml
@@ -279,6 +279,28 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"'  # Windows
       </listitem>
      </varlistentry>
 
+     <varlistentry id="recovery-time-delay"
+                   xreflabel="recovery_time_delay">
+      <term><varname>recovery_time_delay</varname>
+        (<type>integer</type>)
+      </term>
+      <indexterm>
+        <primary><varname>recovery_time_delay</> recovery parameter</primary>
+      </indexterm>
+      <listitem>
+       <para>
+        Specifies the amount of time (in milliseconds, if no unit is specified)
+        which recovery of transaction commits should lag the master.  This
+        parameter allows creation of a time-delayed standby.  For example, if
+        you set this parameter to <literal>5min</literal>, the standby will
+        replay each transaction commit only when the system time on the standby
+        is at least five minutes past the commit time reported by the master.
+        Note that if the master and standby system clocks are not synchronized,
+        this might lead to unexpected results.
+       </para>
+      </listitem>
+     </varlistentry>
+
      </variablelist>
    </sect1>
 
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 4952d22..41b3ae9 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -195,6 +195,8 @@ static bool recoveryPauseAtTarget = true;
 static TransactionId recoveryTargetXid;
 static TimestampTz recoveryTargetTime;
 static char *recoveryTargetName;
+static int recovery_time_delay = 0;
+static TimestampTz recoveryDelayUntilTime;
 
 /* options taken from recovery.conf for XLOG streaming */
 static bool StandbyMode = false;
@@ -606,6 +608,7 @@ static bool recoveryStopsHere(XLogRecord *record, bool *includeThis);
 static void recoveryPausesHere(void);
 static bool RecoveryIsPaused(void);
 static void SetRecoveryPause(bool recoveryPause);
+static void recoveryDelay(void);
 static void SetLatestXTime(TimestampTz xtime);
 static TimestampTz GetLatestXTime(void);
 static void CheckRequiredParameterValues(void);
@@ -5396,6 +5399,19 @@ readRecoveryCommandFile(void)
 					(errmsg("trigger_file = '%s'",
 							TriggerFile)));
 		}
+		else if (strcmp(item->name, "recovery_time_delay") == 0)
+		{
+			const char *hintmsg;
+
+			if (!parse_int(item->value, &recovery_time_delay, GUC_UNIT_MS,
+				&hintmsg))
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("parameter \"%s\" requires a temporal value", "recovery_time_delay"),
+						 hintmsg ? errhint("%s", _(hintmsg)) : 0));
+			ereport(DEBUG2,
+					(errmsg("recovery_time_delay = '%s'", item->value)));
+		}
 		else
 			ereport(FATAL,
 					(errmsg("unrecognized recovery parameter \"%s\"",
@@ -5579,7 +5595,8 @@ exitArchiveRecovery(TimeLineID endTLI, uint32 endLogId, uint32 endLogSeg)
  * We also track the timestamp of the latest applied COMMIT/ABORT
  * record in XLogCtl->recoveryLastXTime, for logging purposes.
  * Also, some information is saved in recoveryStopXid et al for use in
- * annotating the new timeline's history file.
+ * annotating the new timeline's history file; and recoveryDelayUntilTime
+ * is updated, for time-delayed standbys.
  */
 static bool
 recoveryStopsHere(XLogRecord *record, bool *includeThis)
@@ -5589,6 +5606,9 @@ recoveryStopsHere(XLogRecord *record, bool *includeThis)
 	TimestampTz recordXtime;
 	char		recordRPName[MAXFNAMELEN];
 
+	/* Clear any previous recovery delay time */
+	recoveryDelayUntilTime = 0;
+
 	/* We only consider stopping at COMMIT, ABORT or RESTORE POINT records */
 	if (record->xl_rmid != RM_XACT_ID && record->xl_rmid != RM_XLOG_ID)
 		return false;
@@ -5599,6 +5619,10 @@ recoveryStopsHere(XLogRecord *record, bool *includeThis)
 
 		recordXactCommitData = (xl_xact_commit *) XLogRecGetData(record);
 		recordXtime = recordXactCommitData->xact_time;
+		if (recovery_time_delay > 0)
+			recoveryDelayUntilTime =
+				TimestampTzPlusMilliseconds(recordXactCommitData->xact_time,
+											recovery_time_delay);
 	}
 	else if (record->xl_rmid == RM_XACT_ID && record_info == XLOG_XACT_ABORT)
 	{
@@ -5839,6 +5863,47 @@ pg_is_xlog_replay_paused(PG_FUNCTION_ARGS)
 }
 
 /*
+ * When recovery_time_delay is set, we wait long enough to make sure we are
+ * at least that far behind the master.
+ */
+static void
+recoveryDelay(void)
+{
+	while (!CheckForStandbyTrigger())
+	{
+		long	secs;
+		int		microsecs;
+
+		HandleStartupProcInterrupts();
+		TimestampDifference(GetCurrentTimestamp(), recoveryDelayUntilTime,
+							&secs, &microsecs);
+
+		/*
+		 * If the timestamp we found in the commit record is out of whack, we
+		 * refuse to wait.  In the worst case this might mean going into the
+		 * tank essentially forever.
+		 */
+		if (secs > (recovery_time_delay / 1000) + 1)
+		{
+			elog(LOG, "recovery delay is %d seconds, skipping wait of %ld seconds",
+				recovery_time_delay / 1000, secs);
+			break;
+		}
+
+  		/*
+		 * Sleep in intervals of at most one second, so that we can be
+		 * responsive to interrupts and changes in the system clock.
+		 */
+		if (secs > 0)
+			pg_usleep(USECS_PER_SEC);
+		else if (microsecs > 0)
+			pg_usleep(microsecs);
+		else
+			break;
+	}
+}
+
+/*
  * Save timestamp of latest processed commit/abort record.
  *
  * We keep this in XLogCtl, not a simple static variable, so that it can be
@@ -6573,6 +6638,13 @@ StartupXLOG(void)
 					recoveryPausesHere();
 
 				/*
+				 * If we've been asked to lag the master, pause until enough
+				 * time has passed.
+				 */
+				if (recovery_time_delay > 0)
+					recoveryDelay();
+
+				/*
 				 * If we are attempting to enter Hot Standby mode, process
 				 * XIDs we see
 				 */
