From cc8e9fe807ea60c5ff6bb226ee2fbeb7b9c4d349 Mon Sep 17 00:00:00 2001
From: Hou Zhijie <sherlockcpp@foxmail.com>
Date: Fri, 23 Feb 2024 08:29:30 +0800
Subject: [PATCH v1022] Allow logical walsenders to wait for the physical
 standby

This patch introduces a mechanism to ensure that physical standby servers,
which are potential failover candidates, have received and flushed changes
before making them visible to subscribers. By doing so, it guarantees that
the promoted standby server is not lagging behind the subscribers when a
failover is necessary.

A new parameter named 'standby_slot_names' is introduced. The logical
walsender now guarantees that all local changes are sent and flushed to
the standby servers corresponding to the replication slots specified in
'standby_slot_names' before sending those changes to the subscriber.

Additionally, The SQL functions pg_logical_slot_get_changes and
pg_replication_slot_advance are modified to wait for the replication slots
mentioned in 'standby_slot_names' to catch up before returning.
---
 doc/src/sgml/config.sgml                      |  29 ++
 doc/src/sgml/logicaldecoding.sgml             |  12 +
 .../replication/logical/logicalfuncs.c        |  12 +
 src/backend/replication/logical/slotsync.c    |  11 +
 src/backend/replication/slot.c                | 370 +++++++++++++++++-
 src/backend/replication/slotfuncs.c           |  12 +
 src/backend/replication/walsender.c           | 139 ++++++-
 .../utils/activity/wait_event_names.txt       |   1 +
 src/backend/utils/misc/guc_tables.c           |  14 +
 src/backend/utils/misc/postgresql.conf.sample |   2 +
 src/include/replication/slot.h                |   5 +
 src/include/replication/walsender.h           |   1 +
 src/include/replication/walsender_private.h   |   7 +
 src/include/utils/guc_hooks.h                 |   3 +
 src/test/recovery/t/006_logical_decoding.pl   |   3 +-
 .../t/040_standby_failover_slots_sync.pl      | 202 ++++++++++
 16 files changed, 802 insertions(+), 21 deletions(-)

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 43b1a132a2..ea95561420 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -4559,6 +4559,35 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"'  # Windows
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-standby-slot-names" xreflabel="standby_slot_names">
+      <term><varname>standby_slot_names</varname> (<type>string</type>)
+      <indexterm>
+       <primary><varname>standby_slot_names</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Lists the streaming replication standby server slot names that logical
+        WAL sender processes will wait for. Logical WAL sender processes will
+        send decoded changes to plugins only after the specified replication
+        slots confirm receiving WAL. This guarantees that logical replication
+        slots with failover enabled do not consume changes until those changes
+        are received and flushed to corresponding physical standbys. If a
+        logical replication connection is meant to switch to a physical standby
+        after the standby is promoted, the physical replication slot for the
+        standby should be listed here. Note that logical replication will not
+        proceed if the slots specified in the standby_slot_names do not exist or
+        are invalidated.
+       </para>
+       <para>
+        The standbys corresponding to the physical replication slots in
+        <varname>standby_slot_names</varname> must configure
+        <literal>sync_replication_slots = true</literal> so they can receive
+        failover logical slots changes from the primary.
+       </para>
+      </listitem>
+     </varlistentry>
+
      </variablelist>
     </sect2>
 
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 930c0fa8a6..5da3ac7f4c 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -384,6 +384,18 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU
      must be enabled on the standby. It is also necessary to specify a valid
      <literal>dbname</literal> in the
      <link linkend="guc-primary-conninfo"><varname>primary_conninfo</varname></link>.
+     It's highly recommended that the said physical replication slot is named in
+     <link linkend="guc-standby-slot-names"><varname>standby_slot_names</varname></link>
+     list on the primary, to prevent the subscriber from consuming changes
+     faster than the hot standby. Even when correctly configured, some latency
+     is expected when sending changes to logical subscribers due to the waiting
+     on slots named in
+     <link linkend="guc-standby-slot-names"><varname>standby_slot_names</varname></link>.
+     When <varname>standby_slot_names</varname> is utilized, the
+     primary server will not completely shut down until the corresponding
+     standbys, associated with the physical replication slots specified
+     in <varname>standby_slot_names</varname>, have confirmed
+     receiving the WAL up to the latest flushed position on the primary server.
     </para>
 
     <para>
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index b0081d3ce5..f28d6c9015 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -109,6 +109,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 	MemoryContext per_query_ctx;
 	MemoryContext oldcontext;
 	XLogRecPtr	end_of_wal;
+	XLogRecPtr	wait_for_wal_lsn;
 	LogicalDecodingContext *ctx;
 	ResourceOwner old_resowner = CurrentResourceOwner;
 	ArrayType  *arr;
@@ -228,6 +229,17 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 							NameStr(MyReplicationSlot->data.plugin),
 							format_procedure(fcinfo->flinfo->fn_oid))));
 
+		/*
+		 * Wait for specified streaming replication standby servers (if any)
+		 * to confirm receipt of WAL up to wait_for_wal_lsn.
+		 */
+		if (XLogRecPtrIsInvalid(upto_lsn))
+			wait_for_wal_lsn = end_of_wal;
+		else
+			wait_for_wal_lsn = Min(upto_lsn, end_of_wal);
+
+		WaitForStandbyConfirmation(wait_for_wal_lsn);
+
 		ctx->output_writer_private = p;
 
 		/*
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 8ecb85b86a..d1562cf7b3 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -491,6 +491,10 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 	latestFlushPtr = GetStandbyFlushRecPtr(NULL);
 	if (remote_slot->confirmed_lsn > latestFlushPtr)
 	{
+		/*
+		 * Can get here only if GUC 'standby_slot_names' on the primary server
+		 * was not configured correctly.
+		 */
 		ereport(am_slotsync_worker ? LOG : ERROR,
 				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				errmsg("skipping slot synchronization as the received slot sync"
@@ -860,6 +864,13 @@ validate_remote_info(WalReceiverConn *wrconn)
 	remote_in_recovery = DatumGetBool(slot_getattr(tupslot, 1, &isnull));
 	Assert(!isnull);
 
+	/*
+	 * Slot sync is currently not supported on a cascading standby. This is
+	 * because if we allow it, the primary server needs to wait for all the
+	 * cascading standbys, otherwise, logical subscribers can still be ahead
+	 * of one of the cascading standbys which we plan to promote. Thus, to
+	 * avoid this additional complexity, we restrict it for the time being.
+	 */
 	if (remote_in_recovery)
 		ereport(ERROR,
 				errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 0f173f63a2..2f4bba2592 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -46,13 +46,18 @@
 #include "common/string.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "postmaster/interrupt.h"
 #include "replication/slotsync.h"
 #include "replication/slot.h"
+#include "replication/walsender_private.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
 #include "utils/builtins.h"
+#include "utils/guc_hooks.h"
+#include "utils/memutils.h"
+#include "utils/varlena.h"
 
 /*
  * Replication slot on-disk data structure.
@@ -115,10 +120,25 @@ ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
 /* My backend's replication slot in the shared memory array */
 ReplicationSlot *MyReplicationSlot = NULL;
 
-/* GUC variable */
+/* GUC variables */
 int			max_replication_slots = 10; /* the maximum number of replication
 										 * slots */
 
+/*
+ * This GUC lists streaming replication standby server slot names that
+ * logical WAL sender processes will wait for.
+ */
+char	   *standby_slot_names;
+
+/* This is parsed and cached list for raw standby_slot_names. */
+static List *standby_slot_names_list = NIL;
+
+/*
+ * Oldest LSN that has been confirmed to be flushed to the standbys
+ * corresponding to the physical slots specified in the standby_slot_names GUC.
+ */
+static XLogRecPtr standby_slot_oldest_flush_lsn = InvalidXLogRecPtr;
+
 static void ReplicationSlotShmemExit(int code, Datum arg);
 static void ReplicationSlotDropPtr(ReplicationSlot *slot);
 
@@ -2345,3 +2365,351 @@ GetSlotInvalidationCause(const char *conflict_reason)
 	Assert(found);
 	return result;
 }
+
+/*
+ * A helper function to validate slots specified in GUC standby_slot_names.
+ */
+static bool
+validate_standby_slots(char **newval)
+{
+	char	   *rawname;
+	List	   *elemlist;
+	bool		ok;
+
+	/* Need a modifiable copy of string */
+	rawname = pstrdup(*newval);
+
+	/* Verify syntax and parse string into a list of identifiers */
+	ok = SplitIdentifierString(rawname, ',', &elemlist);
+
+	if (!ok)
+	{
+		GUC_check_errdetail("List syntax is invalid.");
+	}
+	else if (!ReplicationSlotCtl)
+	{
+		/*
+		 * We cannot validate the replication slot if the replication slots'
+		 * data has not been initialized. This is ok as we will validate the
+		 * specified slot when waiting for them to catch up. See
+		 * StandbySlotsHaveCaughtup for details.
+		 */
+	}
+	else
+	{
+		/*
+		 * If the replication slots' data have been initialized, verify if the
+		 * specified slots exist and are logical slots.
+		 */
+		LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+
+		foreach_ptr(char, name, elemlist)
+		{
+			ReplicationSlot *slot;
+
+			slot = SearchNamedReplicationSlot(name, false);
+
+			if (!slot)
+			{
+				GUC_check_errdetail("replication slot \"%s\" does not exist",
+									name);
+				ok = false;
+				break;
+			}
+
+			if (!SlotIsPhysical(slot))
+			{
+				GUC_check_errdetail("\"%s\" is not a physical replication slot",
+									name);
+				ok = false;
+				break;
+			}
+		}
+
+		LWLockRelease(ReplicationSlotControlLock);
+	}
+
+	pfree(rawname);
+	list_free(elemlist);
+	return ok;
+}
+
+/*
+ * GUC check_hook for standby_slot_names
+ */
+bool
+check_standby_slot_names(char **newval, void **extra, GucSource source)
+{
+	if ((*newval)[0] == '\0')
+		return true;
+
+	/*
+	 * "*" is not accepted as in that case primary will not be able to know
+	 * for which all standbys to wait for. Even if we have physical slots
+	 * info, there is no way to confirm whether there is any standby
+	 * configured for the known physical slots.
+	 */
+	if (strcmp(*newval, "*") == 0)
+	{
+		GUC_check_errdetail("\"*\" is not accepted for standby_slot_names");
+		return false;
+	}
+
+	/* Now verify if the specified slots really exist and have correct type */
+	if (!validate_standby_slots(newval))
+		return false;
+
+	*extra = guc_strdup(ERROR, *newval);
+
+	return true;
+}
+
+/*
+ * GUC assign_hook for standby_slot_names
+ */
+void
+assign_standby_slot_names(const char *newval, void *extra)
+{
+	List	   *standby_slots;
+	MemoryContext oldcxt;
+	char	   *standby_slot_names_cpy = extra;
+
+	/*
+	 * The standby slots may have changed, so we need to recompute the oldest
+	 * LSN.
+	 */
+	standby_slot_oldest_flush_lsn = InvalidXLogRecPtr;
+
+	list_free(standby_slot_names_list);
+	standby_slot_names_list = NIL;
+
+	/* No value is specified for standby_slot_names. */
+	if (standby_slot_names_cpy == NULL)
+		return;
+
+	if (!SplitIdentifierString(standby_slot_names_cpy, ',', &standby_slots))
+	{
+		/* This should not happen if GUC checked check_standby_slot_names. */
+		elog(ERROR, "invalid list syntax");
+	}
+
+	/*
+	 * Switch to the memory context under which GUC variables are allocated
+	 * (GUCMemoryContext).
+	 */
+	oldcxt = MemoryContextSwitchTo(GetMemoryChunkContext(standby_slot_names_cpy));
+	standby_slot_names_list = list_copy(standby_slots);
+	MemoryContextSwitchTo(oldcxt);
+}
+
+/*
+ * Return the standby_slot_names_list.
+ *
+ * Note that since we do not support syncing slots to cascading standbys, we
+ * return NIL if we are running in a standby to indicate that no standby slots
+ * need to be waited for.
+ */
+List *
+GetStandbySlotList(void)
+{
+	if (RecoveryInProgress())
+		return NIL;
+	else
+		return standby_slot_names_list;
+}
+
+/*
+ * Return true if the slots specified in standby_slot_names have caught up to
+ * the given WAL location, false otherwise.
+ *
+ * The elevel parameter determines the error level used for logging messages
+ * related to slots that do not exist, are invalidated, or are inactive.
+ */
+bool
+StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
+{
+	int			caught_up_slot_num = 0;
+	XLogRecPtr	min_restart_lsn = InvalidXLogRecPtr;
+
+	/*
+	 * Don't need to wait for the standby to catch up if there is no value in
+	 * standby_slot_names.
+	 */
+	if (!standby_slot_names_list)
+		return true;
+
+	/*
+	 * If we are on a standby server, we do not need to wait for the standby to
+	 * catch up since we do not support syncing slots to cascading standbys.
+	 */
+	if (RecoveryInProgress())
+		return true;
+
+	/*
+	 * Return true if all the standbys have already caught up to the passed in
+	 * WAL localtion.
+	 */
+	if (!XLogRecPtrIsInvalid(standby_slot_oldest_flush_lsn) &&
+		standby_slot_oldest_flush_lsn >= wait_for_lsn)
+		return true;
+
+	/*
+	 * To prevent concurrent slot dropping and creation while filtering the
+	 * slots, take the ReplicationSlotControlLock outside of the loop.
+	 */
+	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+
+	foreach_ptr(char, name, standby_slot_names_list)
+	{
+		XLogRecPtr	restart_lsn;
+		bool		invalidated;
+		bool		inactive;
+		ReplicationSlot *slot;
+
+		slot = SearchNamedReplicationSlot(name, false);
+
+		if (!slot)
+		{
+			/*
+			 * It may happen that the slot specified in standby_slot_names GUC
+			 * value is dropped, so let's skip over it.
+			 */
+			ereport(elevel,
+					errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					errmsg("replication slot \"%s\" specified in parameter %s does not exist",
+						   name, "standby_slot_names"),
+					errdetail("Logical replication is waiting on the standby associated with \"%s\".",
+							  name),
+					errhint("Consider creating the slot \"%s\" or amend parameter %s.",
+							name, "standby_slot_names"));
+			continue;
+		}
+
+		if (SlotIsLogical(slot))
+		{
+			/*
+			 * If a logical slot name is provided in standby_slot_names, report
+			 * a message and skip it. It is possible for user to specify a
+			 * logical slot name in standby_slot_names just before the server
+			 * startup. The GUC check_hook(validate_standby_slots) can not
+			 * validate such a slot during startup as the ReplicationSlotCtl
+			 * shared memory is not initialized by that time. It is also
+			 * possible for user to drop an existing physical slot and
+			 * recreate a logical slot with the same name.
+			 */
+			ereport(elevel,
+					errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					errmsg("cannot have logical replication slot \"%s\" in parameter %s",
+						   name, "standby_slot_names"),
+					errdetail("Logical replication is waiting for correction on \"%s\".",
+							  name),
+					errhint("Consider removing logical slot \"%s\" from parameter %s.",
+							name, "standby_slot_names"));
+			continue;
+		}
+
+		SpinLockAcquire(&slot->mutex);
+		restart_lsn = slot->data.restart_lsn;
+		invalidated = slot->data.invalidated != RS_INVAL_NONE;
+		inactive = slot->active_pid == 0;
+		SpinLockRelease(&slot->mutex);
+
+		if (invalidated)
+		{
+			/* Specified physical slot has been invalidated */
+			ereport(elevel,
+					errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+					errmsg("physical slot \"%s\" specified in parameter %s has been invalidated",
+						   name, "standby_slot_names"),
+					errdetail("Logical replication is waiting on the standby associated with \"%s\".",
+							  name),
+					errhint("Consider dropping and recreating the slot \"%s\" or amend parameter %s.",
+							name, "standby_slot_names"));
+			continue;
+		}
+
+		if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn < wait_for_lsn)
+		{
+			/* Log a message if no active_pid for this physical slot */
+			if (inactive)
+				ereport(elevel,
+						errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+						errmsg("replication slot \"%s\" specified in parameter %s does not have active_pid",
+							   name, "standby_slot_names"),
+						errdetail("Logical replication is waiting on the standby associated with \"%s\".",
+								  name),
+						errhint("Consider starting standby associated with \"%s\" or amend parameter %s.",
+								name, "standby_slot_names"));
+
+			/* Continue if the current slot hasn't caught up. */
+			continue;
+		}
+
+		Assert(restart_lsn >= wait_for_lsn);
+
+		if (XLogRecPtrIsInvalid(min_restart_lsn) ||
+			min_restart_lsn > restart_lsn)
+			min_restart_lsn = restart_lsn;
+
+		caught_up_slot_num++;
+	}
+
+	LWLockRelease(ReplicationSlotControlLock);
+
+	/*
+	 * Return false if not all the standbys have caught up to the specified WAL
+	 * location.
+	 */
+	if (caught_up_slot_num != list_length(standby_slot_names_list))
+		return false;
+
+	standby_slot_oldest_flush_lsn = min_restart_lsn;
+
+	return true;
+}
+
+/*
+ * Wait for physical standbys to confirm receiving the given lsn.
+ *
+ * Used by logical decoding SQL functions that acquired failover enabled slot.
+ * It waits for physical standbys corresponding to the physical slots specified
+ * in the standby_slot_names GUC.
+ */
+void
+WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
+{
+	/*
+	 * Don't need to wait for the standby to catch up if the current acquired
+	 * slot is not a failover enabled slot, or there is no value in
+	 * standby_slot_names.
+	 */
+	if (!MyReplicationSlot->data.failover || !standby_slot_names_list)
+		return;
+
+	ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv);
+
+	for (;;)
+	{
+		CHECK_FOR_INTERRUPTS();
+
+		if (ConfigReloadPending)
+		{
+			ConfigReloadPending = false;
+			ProcessConfigFile(PGC_SIGHUP);
+		}
+
+		/* Exit if done waiting for every slot. */
+		if (StandbySlotsHaveCaughtup(wait_for_lsn, WARNING))
+			break;
+
+		/*
+		 * We wait for the slots in the standby_slot_names to catch up, but we
+		 * use a timeout (1s) so we can also check if the standby_slot_names
+		 * has been changed.
+		 */
+		ConditionVariableTimedSleep(&WalSndCtl->wal_confirm_rcv_cv, 1000,
+									WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION);
+	}
+
+	ConditionVariableCancelSleep();
+}
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 768a304723..d864fe4133 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -464,6 +464,12 @@ pg_physical_replication_slot_advance(XLogRecPtr moveto)
 		 * crash, but this makes the data consistent after a clean shutdown.
 		 */
 		ReplicationSlotMarkDirty();
+
+		/*
+		 * Wake up logical walsenders holding failover enabled slots after
+		 * updating the restart_lsn of the physical slot.
+		 */
+		PhysicalWakeupLogicalWalSnd();
 	}
 
 	return retlsn;
@@ -504,6 +510,12 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
 											   .segment_close = wal_segment_close),
 									NULL, NULL, NULL);
 
+		/*
+		 * Wait for specified streaming replication standby servers (if any)
+		 * to confirm receipt of WAL up to moveto lsn.
+		 */
+		WaitForStandbyConfirmation(moveto);
+
 		/*
 		 * Start reading at the slot's restart_lsn, which we know to point to
 		 * a valid record.
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 13bc3e0aee..5a64bf60b7 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1728,26 +1728,104 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId
 		ProcessPendingWrites();
 }
 
+/*
+ * Wake up the logical walsender processes with failover enabled slots if the
+ * currently acquired physical slot is specified in standby_slot_names GUC.
+ */
+void
+PhysicalWakeupLogicalWalSnd(void)
+{
+	List	   *standby_slots;
+
+	Assert(MyReplicationSlot && SlotIsPhysical(MyReplicationSlot));
+
+	standby_slots = GetStandbySlotList();
+
+	foreach_ptr(char, name, standby_slots)
+	{
+		if (strcmp(name, NameStr(MyReplicationSlot->data.name)) == 0)
+		{
+			ConditionVariableBroadcast(&WalSndCtl->wal_confirm_rcv_cv);
+			return;
+		}
+	}
+}
+
+/*
+ * Returns true if the WALs before the target_lsn has been flushed to the disk,
+ * or if not all standbys have caught up to the flushed position (flushed_lsn)
+ * when failover_slot is true; otherwise, returns false.
+ *
+ * Set prioritize_stop to true to skip waiting for WALs if the shutdown signal
+ * is received.
+ *
+ * Set failover_slot to true if the current acquired slot is a failover enabled
+ * slot and we are streaming.
+ *
+ * If returning true, the function sets the appropriate wait event in
+ * wait_event; otherwise, wait_event is set to 0.
+ */
+static bool
+NeedToWaitForWal(XLogRecPtr target_lsn, XLogRecPtr flushed_lsn,
+				 bool prioritize_stop, bool failover_slot,
+				 uint32 *wait_event)
+{
+	int	elevel = got_STOPPING ? ERROR : WARNING;
+
+	/*
+	 * Check if we need to wait for WALs to be flushed to disk. We don't need
+	 * to wait for WALs after receiving the shutdown signal unless the
+	 * prioritize_stop is false.
+	 */
+	if (target_lsn > flushed_lsn && !(prioritize_stop && got_STOPPING))
+		*wait_event = WAIT_EVENT_WAL_SENDER_WAIT_FOR_WAL;
+
+	/*
+	 * Check if the standby slots have caught up to the flushed position. It is
+	 * good to wait up to flushed position and then let it send the changes to
+	 * logical subscribers one by one which are already covered in flushed
+	 * position without needing to wait on every change for standby
+	 * confirmation. Note that after receiving the shutdown signal, an ERROR is
+	 * reported if any slots are dropped, invalidated, or inactive. This
+	 * measure is taken to prevent the walsender from waiting indefinitely.
+	 */
+	else if (failover_slot && !StandbySlotsHaveCaughtup(flushed_lsn, elevel))
+		*wait_event = WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION;
+	else
+		*wait_event = 0;
+
+	return *wait_event != 0;
+}
+
 /*
  * Wait till WAL < loc is flushed to disk so it can be safely sent to client.
  *
- * Returns end LSN of flushed WAL.  Normally this will be >= loc, but
- * if we detect a shutdown request (either from postmaster or client)
- * we will return early, so caller must always check.
+ * If the walsender holds a logical slot that has enabled failover, we also
+ * wait for all the specified streaming replication standby servers to
+ * confirm receipt of WAL up to RecentFlushPtr.
+ *
+ * Returns end LSN of flushed WAL.  Normally this will be >= loc, but if we
+ * detect a shutdown request (either from postmaster or client) we will return
+ * early, so caller must always check.
  */
 static XLogRecPtr
 WalSndWaitForWal(XLogRecPtr loc)
 {
 	int			wakeEvents;
+	bool		failover_slot;
+	uint32		wait_event = 0;
 	static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
 
+	failover_slot = (replication_active && MyReplicationSlot->data.failover);
+
 	/*
 	 * Fast path to avoid acquiring the spinlock in case we already know we
-	 * have enough WAL available. This is particularly interesting if we're
-	 * far behind.
+	 * have enough WAL available and all the standby servers have confirmed
+	 * receipt of WAL up to RecentFlushPtr. This is particularly interesting if
+	 * we're far behind.
 	 */
-	if (RecentFlushPtr != InvalidXLogRecPtr &&
-		loc <= RecentFlushPtr)
+	if (!XLogRecPtrIsInvalid(RecentFlushPtr) &&
+		!NeedToWaitForWal(loc, RecentFlushPtr, false, failover_slot, &wait_event))
 		return RecentFlushPtr;
 
 	/* Get a more recent flush pointer. */
@@ -1784,20 +1862,29 @@ WalSndWaitForWal(XLogRecPtr loc)
 		if (got_STOPPING)
 			XLogBackgroundFlush();
 
-		/* Update our idea of the currently flushed position. */
-		if (!RecoveryInProgress())
-			RecentFlushPtr = GetFlushRecPtr(NULL);
-		else
-			RecentFlushPtr = GetXLogReplayRecPtr(NULL);
+		/*
+		 * Update our idea of the currently flushed position only if we are not
+		 * waiting for standbys to catch up, otherwise the standby would have
+		 * to catch up to a newer WAL location in each cycle.
+		 */
+		if (wait_event != WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION)
+		{
+			if (!RecoveryInProgress())
+				RecentFlushPtr = GetFlushRecPtr(NULL);
+			else
+				RecentFlushPtr = GetXLogReplayRecPtr(NULL);
+		}
 
 		/*
-		 * If postmaster asked us to stop, don't wait anymore.
+		 * If postmaster asked us to stop and the standby slots have caught up
+		 * to the flushed position, don't wait anymore.
 		 *
 		 * It's important to do this check after the recomputation of
 		 * RecentFlushPtr, so we can send all remaining data before shutting
 		 * down.
 		 */
-		if (got_STOPPING)
+		if (got_STOPPING && (!failover_slot ||
+			StandbySlotsHaveCaughtup(RecentFlushPtr, ERROR)))
 			break;
 
 		/*
@@ -1813,11 +1900,17 @@ WalSndWaitForWal(XLogRecPtr loc)
 			!waiting_for_ping_response)
 			WalSndKeepalive(false, InvalidXLogRecPtr);
 
-		/* check whether we're done */
-		if (loc <= RecentFlushPtr)
+		/*
+		 * Exit the loop if already caught up and doesn't need to wait for
+		 * standby slots.
+		 */
+		if (!NeedToWaitForWal(loc, RecentFlushPtr, true, failover_slot, &wait_event))
 			break;
 
-		/* Waiting for new WAL. Since we need to wait, we're now caught up. */
+		/*
+		 * Waiting for new WAL or waiting for standbys to catch up. Since
+		 * we need to wait, we're now caught up.
+		 */
 		WalSndCaughtUp = true;
 
 		/*
@@ -1855,7 +1948,7 @@ WalSndWaitForWal(XLogRecPtr loc)
 		if (pq_is_send_pending())
 			wakeEvents |= WL_SOCKET_WRITEABLE;
 
-		WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_WAIT_FOR_WAL);
+		WalSndWait(wakeEvents, sleeptime, wait_event);
 	}
 
 	/* reactivate latch so WalSndLoop knows to continue */
@@ -2265,6 +2358,7 @@ PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
 	{
 		ReplicationSlotMarkDirty();
 		ReplicationSlotsComputeRequiredLSN();
+		PhysicalWakeupLogicalWalSnd();
 	}
 
 	/*
@@ -3538,6 +3632,7 @@ WalSndShmemInit(void)
 
 		ConditionVariableInit(&WalSndCtl->wal_flush_cv);
 		ConditionVariableInit(&WalSndCtl->wal_replay_cv);
+		ConditionVariableInit(&WalSndCtl->wal_confirm_rcv_cv);
 	}
 }
 
@@ -3607,8 +3702,14 @@ WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
 	 *
 	 * And, we use separate shared memory CVs for physical and logical
 	 * walsenders for selective wake ups, see WalSndWakeup() for more details.
+	 *
+	 * If the wait event is WAIT_FOR_STANDBY_CONFIRMATION, wait on another CV
+	 * until awakened by physical walsenders after the walreceiver confirms
+	 * the receipt of the LSN.
 	 */
-	if (MyWalSnd->kind == REPLICATION_KIND_PHYSICAL)
+	if (wait_event == WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION)
+		ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv);
+	else if (MyWalSnd->kind == REPLICATION_KIND_PHYSICAL)
 		ConditionVariablePrepareToSleep(&WalSndCtl->wal_flush_cv);
 	else if (MyWalSnd->kind == REPLICATION_KIND_LOGICAL)
 		ConditionVariablePrepareToSleep(&WalSndCtl->wal_replay_cv);
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index ec2f31f82a..c08e00d1d6 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -78,6 +78,7 @@ GSS_OPEN_SERVER	"Waiting to read data from the client while establishing a GSSAP
 LIBPQWALRECEIVER_CONNECT	"Waiting in WAL receiver to establish connection to remote server."
 LIBPQWALRECEIVER_RECEIVE	"Waiting in WAL receiver to receive data from remote server."
 SSL_OPEN_SERVER	"Waiting for SSL while attempting connection."
+WAIT_FOR_STANDBY_CONFIRMATION	"Waiting for WAL to be received and flushed by the physical standby."
 WAL_SENDER_WAIT_FOR_WAL	"Waiting for WAL to be flushed in WAL sender process."
 WAL_SENDER_WRITE_DATA	"Waiting for any activity when processing replies from WAL receiver in WAL sender process."
 
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 93ded31ed9..326cefcf4b 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -4670,6 +4670,20 @@ struct config_string ConfigureNamesString[] =
 		check_debug_io_direct, assign_debug_io_direct, NULL
 	},
 
+	{
+		{"standby_slot_names", PGC_SIGHUP, REPLICATION_PRIMARY,
+			gettext_noop("Lists streaming replication standby server slot "
+						 "names that logical WAL sender processes will wait for."),
+			gettext_noop("Logical WAL sender processes will send decoded "
+						 "changes to plugins only after the specified  "
+						 "replication slots confirm receiving WAL."),
+			GUC_LIST_INPUT | GUC_LIST_QUOTE
+		},
+		&standby_slot_names,
+		"",
+		check_standby_slot_names, assign_standby_slot_names, NULL
+	},
+
 	/* End-of-list marker */
 	{
 		{NULL, 0, 0, NULL, NULL}, NULL, NULL, NULL, NULL, NULL
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index edcc0282b2..2244ee52f7 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -343,6 +343,8 @@
 				# method to choose sync standbys, number of sync standbys,
 				# and comma-separated list of application_name
 				# from standby(s); '*' = all
+#standby_slot_names = '' # streaming replication standby server slot names that
+				# logical walsender processes will wait for
 
 # - Standby Servers -
 
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index acbf567150..61a6b97952 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -226,6 +226,7 @@ extern PGDLLIMPORT ReplicationSlot *MyReplicationSlot;
 
 /* GUCs */
 extern PGDLLIMPORT int max_replication_slots;
+extern PGDLLIMPORT char *standby_slot_names;
 
 /* shmem initialization functions */
 extern Size ReplicationSlotsShmemSize(void);
@@ -274,4 +275,8 @@ extern void CheckSlotPermissions(void);
 extern ReplicationSlotInvalidationCause
 			GetSlotInvalidationCause(const char *conflict_reason);
 
+extern List *GetStandbySlotList(void);
+extern bool StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel);
+extern void WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn);
+
 #endif							/* SLOT_H */
diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h
index 0c3996e926..f2d8297f01 100644
--- a/src/include/replication/walsender.h
+++ b/src/include/replication/walsender.h
@@ -39,6 +39,7 @@ extern void InitWalSender(void);
 extern bool exec_replication_command(const char *cmd_string);
 extern void WalSndErrorCleanup(void);
 extern void WalSndResourceCleanup(bool isCommit);
+extern void PhysicalWakeupLogicalWalSnd(void);
 extern XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *tli);
 extern void WalSndSignals(void);
 extern Size WalSndShmemSize(void);
diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h
index 3113e9ea47..3c134c8edf 100644
--- a/src/include/replication/walsender_private.h
+++ b/src/include/replication/walsender_private.h
@@ -113,6 +113,13 @@ typedef struct
 	ConditionVariable wal_flush_cv;
 	ConditionVariable wal_replay_cv;
 
+	/*
+	 * Used by physical walsenders holding slots specified in
+	 * standby_slot_names to wake up logical walsenders holding failover
+	 * enabled slots when a walreceiver confirms the receipt of LSN.
+	 */
+	ConditionVariable wal_confirm_rcv_cv;
+
 	WalSnd		walsnds[FLEXIBLE_ARRAY_MEMBER];
 } WalSndCtlData;
 
diff --git a/src/include/utils/guc_hooks.h b/src/include/utils/guc_hooks.h
index c8a7aa9a11..d64dc5fcdb 100644
--- a/src/include/utils/guc_hooks.h
+++ b/src/include/utils/guc_hooks.h
@@ -174,5 +174,8 @@ extern bool check_wal_consistency_checking(char **newval, void **extra,
 extern void assign_wal_consistency_checking(const char *newval, void *extra);
 extern bool check_wal_segment_size(int *newval, void **extra, GucSource source);
 extern void assign_wal_sync_method(int new_wal_sync_method, void *extra);
+extern bool check_standby_slot_names(char **newval, void **extra,
+									 GucSource source);
+extern void assign_standby_slot_names(const char *newval, void *extra);
 
 #endif							/* GUC_HOOKS_H */
diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl
index 5c7b4ca5e3..b95d95c06f 100644
--- a/src/test/recovery/t/006_logical_decoding.pl
+++ b/src/test/recovery/t/006_logical_decoding.pl
@@ -172,9 +172,10 @@ is($node_primary->slot('otherdb_slot')->{'slot_name'},
 	undef, 'logical slot was actually dropped with DB');
 
 # Test logical slot advancing and its durability.
+# Passing failover=true (last arg) should not have any impact on advancing.
 my $logical_slot = 'logical_slot';
 $node_primary->safe_psql('postgres',
-	"SELECT pg_create_logical_replication_slot('$logical_slot', 'test_decoding', false);"
+	"SELECT pg_create_logical_replication_slot('$logical_slot', 'test_decoding', false, false, true);"
 );
 $node_primary->psql(
 	'postgres', "
diff --git a/src/test/recovery/t/040_standby_failover_slots_sync.pl b/src/test/recovery/t/040_standby_failover_slots_sync.pl
index 825c26da6f..9a43951b92 100644
--- a/src/test/recovery/t/040_standby_failover_slots_sync.pl
+++ b/src/test/recovery/t/040_standby_failover_slots_sync.pl
@@ -500,11 +500,213 @@ ok( $standby1->poll_query_until(
 		"SELECT '$primary_restart_lsn' = restart_lsn AND '$primary_flush_lsn' = confirmed_flush_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot' AND synced AND NOT temporary;"),
 	'restart_lsn and confirmed_flush_lsn of slot lsub1_slot synced to standby');
 
+##################################################
+# Test primary disallowing specified logical replication slots getting ahead of
+# specified physical replication slots. It uses the following set up:
+#
+#				(physical standbys)
+#				| ----> standby1 (primary_slot_name = sb1_slot)
+#				| ----> standby2 (primary_slot_name = sb2_slot)
+# primary -----	|
+#				(logical replication)
+#				| ----> subscriber1 (failover = true, slot_name = lsub1_slot)
+#				| ----> subscriber2 (failover = false, slot_name = lsub2_slot)
+#
+# standby_slot_names = 'sb1_slot'
+#
+# Set up is configured in such a way that the logical slot of subscriber1 is
+# enabled for failover, thus it will wait for the physical slot of
+# standby1(sb1_slot) to catch up before sending decoded changes to subscriber1.
+##################################################
+
+$backup_name = 'backup3';
+
+$primary->psql('postgres',
+	q{SELECT pg_create_physical_replication_slot('sb2_slot');});
+
+$primary->backup($backup_name);
+
+# Create another standby
+my $standby2 = PostgreSQL::Test::Cluster->new('standby2');
+$standby2->init_from_backup(
+	$primary, $backup_name,
+	has_streaming => 1,
+	has_restoring => 1);
+$standby2->append_conf(
+	'postgresql.conf', qq(
+primary_slot_name = 'sb2_slot'
+));
+$standby2->start;
+$primary->wait_for_replay_catchup($standby2);
+
+# Configure primary to disallow any logical slots that enabled failover from
+# getting ahead of specified physical replication slot (sb1_slot).
+$primary->append_conf(
+	'postgresql.conf', qq(
+standby_slot_names = 'sb1_slot'
+));
+$primary->reload;
+
+# Create another subscriber node without enabling failover, wait for sync to
+# complete
+my $subscriber2 = PostgreSQL::Test::Cluster->new('subscriber2');
+$subscriber2->init;
+$subscriber2->start;
+$subscriber2->safe_psql(
+	'postgres', qq[
+	CREATE TABLE tab_int (a int PRIMARY KEY);
+	CREATE SUBSCRIPTION regress_mysub2 CONNECTION '$publisher_connstr' PUBLICATION regress_mypub WITH (slot_name = lsub2_slot);
+]);
+
+$subscriber2->wait_for_subscription_sync;
+
+$subscriber1->safe_psql('postgres', "ALTER SUBSCRIPTION regress_mysub1 ENABLE");
+
+# Stop the standby associated with the specified physical replication slot
+# (sb1_slot) so that the logical replication slot (lsub1_slot) won't receive
+# changes until the standby comes up.
+$standby1->stop;
+
+# Create some data on the primary
+my $primary_row_count = 20;
+$primary->safe_psql('postgres',
+	"INSERT INTO tab_int SELECT generate_series(11, $primary_row_count);");
+
+# Wait until the standby2 that's still running gets the data from the primary
+$primary->wait_for_replay_catchup($standby2);
+$result = $standby2->safe_psql('postgres',
+	"SELECT count(*) = $primary_row_count FROM tab_int;");
+is($result, 't', "standby2 gets data from primary");
+
+# Wait for regress_mysub2 to get the data from the primary. This subscription
+# was not enabled for failover so it gets the data without waiting for any
+# standbys.
+$primary->wait_for_catchup('regress_mysub2');
+$result = $subscriber2->safe_psql('postgres',
+	"SELECT count(*) = $primary_row_count FROM tab_int;");
+is($result, 't', "subscriber2 gets data from primary");
+
+# The regress_mysub1 was enabled for failover so it doesn't get the data from
+# primary and keeps waiting for the standby specified in standby_slot_names
+# (sb1_slot aka standby1).
+$result =
+  $subscriber1->safe_psql('postgres', "SELECT count(*) <> $primary_row_count FROM tab_int;");
+is($result, 't',
+	"subscriber1 doesn't get data from primary until standby1 acknowledges changes"
+);
+
+# Start the standby specified in standby_slot_names (sb1_slot aka standby1) and
+# wait for it to catch up with the primary.
+$standby1->start;
+$primary->wait_for_replay_catchup($standby1);
+$result = $standby1->safe_psql('postgres',
+	"SELECT count(*) = $primary_row_count FROM tab_int;");
+is($result, 't', "standby1 gets data from primary");
+
+# Now that the standby specified in standby_slot_names is up and running, the
+# primary can send the decoded changes to the subscription enabled for failover
+# (i.e. regress_mysub1). While the standby was down, regress_mysub1 didn't
+# receive any data from the primary. i.e. the primary didn't allow it to go
+# ahead of standby.
+$primary->wait_for_catchup('regress_mysub1');
+$result = $subscriber1->safe_psql('postgres',
+	"SELECT count(*) = $primary_row_count FROM tab_int;");
+is($result, 't',
+	"subscriber1 gets data from primary after standby1 acknowledges changes");
+
+##################################################
+# Verify that when using pg_logical_slot_get_changes to consume changes from a
+# logical slot with failover enabled, it will also wait for the slots specified
+# in standby_slot_names to catch up.
+##################################################
+
+# Stop the standby associated with the specified physical replication slot so
+# that the logical replication slot won't receive changes until the standby
+# slot's restart_lsn is advanced or the slot is removed from the
+# standby_slot_names list.
+$primary->safe_psql('postgres', "TRUNCATE tab_int;");
+$primary->wait_for_catchup('regress_mysub1');
+$standby1->stop;
+
+# Create a logical 'test_decoding' replication slot with failover enabled
+$primary->safe_psql('postgres',
+	"SELECT pg_create_logical_replication_slot('test_slot', 'test_decoding', false, false, true);"
+);
+
+my $back_q = $primary->background_psql(
+	'postgres',
+	on_error_stop => 0,
+	timeout => $PostgreSQL::Test::Utils::timeout_default);
+
+# Try and get changes from the logical slot with failover enabled.
+my $offset = -s $primary->logfile;
+$back_q->query_until(qr//,
+	"SELECT pg_logical_slot_get_changes('test_slot', NULL, NULL);\n");
+
+# Wait until the primary server logs a warning indicating that it is waiting
+# for the sb1_slot to catch up.
+$primary->wait_for_log(
+	qr/replication slot \"sb1_slot\" specified in parameter standby_slot_names does not have active_pid/,
+	$offset);
+
+# Remove the standby from the standby_slot_names list and reload the
+# configuration.
+$primary->adjust_conf('postgresql.conf', 'standby_slot_names', "''");
+$primary->reload;
+
+# Since there are no slots in standby_slot_names, the function
+# pg_logical_slot_get_changes should now return, and the session can be
+# stopped.
+$back_q->quit;
+
+$primary->safe_psql('postgres',
+	"SELECT pg_drop_replication_slot('test_slot');"
+);
+
+# Add the physical slot (sb1_slot) back to the standby_slot_names for further
+# tests.
+$primary->adjust_conf('postgresql.conf', 'standby_slot_names', "'sb1_slot'");
+$primary->reload;
+
+##################################################
+# Test that logical replication will wait for the user-created inactive
+# physical slot to catch up until we remove the slot from standby_slot_names.
+##################################################
+
+# Create some data on the primary
+$primary_row_count = 10;
+$primary->safe_psql('postgres',
+	"INSERT INTO tab_int SELECT generate_series(1, $primary_row_count);");
+
+# The regress_mysub1 doesn't get the data from primary because the specified
+# standby slot (sb1_slot) in standby_slot_names is inactive.
+$result =
+  $subscriber1->safe_psql('postgres', "SELECT count(*) = 0 FROM tab_int;");
+is($result, 't',
+	"subscriber1 doesn't get data as the sb1_slot doesn't catch up");
+
+# Remove the standby from the standby_slot_names list and reload the
+# configuration.
+$primary->adjust_conf('postgresql.conf', 'standby_slot_names', "''");
+$primary->reload;
+
+# Since there are no slots in standby_slot_names, the primary server should now
+# send the decoded changes to the subscription.
+$primary->wait_for_catchup('regress_mysub1');
+$result = $subscriber1->safe_psql('postgres',
+	"SELECT count(*) = $primary_row_count FROM tab_int;");
+is($result, 't',
+	"subscriber1 gets data from primary after standby1 is removed from the standby_slot_names list"
+);
+
 ##################################################
 # Promote the standby1 to primary. Confirm that:
 # a) the slot 'lsub1_slot' is retained on the new primary
 # b) logical replication for regress_mysub1 is resumed successfully after failover
 ##################################################
+$standby1->start;
+$primary->wait_for_replay_catchup($standby1);
+
 $standby1->promote;
 
 # Update subscription with the new primary's connection info
-- 
2.30.0.windows.2

