Hi Daniel,

Thanks for checking in on this patch.
Attached is a rebased version.

Regards,
Soumyadeep (VMware)
From 51149e4f877dc2f8bf47a1356fc8b0ec2512ac6d Mon Sep 17 00:00:00 2001
From: Soumyadeep Chakraborty <soumyadeep2...@gmail.com>
Date: Fri, 19 Nov 2021 00:33:17 -0800
Subject: [PATCH v4 1/1] Introduce feature to start WAL receiver eagerly

This commit introduces a new GUC wal_receiver_start_condition which can
enable the standby to start its WAL receiver at an earlier stage. The
GUC will default to starting the WAL receiver after WAL from archives
and pg_wal has been exhausted, designated by the value 'exhaust'.
The value of 'startup' indicates that the WAL receiver will be started
immediately on standby startup. Finally, the value of 'consistency'
indicates that the WAL receiver will be started after the standby has
replayed up to the consistency point.

If 'startup' or 'consistency' is specified, the starting point for the
WAL receiver will always be the end of all locally available WAL in
pg_wal. The end is determined by finding the latest WAL segment in
pg_wal and then iterating to the earliest segment. The iteration is
terminated as soon as a valid WAL segment is found. Streaming can then
commence from the start of that segment.

Co-authors: Ashwin Agrawal, Asim Praveen, Wu Hao, Konstantin Knizhnik
Discussion:
https://www.postgresql.org/message-id/flat/CANXE4Tc3FNvZ_xAimempJWv_RH9pCvsZH7Yq93o1VuNLjUT-mQ%40mail.gmail.com
---
 doc/src/sgml/config.sgml                      |  33 ++++
 src/backend/access/transam/xlog.c             | 170 +++++++++++++++++-
 src/backend/replication/walreceiver.c         |   1 +
 src/backend/utils/misc/guc.c                  |  18 ++
 src/backend/utils/misc/postgresql.conf.sample |   3 +-
 src/include/replication/walreceiver.h         |  10 ++
 src/test/recovery/t/027_walreceiver_start.pl  |  96 ++++++++++
 7 files changed, 321 insertions(+), 10 deletions(-)
 create mode 100644 src/test/recovery/t/027_walreceiver_start.pl

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 3f806740d5d..b911615bf04 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -4604,6 +4604,39 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-wal-receiver-start-condition" xreflabel="wal_receiver_start_condition">
+      <term><varname>wal_receiver_start_condition</varname> (<type>enum</type>)
+      <indexterm>
+       <primary><varname>wal_receiver_start_condition</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Specifies when the WAL receiver process will be started for a standby
+        server. The allowed values of <varname>wal_receiver_start_condition</varname>
+        are <literal>startup</literal> (start immediately when the standby starts),
+        <literal>consistency</literal> (start only after reaching consistency), and
+        <literal>exhaust</literal> (start only after all WAL from the archive and
+        <filename>pg_wal</filename> has been replayed).
+        The default setting is <literal>exhaust</literal>.
+        This parameter can only be set at server start.
+       </para>
+
+       <para>
+        Traditionally, the WAL receiver process is started only after the
+        standby server has exhausted all WAL from the WAL archive and the local
+        <filename>pg_wal</filename> directory. In some environments there can
+        be a significant volume of local WAL left to replay, along with a large
+        volume of yet-to-be-streamed WAL. Such environments can benefit from
+        setting <varname>wal_receiver_start_condition</varname> to
+        <literal>startup</literal> or <literal>consistency</literal>, which
+        start the WAL receiver much earlier, from the start of the last valid
+        WAL segment in <filename>pg_wal</filename>. Streaming WAL concurrently
+        with replay in this way can improve performance significantly.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="guc-wal-receiver-status-interval" xreflabel="wal_receiver_status_interval">
       <term><varname>wal_receiver_status_interval</varname> (<type>integer</type>)
       <indexterm>
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 16164483688..9c1243a2f85 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -983,6 +983,8 @@ static XLogRecPtr XLogBytePosToRecPtr(uint64 bytepos);
 static XLogRecPtr XLogBytePosToEndRecPtr(uint64 bytepos);
 static uint64 XLogRecPtrToBytePos(XLogRecPtr ptr);
 static void checkXLogConsistency(XLogReaderState *record);
+static void StartWALReceiverEagerlyIfPossible(WalRcvStartCondition startPoint,
+											  TimeLineID currentTLI);
 
 static void WALInsertLockAcquire(void);
 static void WALInsertLockAcquireExclusive(void);
@@ -1538,6 +1540,190 @@ checkXLogConsistency(XLogReaderState *record)
 	}
 }
 
+/*
+ * Start the WAL receiver eagerly, without waiting for all WAL from the archive
+ * and pg_wal to be replayed. First, find the latest valid WAL segment in
+ * pg_wal and then request streaming to commence from its beginning.
+ * startPoint signifies whether we are trying the eager start right at startup
+ * or once we have reached consistency.
+ *
+ * This is purely an optimization: whenever an eager start is not possible we
+ * just return, and the WAL receiver is started the traditional way once all
+ * local WAL and archives are exhausted. Since we may be called many times
+ * during recovery (e.g. from CheckRecoveryConsistency()), the pg_wal scan is
+ * attempted at most once per startup process.
+ */
+static void
+StartWALReceiverEagerlyIfPossible(WalRcvStartCondition startPoint,
+								  TimeLineID currentTLI)
+{
+	static bool eagerStartAttempted = false;
+	DIR		   *dir;
+	struct dirent *de;
+	XLogSegNo	startsegno = 0;
+	XLogSegNo	endsegno = 0;
+	XLogSegNo	segno;
+	bool		found = false;
+	XLogReaderState *state;
+
+	/*
+	 * We should not be starting the walreceiver during bootstrap/init
+	 * processing.
+	 */
+	if (!IsNormalProcessingMode())
+		return;
+
+	/* Only the startup process can request an eager walreceiver start. */
+	Assert(AmStartupProcess());
+
+	/*
+	 * Return if we are not set up to start the WAL receiver eagerly, or if we
+	 * have already made our one attempt to do so.
+	 */
+	if (wal_receiver_start_condition == WAL_RCV_START_AT_EXHAUST ||
+		eagerStartAttempted)
+		return;
+
+	/*
+	 * Sanity checks: We must be in standby mode with primary_conninfo set up
+	 * for streaming replication, the WAL receiver should not already have
+	 * started and the intended startPoint must match the start condition GUC.
+	 */
+	if (!StandbyModeRequested || WalRcvStreaming() ||
+		!PrimaryConnInfo || strcmp(PrimaryConnInfo, "") == 0 ||
+		startPoint != wal_receiver_start_condition)
+		return;
+
+	/*
+	 * We must have reached consistency if we wanted to start the walreceiver
+	 * at the consistency point.
+	 */
+	if (wal_receiver_start_condition == WAL_RCV_START_AT_CONSISTENCY &&
+		!reachedConsistency)
+		return;
+
+	/* Whatever the outcome below, don't scan pg_wal again. */
+	eagerStartAttempted = true;
+
+	/* Find the latest and earliest WAL segments in pg_wal */
+	dir = AllocateDir(XLOGDIR);
+	while ((de = ReadDir(dir, XLOGDIR)) != NULL)
+	{
+		TimeLineID	tli;
+		XLogSegNo	logSegNo;
+
+		/* Does it look like a WAL segment? */
+		if (!IsXLogFileName(de->d_name))
+			continue;
+
+		XLogFromFileName(de->d_name, &tli, &logSegNo, wal_segment_size);
+		if (tli != currentTLI)
+		{
+			/*
+			 * It seems wrong to stream WAL on a timeline different from the
+			 * one we are replaying on. So, bail in case a timeline change is
+			 * noticed, releasing the directory handle first.
+			 */
+			FreeDir(dir);
+			ereport(LOG,
+					(errmsg("could not start streaming WAL eagerly"),
+					 errdetail("There are timeline changes in the locally available WAL files."),
+					 errhint("WAL streaming will begin once all local WAL and archives are exhausted.")));
+			return;
+		}
+
+		if (!found)
+		{
+			startsegno = endsegno = logSegNo;
+			found = true;
+		}
+		else
+		{
+			startsegno = Min(startsegno, logSegNo);
+			endsegno = Max(endsegno, logSegNo);
+		}
+	}
+	FreeDir(dir);
+
+	/*
+	 * Normally the segment holding the checkpoint record we started replay
+	 * from is in pg_wal by now, but there is no sensible start point if no
+	 * segment on our timeline was found, so don't assume it.
+	 */
+	if (!found)
+	{
+		ereport(LOG,
+				(errmsg("could not start streaming WAL eagerly"),
+				 errdetail("No WAL segments on the current timeline were found in pg_wal."),
+				 errhint("WAL streaming will begin once all local WAL and archives are exhausted.")));
+		return;
+	}
+
+	state = XLogReaderAllocate(wal_segment_size, NULL,
+							   XL_ROUTINE(.segment_open = wal_segment_open,
+										  .segment_close = wal_segment_close),
+							   NULL);
+	if (!state)
+		ereport(ERROR,
+				(errcode(ERRCODE_OUT_OF_MEMORY),
+				 errmsg("out of memory"),
+				 errdetail("Failed while allocating a WAL reading processor.")));
+
+	/*
+	 * Walk backwards from the latest segment to the earliest one and request
+	 * streaming from the beginning of the first segment whose first page has
+	 * a valid header. The loop condition is written as "segno-- > startsegno"
+	 * so that the unsigned segment number cannot wrap around below zero.
+	 */
+	for (segno = endsegno + 1; segno-- > startsegno;)
+	{
+		XLogRecPtr	startptr;
+		WALReadError errinfo;
+		char		xlogfname[MAXFNAMELEN];
+		char		path[MAXPGPATH];
+		struct stat st;
+
+		XLogSegNoOffsetToRecPtr(segno, 0, wal_segment_size, startptr);
+		XLogFileName(xlogfname, currentTLI, segno, wal_segment_size);
+		XLogFilePath(path, currentTLI, segno, wal_segment_size);
+
+		/*
+		 * Only read files that exist and are full-sized: the range computed
+		 * above may have gaps, and opening a missing segment would fail with
+		 * an ERROR rather than letting us skip it.
+		 */
+		if (stat(path, &st) == 0 && st.st_size == wal_segment_size)
+		{
+			if (!WALRead(state, state->readBuf, startptr, XLOG_BLCKSZ,
+						 currentTLI, &errinfo))
+				WALReadRaiseError(&errinfo);
+
+			if (XLogReaderValidatePageHeader(state, startptr, state->readBuf))
+			{
+				ereport(LOG,
+						(errmsg("requesting WAL streaming from the beginning of segment \"%s\"",
+								xlogfname)));
+				XLogReaderFree(state);
+				RequestXLogStreaming(currentTLI, startptr, PrimaryConnInfo,
+									 PrimarySlotName,
+									 wal_receiver_create_temp_slot);
+				return;
+			}
+		}
+
+		ereport(LOG,
+				(errmsg("skipping invalid WAL segment \"%s\" while calculating the WAL streaming start point",
+						xlogfname)));
+	}
+
+	/* No usable segment was found; leave the WAL receiver for later. */
+	XLogReaderFree(state);
+	ereport(LOG,
+			(errmsg("could not start streaming WAL eagerly"),
+			 errdetail("No valid WAL segment was found in pg_wal."),
+			 errhint("WAL streaming will begin once all local WAL and archives are exhausted.")));
+}
+
 /*
  * Subroutine of XLogInsertRecord.  Copies a WAL record to an already-reserved
  * area in the WAL.
@@ -7056,6 +7204,9 @@ StartupXLOG(void)
 		wasShutdown = ((record->xl_info & ~XLR_INFO_MASK) == XLOG_CHECKPOINT_SHUTDOWN);
 	}
 
+	/* Start WAL receiver now if wal_receiver_start_condition = 'startup'. */
+	StartWALReceiverEagerlyIfPossible(WAL_RCV_START_AT_STARTUP, replayTLI);
+
 	/*
 	 * Clear out any old relcache cache files.  This is *necessary* if we do
 	 * any WAL replay, since that would probably result in the cache files
@@ -8241,7 +8392,8 @@ StartupXLOG(void)
 /*
  * Checks if recovery has reached a consistent state. When consistency is
  * reached and we have a valid starting standby snapshot, tell postmaster
- * that it can start accepting read-only connections.
+ * that it can start accepting read-only connections. Also, attempt to start
+ * the WAL receiver eagerly if wal_receiver_start_condition = 'consistency'.
  */
 static void
 CheckRecoveryConsistency(void)
@@ -8332,6 +8484,10 @@ CheckRecoveryConsistency(void)
 
 		SendPostmasterSignal(PMSIGNAL_BEGIN_HOT_STANDBY);
 	}
+
+	/* Start the WAL receiver eagerly if configured for 'consistency'. */
+	StartWALReceiverEagerlyIfPossible(WAL_RCV_START_AT_CONSISTENCY,
+									  XLogCtl->lastReplayedTLI);
 }
 
 /*
@@ -12716,10 +12872,12 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
 
 					/*
 					 * Move to XLOG_FROM_STREAM state, and set to start a
-					 * walreceiver if necessary.
+					 * walreceiver if necessary. The WAL receiver may already
+					 * be running if it was started eagerly (see
+					 * wal_receiver_start_condition).
 					 */
 					currentSource = XLOG_FROM_STREAM;
-					startWalReceiver = true;
+					startWalReceiver = !WalRcvStreaming();
 					break;
 
 				case XLOG_FROM_STREAM:
@@ -12829,12 +12987,6 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
 			case XLOG_FROM_ARCHIVE:
 			case XLOG_FROM_PG_WAL:
 
-				/*
-				 * WAL receiver must not be running when reading WAL from
-				 * archive or pg_wal.
-				 */
-				Assert(!WalRcvStreaming());
-
 				/* Close any old file we might have open. */
 				if (readFile >= 0)
 				{
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 7a7eb3784e7..ad623a47f43 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -89,6 +89,7 @@
 int			wal_receiver_status_interval;
 int			wal_receiver_timeout;
 bool		hot_standby_feedback;
+int			wal_receiver_start_condition = WAL_RCV_START_AT_EXHAUST;
 
 /* libpqwalreceiver connection */
 static WalReceiverConn *wrconn = NULL;
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index e91d5a3cfda..598f89c841b 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -557,6 +557,13 @@ static const struct config_enum_entry wal_compression_options[] = {
 	{NULL, 0, false}
 };
 
+static const struct config_enum_entry wal_rcv_start_options[] = {
+	{"exhaust", WAL_RCV_START_AT_EXHAUST, false},
+	{"consistency", WAL_RCV_START_AT_CONSISTENCY, false},
+	{"startup", WAL_RCV_START_AT_STARTUP, false},
+	{NULL, 0, false}
+};
+
 /*
  * Options for enum values stored in other modules
  */
@@ -5007,6 +5014,17 @@ static struct config_enum ConfigureNamesEnum[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"wal_receiver_start_condition", PGC_POSTMASTER, REPLICATION_STANDBY,
+			gettext_noop("When to start WAL receiver."),
+			NULL,
+		},
+		&wal_receiver_start_condition,
+		WAL_RCV_START_AT_EXHAUST,
+		wal_rcv_start_options,
+		NULL, NULL, NULL
+	},
+
 	/* End-of-list marker */
 	{
 		{NULL, 0, 0, NULL, NULL}, NULL, 0, NULL, NULL, NULL, NULL
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 1cbc9feeb6f..faca3156afb 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -343,6 +343,8 @@
 #wal_retrieve_retry_interval = 5s	# time to wait before retrying to
 					# retrieve WAL after a failed attempt
 #recovery_min_apply_delay = 0		# minimum delay for applying changes during recovery
+#wal_receiver_start_condition = 'exhaust'	# 'exhaust', 'consistency', or 'startup'
+					# (change requires restart)
 
 # - Subscribers -
 
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 0b607ed777b..a0d14791f13 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -24,9 +24,19 @@
 #include "storage/spin.h"
 #include "utils/tuplestore.h"
 
+typedef enum
+{
+	WAL_RCV_START_AT_STARTUP,	/* 'startup': start at standby startup */
+	WAL_RCV_START_AT_CONSISTENCY,	/* 'consistency': start once recovery has
+									 * reached a consistent state */
+	WAL_RCV_START_AT_EXHAUST,	/* 'exhaust' (default): start after WAL from
+								 * archive and pg_wal has been exhausted */
+} WalRcvStartCondition;
+
 /* user-settable parameters */
 extern int	wal_receiver_status_interval;
 extern int	wal_receiver_timeout;
+extern int	wal_receiver_start_condition;
 extern bool hot_standby_feedback;
 
 /*
diff --git a/src/test/recovery/t/027_walreceiver_start.pl b/src/test/recovery/t/027_walreceiver_start.pl
new file mode 100644
index 00000000000..991d9bb6658
--- /dev/null
+++ b/src/test/recovery/t/027_walreceiver_start.pl
@@ -0,0 +1,96 @@
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# Tests for wal_receiver_start_condition = 'consistency'.
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use File::Copy;
+use Test::More tests => 2;
+
+# Initialize primary node and start it.
+my $node_primary = PostgreSQL::Test::Cluster->new('test');
+$node_primary->init(allows_streaming => 1);
+$node_primary->start;
+
+# Initial workload.
+$node_primary->safe_psql(
+	'postgres', qq {
+CREATE TABLE test_walreceiver_start(i int);
+SELECT pg_switch_wal();
+});
+
+# Take backup.
+my $backup_name = 'my_backup';
+$node_primary->backup($backup_name);
+
+# Run a post-backup workload, whose WAL we will manually copy over to the
+# standby before it starts.
+my $wal_file_to_copy = $node_primary->safe_psql('postgres',
+	"SELECT pg_walfile_name(pg_current_wal_lsn());");
+$node_primary->safe_psql(
+	'postgres', qq {
+INSERT INTO test_walreceiver_start VALUES(1);
+SELECT pg_switch_wal();
+});
+
+# Initialize standby node from the backup and copy over the post-backup WAL.
+my $node_standby = PostgreSQL::Test::Cluster->new('standby');
+$node_standby->init_from_backup($node_primary, $backup_name,
+	has_streaming => 1);
+copy($node_primary->data_dir . '/pg_wal/' . $wal_file_to_copy,
+	$node_standby->data_dir . '/pg_wal')
+  or die "Copy failed: $!";
+
+# Set up a long delay to prevent the standby from replaying past the first
+# commit outside the backup.
+$node_standby->append_conf('postgresql.conf',
+	"recovery_min_apply_delay = '2h'");
+# Set up the walreceiver to start as soon as consistency is reached.
+$node_standby->append_conf('postgresql.conf',
+	"wal_receiver_start_condition = 'consistency'");
+
+$node_standby->start();
+
+# The standby should have reached consistency and should be blocked waiting for
+# recovery_min_apply_delay.
+$node_standby->poll_query_until(
+	'postgres', qq{
+SELECT wait_event = 'RecoveryApplyDelay' FROM pg_stat_activity
+WHERE backend_type='startup';
+}) or die "Timed out checking if startup is in recovery_min_apply_delay";
+
+# The walreceiver should have started, streaming from the start of the latest
+# valid locally available WAL segment, i.e. the WAL file that was copied over.
+$node_standby->poll_query_until('postgres',
+	"SELECT COUNT(1) = 1 FROM pg_stat_wal_receiver;")
+  or die "Timed out while waiting for streaming to start";
+my $receive_start_lsn = $node_standby->safe_psql('postgres',
+	'SELECT receive_start_lsn FROM pg_stat_wal_receiver');
+is( $node_primary->safe_psql(
+		'postgres', "SELECT pg_walfile_name('$receive_start_lsn');"),
+	$wal_file_to_copy,
+	"walreceiver started from end of valid locally available WAL");
+
+# Now run a workload which should get streamed over.
+$node_primary->safe_psql(
+	'postgres', qq {
+SELECT pg_switch_wal();
+INSERT INTO test_walreceiver_start VALUES(2);
+});
+
+# The walreceiver should be caught up, including all WAL generated post backup.
+$node_primary->wait_for_catchup('standby', 'flush');
+
+# Now clear the delay so that the standby can replay the received WAL.
+$node_standby->safe_psql('postgres',
+	'ALTER SYSTEM SET recovery_min_apply_delay TO 0;');
+$node_standby->reload;
+
+# Now the replay should catch up.
+$node_primary->wait_for_catchup('standby', 'replay');
+is( $node_standby->safe_psql(
+		'postgres', 'SELECT count(*) FROM test_walreceiver_start;'),
+	2,
+	"querying test_walreceiver_start now should return 2 rows");
-- 
2.25.1

Reply via email to