From f1962630e2c4f7344b58ebd1583227068069ffed Mon Sep 17 00:00:00 2001
From: Wu Hao <hawu@pivotal.io>
Date: Tue, 21 Jan 2020 18:36:58 +0530
Subject: [PATCH v2 2/2] Start WAL receiver before startup process replays
 existing WAL

If WAL receiver is started only after startup process finishes replaying
WAL already available in pg_wal, synchornous replication is impacted
adversly.  Consider a temporary network outage causing streaming
replication connection to break.  This leads to exit of WAL receiver
process.  If the startup process has fallen behind, it may take a long
time to finish replaying WAL and then start walreceiver again to
re-establish streaming replication.  Commits on master will have to wait
all this while for the standby to flush WAL upto commit LSN.

This patch starts WAL receiver from main redo loop, if it is found not
running, after consistent state has been reached, so as to alleviate the
problem described above.  We borroed a new GUC from Konstantin's patch
to control this behvior: wal_receiver_start_condition.

WAL receiver now remembers the start point to request streaming from in
pg_control.  Before creating a new WAL segment file, it records the new
WAL segment number in pg_control.  If the WAL receiver process exits and
must restart, streaming starts from the beginning of the saved segment
file.  We are leveraging the fact that WAL streaming always begins from
a WAL segment boundry.

Alternatives we thought of (but did not implement) for persisting the
starting point: (1) postgresql.auto.conf file, similar to how
primary_conninfo is remembered.  This option requires creating a new GUC
that represents the starting point.  Start point is never set by a user,
so using a GUC to represent it does not seem appropriate.  (2) introduce
a new flat file.  This incurs the overhead to maintain an additional
flat file.

Co-authored-by: Asim R P <apraveen@pivotal.io>
---
 src/backend/access/transam/xlog.c             | 31 ++++++++++++
 src/backend/replication/walreceiver.c         | 68 +++++++++++++++++++++++++++
 src/backend/replication/walreceiverfuncs.c    | 20 ++++++--
 src/backend/utils/misc/guc.c                  | 17 +++++++
 src/backend/utils/misc/postgresql.conf.sample |  1 +
 src/bin/pg_controldata/pg_controldata.c       |  4 ++
 src/include/catalog/pg_control.h              |  7 +++
 src/include/replication/walreceiver.h         |  7 +++
 src/test/recovery/t/018_replay_lag_syncrep.pl |  7 +++
 9 files changed, 157 insertions(+), 5 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 7f4f784c0e..0d88fbdd25 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -7082,6 +7082,37 @@ StartupXLOG(void)
 				/* Handle interrupt signals of startup process */
 				HandleStartupProcInterrupts();
 
+				/*
+				 * Start WAL receiver without waiting for startup process to
+				 * finish replay, so that streaming replication is established
+				 * at the earliest.  When the replication is configured to be
+				 * synchronous this would unblock commits waiting for WAL to
+				 * be written and/or flushed by synchronous standby.
+				 */
+				if (StandbyModeRequested &&
+					reachedConsistency &&
+					wal_receiver_start_condition == WAL_RCV_START_AT_CONSISTENCY &&
+					!WalRcvStreaming())
+				{
+					XLogRecPtr startpoint;
+					XLogSegNo startseg;
+					TimeLineID startpointTLI;
+					LWLockAcquire(ControlFileLock, LW_SHARED);
+					startseg = ControlFile->lastFlushedSeg;
+					startpointTLI = ControlFile->lastFlushedSegTLI;
+					LWLockRelease(ControlFileLock);
+					if (startpointTLI > 0)
+					{
+						elog(LOG, "found last flushed segment %lu on time line %d, starting WAL receiver",
+							 startseg, startpointTLI);
+						XLogSegNoOffsetToRecPtr(startseg, 0, wal_segment_size, startpoint);
+						RequestXLogStreaming(startpointTLI,
+											 startpoint,
+											 PrimaryConnInfo,
+											 PrimarySlotName);
+					}
+				}
+
 				/*
 				 * Pause WAL replay, if requested by a hot-standby session via
 				 * SetRecoveryPause().
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index a5e85d32f3..0168ded801 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -50,6 +50,7 @@
 #include "access/transam.h"
 #include "access/xlog_internal.h"
 #include "catalog/pg_authid.h"
+#include "catalog/pg_control.h"
 #include "catalog/pg_type.h"
 #include "common/ip.h"
 #include "funcapi.h"
@@ -77,11 +78,14 @@ bool		wal_receiver_create_temp_slot;
 int			wal_receiver_status_interval;
 int			wal_receiver_timeout;
 bool		hot_standby_feedback;
+int         wal_receiver_start_condition;
 
 /* libpqwalreceiver connection */
 static WalReceiverConn *wrconn = NULL;
 WalReceiverFunctionsType *WalReceiverFunctions = NULL;
 
+static ControlFileData *ControlFile = NULL;
+
 #define NAPTIME_PER_CYCLE 100	/* max sleep time between cycles (100ms) */
 
 /*
@@ -163,6 +167,45 @@ ProcessWalRcvInterrupts(void)
 }
 
 
+/*
+ * Persist startpoint to pg_control file.  This is used to start replication
+ * without waiting for startup process to let us know where to start streaming
+ * from.
+ */
+static void
+SaveStartPoint(XLogRecPtr startpoint, TimeLineID startpointTLI)
+{
+	XLogSegNo oldseg, startseg;
+	TimeLineID oldTLI;
+
+	XLByteToSeg(startpoint, startseg, wal_segment_size);
+
+	LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
+
+#ifdef USE_ASSERT_CHECKING
+	/*
+	 * On a given timeline, the WAL segment to start streaming from should
+	 * never move backwards.
+	 */
+	if (ControlFile->lastFlushedSegTLI == startpointTLI)
+		Assert(ControlFile->lastFlushedSeg <= startseg);
+#endif
+
+	oldseg = ControlFile->lastFlushedSeg;
+	oldTLI = ControlFile->lastFlushedSegTLI;
+	if (oldseg < startseg || oldTLI != startpointTLI)
+	{
+		ControlFile->lastFlushedSeg = startseg;
+		ControlFile->lastFlushedSegTLI = startpointTLI;
+		UpdateControlFile();
+		elog(DEBUG3,
+			 "lastFlushedSeg (seg, TLI) old: (%lu, %u), new: (%lu, %u)",
+			 oldseg, oldTLI, startseg, startpointTLI);
+	}
+
+	LWLockRelease(ControlFileLock);
+}
+
 /* Main entry point for walreceiver process */
 void
 WalReceiverMain(void)
@@ -304,6 +347,10 @@ WalReceiverMain(void)
 	if (sender_host)
 		pfree(sender_host);
 
+	bool found;
+	ControlFile = ShmemInitStruct("Control File", sizeof(ControlFileData), &found);
+	Assert(found);
+
 	first_stream = true;
 	for (;;)
 	{
@@ -1055,6 +1102,27 @@ XLogWalRcvFlush(bool dying)
 		/* Also let the master know that we made some progress */
 		if (!dying)
 		{
+			/*
+			 * When a WAL segment file is completely filled,
+			 * LogstreamResult.Flush points to the beginning of the new WAL
+			 * segment file that will be created shortly.  Before sending a
+			 * reply with a LSN from the new WAL segment for the first time,
+			 * remember the LSN in pg_control.  The LSN is used as the
+			 * startpoint to start streaming again if the WAL receiver process
+			 * exits and starts again.
+			 *
+			 * It is important to update the LSN's segment number in
+			 * pg_control before including it in a replay back to the WAL
+			 * sender.  Once WAL sender receives the flush LSN from standby
+			 * reply, any older WAL segments that do not contain the flush LSN
+			 * may be cleaned up.  If the WAL receiver dies after sending a
+			 * reply but before updating pg_control, it is possible that the
+			 * starting segment saved in pg_control is no longer available on
+			 * master when it attempts to resume streaming.
+			 */
+			if (XLogSegmentOffset(LogstreamResult.Flush, wal_segment_size) == 0)
+				SaveStartPoint(LogstreamResult.Flush, ThisTimeLineID);
+
 			XLogWalRcvSendReply(false, false);
 			XLogWalRcvSendHSFeedback(false);
 		}
diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c
index 89c903e45a..955b8fcf83 100644
--- a/src/backend/replication/walreceiverfuncs.c
+++ b/src/backend/replication/walreceiverfuncs.c
@@ -239,10 +239,6 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
 
 	SpinLockAcquire(&walrcv->mutex);
 
-	/* It better be stopped if we try to restart it */
-	Assert(walrcv->walRcvState == WALRCV_STOPPED ||
-		   walrcv->walRcvState == WALRCV_WAITING);
-
 	if (conninfo != NULL)
 		strlcpy((char *) walrcv->conninfo, conninfo, MAXCONNINFO);
 	else
@@ -253,12 +249,26 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
 	else
 		walrcv->slotname[0] = '\0';
 
+	/*
+	 * We used to assert that the WAL receiver is either in WALRCV_STOPPED or
+	 * in WALRCV_WAITING state.
+	 *
+	 * Such an assertion is not possible, now that this function is called by
+	 * startup process on two occasions.  One is just before starting to
+	 * replay WAL when starting up.  And the other is when it has finished
+	 * replaying all WAL in pg_xlog directory.  If the standby is starting up
+	 * after clean shutdown, there is not much WAL to be replayed and both
+	 * calls to this funcion can occur in quick succession.  By the time the
+	 * second request to start streaming is made, the WAL receiver can be in
+	 * any state.  We therefore cannot make any assertion on the state here.
+	 */
+
 	if (walrcv->walRcvState == WALRCV_STOPPED)
 	{
 		launch = true;
 		walrcv->walRcvState = WALRCV_STARTING;
 	}
-	else
+	else if (walrcv->walRcvState == WALRCV_WAITING)
 		walrcv->walRcvState = WALRCV_RESTARTING;
 	walrcv->startTime = now;
 
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index e44f71e991..7633b949fd 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -464,6 +464,12 @@ const struct config_enum_entry ssl_protocol_versions_info[] = {
 	{NULL, 0, false}
 };
 
+const struct config_enum_entry wal_rcv_start_options[] = {
+	{"catchup", WAL_RCV_START_AT_CATCHUP, true},
+	{"consistency", WAL_RCV_START_AT_CONSISTENCY, true},
+	{NULL, 0, false}
+};
+
 static struct config_enum_entry shared_memory_options[] = {
 #ifndef WIN32
 	{"sysv", SHMEM_TYPE_SYSV, false},
@@ -4613,6 +4619,17 @@ static struct config_enum ConfigureNamesEnum[] =
 		check_ssl_max_protocol_version, NULL, NULL
 	},
 
+	{
+		{"wal_receiver_start_condition", PGC_POSTMASTER, REPLICATION_STANDBY,
+			gettext_noop("When to start WAL receiver."),
+			NULL,
+		},
+		&wal_receiver_start_condition,
+		WAL_RCV_START_AT_CATCHUP,
+		wal_rcv_start_options,
+		NULL, NULL, NULL
+	},
+
 	/* End-of-list marker */
 	{
 		{NULL, 0, 0, NULL, NULL}, NULL, 0, NULL, NULL, NULL, NULL
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index e1048c0047..76c01d640a 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -332,6 +332,7 @@
 #wal_retrieve_retry_interval = 5s	# time to wait before retrying to
 					# retrieve WAL after a failed attempt
 #recovery_min_apply_delay = 0		# minimum delay for applying changes during recovery
+#wal_receiver_start_condition = 'catchup' # wal receiver start condition
 
 # - Subscribers -
 
diff --git a/src/bin/pg_controldata/pg_controldata.c b/src/bin/pg_controldata/pg_controldata.c
index 19e21ab491..f98f36ffe5 100644
--- a/src/bin/pg_controldata/pg_controldata.c
+++ b/src/bin/pg_controldata/pg_controldata.c
@@ -234,6 +234,10 @@ main(int argc, char *argv[])
 		   dbState(ControlFile->state));
 	printf(_("pg_control last modified:             %s\n"),
 		   pgctime_str);
+	printf(_("Latest flushed WAL segment number:    %lu\n"),
+		   ControlFile->lastFlushedSeg);
+	printf(_("Latest flushed TimeLineID:            %u\n"),
+		   ControlFile->lastFlushedSegTLI);
 	printf(_("Latest checkpoint location:           %X/%X\n"),
 		   (uint32) (ControlFile->checkPoint >> 32),
 		   (uint32) ControlFile->checkPoint);
diff --git a/src/include/catalog/pg_control.h b/src/include/catalog/pg_control.h
index de5670e538..27260bbea5 100644
--- a/src/include/catalog/pg_control.h
+++ b/src/include/catalog/pg_control.h
@@ -143,6 +143,11 @@ typedef struct ControlFileData
 	 * to disk, we mustn't start up until we reach X again. Zero when not
 	 * doing archive recovery.
 	 *
+	 * lastFlushedSeg is the WAL segment number of the most recently flushed
+	 * WAL file by walreceiver.  It is updated by walreceiver when a received
+	 * WAL record falls on a new WAL segment file.  This is used as the start
+	 * point to resume WAL streaming if it is stopped.
+	 *
 	 * backupStartPoint is the redo pointer of the backup start checkpoint, if
 	 * we are recovering from an online backup and haven't reached the end of
 	 * backup yet. It is reset to zero when the end of backup is reached, and
@@ -165,6 +170,8 @@ typedef struct ControlFileData
 	 */
 	XLogRecPtr	minRecoveryPoint;
 	TimeLineID	minRecoveryPointTLI;
+	XLogSegNo	lastFlushedSeg;
+	TimeLineID	lastFlushedSegTLI;
 	XLogRecPtr	backupStartPoint;
 	XLogRecPtr	backupEndPoint;
 	bool		backupEndRequired;
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index e08afc6548..36daa5fa48 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -22,11 +22,18 @@
 #include "storage/spin.h"
 #include "utils/tuplestore.h"
 
+typedef enum
+{
+	WAL_RCV_START_AT_CATCHUP, /* start a WAL receiver  after replaying all WAL files */
+	WAL_RCV_START_AT_CONSISTENCY /* start a WAL receiver once consistency has been reached */
+} WalRcvStartCondition;
+
 /* user-settable parameters */
 extern bool wal_receiver_create_temp_slot;
 extern int	wal_receiver_status_interval;
 extern int	wal_receiver_timeout;
 extern bool hot_standby_feedback;
+extern int  wal_receiver_start_condition;
 
 /*
  * MAXCONNINFO: maximum size of a connection string.
diff --git a/src/test/recovery/t/018_replay_lag_syncrep.pl b/src/test/recovery/t/018_replay_lag_syncrep.pl
index 9cd79fdc89..121a0a9171 100644
--- a/src/test/recovery/t/018_replay_lag_syncrep.pl
+++ b/src/test/recovery/t/018_replay_lag_syncrep.pl
@@ -74,6 +74,8 @@ $node_standby->init_from_backup($node_master, $backup_name,
 								has_streaming => 1);
 $node_standby->append_conf('postgresql.conf',
 						   q[primary_slot_name = 'phys_slot']);
+$node_standby->append_conf('postgresql.conf',
+						   q[wal_receiver_start_condition = 'consistency']);
 # Enable debug logging in standby
 $node_standby->append_conf('postgresql.conf',
 						   q[log_min_messages = debug5]);
@@ -125,6 +127,11 @@ my $walsender_pid = $node_master->safe_psql(
 # Kill walsender, so that the replication connection breaks.
 kill 'SIGTERM', $walsender_pid;
 
+# Wait for 10 seconds (recovery_min_apply_delay) to give startup
+# process a chance to restart WAL receiver.
+note('waiting for startup process to restart WAL receiver');
+sleep 10;
+
 # The replication connection should be re-establised much earlier than
 # what it takes to finish replay.  Try to commit a transaction with a
 # timeout of 2 seconds.  The timeout should not be hit.
-- 
2.14.3 (Apple Git-98)

