From e9ff2322ef095f1ec911adfa8c077093b7f3d328 Mon Sep 17 00:00:00 2001
From: Hou Zhijie <sherlockcpp@foxmail.com>
Date: Fri, 23 Feb 2024 08:29:30 +0800
Subject: [PATCH v98] Allow logical walsenders to wait for the physical standby

This patch introduces a mechanism to ensure that physical standby servers,
which are potential failover candidates, have received and flushed changes
before making them visible to subscribers. By doing so, it guarantees that
the promoted standby server is not lagging behind the subscribers when a
failover is necessary.

A new parameter named standby_slot_names is introduced. The logical
walsender now guarantees that all local changes are sent and flushed to
the standby servers corresponding to the replication slots specified in
standby_slot_names before sending those changes to the subscriber.

Additionally, the SQL functions pg_logical_slot_get_changes and
pg_replication_slot_advance are modified to wait for the replication slots
mentioned in standby_slot_names to catch up before returning the changes
to the user.
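
For illustration, a minimal usage sketch on the primary follows (the slot
names below are only examples, not part of this patch):

    -- Make logical walsenders wait for the standby that uses 'sb1_slot'.
    ALTER SYSTEM SET standby_slot_names = 'sb1_slot';
    SELECT pg_reload_conf();

    -- A failover-enabled logical slot (failover is the last argument) now
    -- has its changes held back until 'sb1_slot' has caught up.
    SELECT pg_create_logical_replication_slot('test_slot', 'test_decoding',
                                               false, false, true);
    SELECT pg_logical_slot_get_changes('test_slot', NULL, NULL);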
---
 doc/src/sgml/config.sgml                      |  24 ++
 doc/src/sgml/logicaldecoding.sgml             |   8 +
 .../replication/logical/logicalfuncs.c        |  12 +
 src/backend/replication/logical/slotsync.c    |  13 +
 src/backend/replication/slot.c                | 309 +++++++++++++++++-
 src/backend/replication/slotfuncs.c           |   8 +
 src/backend/replication/walsender.c           | 111 ++++++-
 .../utils/activity/wait_event_names.txt       |   1 +
 src/backend/utils/misc/guc_tables.c           |  14 +
 src/backend/utils/misc/postgresql.conf.sample |   2 +
 src/include/replication/slot.h                |   6 +
 src/include/replication/walsender.h           |   1 +
 src/include/replication/walsender_private.h   |   7 +
 src/include/utils/guc_hooks.h                 |   3 +
 src/test/recovery/t/006_logical_decoding.pl   |   3 +-
 .../t/040_standby_failover_slots_sync.pl      | 184 +++++++++++
 16 files changed, 690 insertions(+), 16 deletions(-)

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 36a2a5ce43..47bec6484d 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -4420,6 +4420,30 @@ restore_command = 'copy "C:\\server\\archivedir\\%f" "%p"'  # Windows
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-standby-slot-names" xreflabel="standby_slot_names">
+      <term><varname>standby_slot_names</varname> (<type>string</type>)
+      <indexterm>
+       <primary><varname>standby_slot_names</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        A list of physical replication slots. It guarantees that logical
+        replication slots with failover enabled do not consume changes until
+        those changes have been received and flushed by the corresponding
+        physical standbys. If a logical replication connection is meant to
+        switch to a physical standby after the standby is promoted, the
+        physical replication slot for that standby should be listed here.
+       </para>
+       <para>
+        The standbys corresponding to the physical replication slots in
+        <varname>standby_slot_names</varname> must configure
+        <literal>sync_replication_slots = true</literal> so they can receive
+        the changes for failover-enabled logical slots from the primary.
+       </para>
+      </listitem>
+     </varlistentry>
+
      </variablelist>
     </sect2>
 
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 930c0fa8a6..73b2abeda5 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -384,6 +384,14 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU
      must be enabled on the standby. It is also necessary to specify a valid
      <literal>dbname</literal> in the
      <link linkend="guc-primary-conninfo"><varname>primary_conninfo</varname></link>.
+     It is also highly recommended that the said physical replication slot
+     is named in the
+     <link linkend="guc-standby-slot-names"><varname>standby_slot_names</varname></link>
+     list on the primary, to prevent the subscriber from consuming changes
+     faster than the hot standby. Once configured, however, some additional
+     latency is expected when sending changes to logical subscribers, because
+     the walsender waits for the physical replication slots listed in
+     <link linkend="guc-standby-slot-names"><varname>standby_slot_names</varname></link>.
     </para>
 
     <para>
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index b0081d3ce5..0cd09c0e1c 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -109,6 +109,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 	MemoryContext per_query_ctx;
 	MemoryContext oldcontext;
 	XLogRecPtr	end_of_wal;
+	XLogRecPtr	wait_for_wal_lsn;
 	LogicalDecodingContext *ctx;
 	ResourceOwner old_resowner = CurrentResourceOwner;
 	ArrayType  *arr;
@@ -228,6 +229,17 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 							NameStr(MyReplicationSlot->data.plugin),
 							format_procedure(fcinfo->flinfo->fn_oid))));
 
+		if (XLogRecPtrIsInvalid(upto_lsn))
+			wait_for_wal_lsn = end_of_wal;
+		else
+			wait_for_wal_lsn = Min(upto_lsn, end_of_wal);
+
+		/*
+		 * Wait for specified streaming replication standby servers (if any)
+		 * to confirm receipt of WAL up to wait_for_wal_lsn.
+		 */
+		WaitForStandbyConfirmation(wait_for_wal_lsn);
+
 		ctx->output_writer_private = p;
 
 		/*
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 36773cfe73..de05389d61 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -491,6 +491,10 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 	latestFlushPtr = GetStandbyFlushRecPtr(NULL);
 	if (remote_slot->confirmed_lsn > latestFlushPtr)
 	{
+		/*
+		 * Can get here only if GUC 'standby_slot_names' on the primary server
+		 * was not configured correctly.
+		 */
 		ereport(am_slotsync_worker ? LOG : ERROR,
 				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				errmsg("skipping slot synchronization as the received slot sync"
@@ -860,6 +864,15 @@ validate_remote_info(WalReceiverConn *wrconn)
 	remote_in_recovery = DatumGetBool(slot_getattr(tupslot, 1, &isnull));
 	Assert(!isnull);
 
+	/*
+	 * Slot sync is currently not supported on a cascading standby. This is
+	 * because if we allowed it, the primary server would need to wait for
+	 * all the cascading standbys; otherwise, logical subscribers could still
+	 * be ahead of one of the cascading standbys that we plan to promote.
+	 * Thus, to avoid this additional complexity, we restrict it for the time
+	 * being.
+	 *
+	 * XXX: If needed, this can be attempted in the future.
+	 */
 	if (remote_in_recovery)
 		ereport(ERROR,
 				errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 0f173f63a2..c85a8c4d68 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -46,13 +46,18 @@
 #include "common/string.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "postmaster/interrupt.h"
 #include "replication/slotsync.h"
 #include "replication/slot.h"
+#include "replication/walsender_private.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
 #include "utils/builtins.h"
+#include "utils/guc_hooks.h"
+#include "utils/memutils.h"
+#include "utils/varlena.h"
 
 /*
  * Replication slot on-disk data structure.
@@ -115,10 +120,19 @@ ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
 /* My backend's replication slot in the shared memory array */
 ReplicationSlot *MyReplicationSlot = NULL;
 
-/* GUC variable */
+/* GUC variables */
 int			max_replication_slots = 10; /* the maximum number of replication
 										 * slots */
 
+/*
+ * This GUC lists streaming replication standby server slot names that
+ * logical WAL sender processes will wait for.
+ */
+char	   *standby_slot_names;
+
+/* The cached list of slot names parsed from standby_slot_names. */
+static List *standby_slot_names_list = NIL;
+
 static void ReplicationSlotShmemExit(int code, Datum arg);
 static void ReplicationSlotDropPtr(ReplicationSlot *slot);
 
@@ -2345,3 +2359,296 @@ GetSlotInvalidationCause(const char *conflict_reason)
 	Assert(found);
 	return result;
 }
+
+/*
+ * A helper function to validate slots specified in GUC standby_slot_names.
+ */
+static bool
+validate_standby_slots(char **newval)
+{
+	char	   *rawname;
+	List	   *elemlist;
+	ListCell   *lc;
+	bool		ok;
+
+	/* Need a modifiable copy of string */
+	rawname = pstrdup(*newval);
+
+	/* Verify syntax and parse string into a list of identifiers */
+	ok = SplitIdentifierString(rawname, ',', &elemlist);
+
+	if (!ok)
+		GUC_check_errdetail("List syntax is invalid.");
+
+	/*
+	 * If there is a syntax error in the name or if the replication slots'
+	 * data is not initialized yet (i.e., we are in the startup process), skip
+	 * the slot verification.
+	 */
+	if (!ok || !ReplicationSlotCtl)
+	{
+		pfree(rawname);
+		list_free(elemlist);
+		return ok;
+	}
+
+	foreach(lc, elemlist)
+	{
+		char	   *name = lfirst(lc);
+		ReplicationSlot *slot;
+
+		slot = SearchNamedReplicationSlot(name, true);
+
+		if (!slot)
+		{
+			GUC_check_errdetail("replication slot \"%s\" does not exist",
+								name);
+			ok = false;
+			break;
+		}
+
+		if (!SlotIsPhysical(slot))
+		{
+			GUC_check_errdetail("\"%s\" is not a physical replication slot",
+								name);
+			ok = false;
+			break;
+		}
+	}
+
+	pfree(rawname);
+	list_free(elemlist);
+	return ok;
+}
+
+/*
+ * GUC check_hook for standby_slot_names
+ */
+bool
+check_standby_slot_names(char **newval, void **extra, GucSource source)
+{
+	if (strcmp(*newval, "") == 0)
+		return true;
+
+	/*
+	 * "*" is not accepted because in that case the primary would not know
+	 * which standbys to wait for. Even if we have the physical slot
+	 * information, there is no way to confirm whether there is any standby
+	 * configured for a given physical slot.
+	 */
+	if (strcmp(*newval, "*") == 0)
+	{
+		GUC_check_errdetail("\"%s\" is not accepted for standby_slot_names",
+							*newval);
+		return false;
+	}
+
+	/* Now verify if the specified slots really exist and have correct type */
+	if (!validate_standby_slots(newval))
+		return false;
+
+	*extra = guc_strdup(ERROR, *newval);
+
+	return true;
+}
+
+/*
+ * GUC assign_hook for standby_slot_names
+ */
+void
+assign_standby_slot_names(const char *newval, void *extra)
+{
+	List	   *standby_slots;
+	MemoryContext oldcxt;
+	char	   *standby_slot_names_cpy = extra;
+
+	list_free(standby_slot_names_list);
+	standby_slot_names_list = NIL;
+
+	/* No value is specified for standby_slot_names. */
+	if (standby_slot_names_cpy == NULL)
+		return;
+
+	if (!SplitIdentifierString(standby_slot_names_cpy, ',', &standby_slots))
+	{
+		/*
+		 * This should not happen because check_standby_slot_names already
+		 * validated the value.
+		 */
+		elog(ERROR, "invalid list syntax");
+	}
+
+	/*
+	 * Switch to the memory context under which GUC variables are allocated
+	 * (GUCMemoryContext).
+	 */
+	oldcxt = MemoryContextSwitchTo(GetMemoryChunkContext(standby_slot_names_cpy));
+	standby_slot_names_list = list_copy(standby_slots);
+	MemoryContextSwitchTo(oldcxt);
+}
+
+/*
+ * Return a copy of standby_slot_names_list if the copy flag is set to true,
+ * otherwise return the original list.
+ *
+ * Note that since we do not support syncing slots to cascading standbys, we
+ * return NIL if we are running in a standby to indicate that no standby slots
+ * need to be waited for, regardless of the copy flag value.
+ */
+List *
+GetStandbySlotList(bool copy)
+{
+	if (RecoveryInProgress())
+		return NIL;
+
+	if (copy)
+		return list_copy(standby_slot_names_list);
+	else
+		return standby_slot_names_list;
+}
+
+/*
+ * Filter the standby slots based on the specified log sequence number
+ * (wait_for_lsn).
+ *
+ * This function updates the passed standby_slots list, removing any slots that
+ * have already caught up to or surpassed the given wait_for_lsn. Additionally,
+ * it removes slots that have been invalidated, dropped, or converted to
+ * logical slots.
+ */
+void
+FilterStandbySlots(XLogRecPtr wait_for_lsn, List **standby_slots)
+{
+	ListCell   *lc;
+	List	   *standby_slots_cpy = *standby_slots;
+
+	foreach(lc, standby_slots_cpy)
+	{
+		char	   *name = lfirst(lc);
+		char	   *warningfmt = NULL;
+		ReplicationSlot *slot;
+
+		slot = SearchNamedReplicationSlot(name, true);
+
+		if (!slot)
+		{
+			/*
+			 * It may happen that the slot specified in standby_slot_names GUC
+			 * value is dropped, so let's skip over it.
+			 */
+			warningfmt = _("replication slot \"%s\" specified in parameter %s does not exist, ignoring");
+		}
+		else if (SlotIsLogical(slot))
+		{
+			/*
+			 * If a logical slot name is provided in standby_slot_names, issue
+			 * a WARNING and skip it. Although logical slots are disallowed in
+			 * the GUC check_hook(validate_standby_slots), it is still
+			 * possible for a user to drop an existing physical slot and
+			 * recreate a logical slot with the same name. Since it is
+			 * harmless, a WARNING should be enough, no need to error out.
+			 */
+			warningfmt = _("cannot have logical replication slot \"%s\" in parameter %s, ignoring");
+		}
+		else
+		{
+			SpinLockAcquire(&slot->mutex);
+
+			if (slot->data.invalidated != RS_INVAL_NONE)
+			{
+				/*
+				 * Specified physical slot has been invalidated, so no point
+				 * in waiting for it.
+				 */
+				warningfmt = _("physical slot \"%s\" specified in parameter %s has been invalidated, ignoring");
+			}
+			else if (XLogRecPtrIsInvalid(slot->data.restart_lsn) ||
+					 slot->data.restart_lsn < wait_for_lsn)
+			{
+				bool		inactive = (slot->active_pid == 0);
+
+				SpinLockRelease(&slot->mutex);
+
+				/* Log warning if no active_pid for this physical slot */
+				if (inactive)
+					ereport(WARNING,
+							errmsg("replication slot \"%s\" specified in parameter %s does not have active_pid",
+								   name, "standby_slot_names"),
+							errdetail("Logical replication is waiting on the standby associated with \"%s\".",
+									  name),
+							errhint("Consider starting standby associated with \"%s\" or amend standby_slot_names.",
+									name));
+
+				/* Continue if the current slot hasn't caught up. */
+				continue;
+			}
+			else
+			{
+				Assert(slot->data.restart_lsn >= wait_for_lsn);
+			}
+
+			SpinLockRelease(&slot->mutex);
+		}
+
+		/*
+		 * Reaching here indicates that either the slot has passed the
+		 * wait_for_lsn or there is an issue with the slot that requires a
+		 * warning to be reported.
+		 */
+		if (warningfmt)
+			ereport(WARNING, errmsg(warningfmt, name, "standby_slot_names"));
+
+		standby_slots_cpy = foreach_delete_current(standby_slots_cpy, lc);
+	}
+
+	*standby_slots = standby_slots_cpy;
+}
+
+/*
+ * Wait for physical standbys to confirm receiving the given lsn.
+ *
+ * Used by logical decoding SQL functions that have acquired a
+ * failover-enabled slot. It waits for the physical standbys corresponding to
+ * the physical slots specified in the standby_slot_names GUC.
+ */
+void
+WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
+{
+	List	   *standby_slots;
+
+	if (!MyReplicationSlot->data.failover)
+		return;
+
+	standby_slots = GetStandbySlotList(true);
+
+	if (standby_slots == NIL)
+		return;
+
+	ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv);
+
+	for (;;)
+	{
+		CHECK_FOR_INTERRUPTS();
+
+		if (ConfigReloadPending)
+		{
+			ConfigReloadPending = false;
+			ProcessConfigFile(PGC_SIGHUP);
+			standby_slots = GetStandbySlotList(true);
+		}
+
+		FilterStandbySlots(wait_for_lsn, &standby_slots);
+
+		/* Exit if done waiting for every slot. */
+		if (standby_slots == NIL)
+			break;
+
+		/*
+		 * We wait for the slots in standby_slot_names to catch up, but we use
+		 * a timeout (1s) so we can also check whether standby_slot_names has
+		 * been changed.
+		 */
+		ConditionVariableTimedSleep(&WalSndCtl->wal_confirm_rcv_cv, 1000,
+									WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION);
+	}
+
+	ConditionVariableCancelSleep();
+	list_free(standby_slots);
+}
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 768a304723..09067febe1 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -464,6 +464,8 @@ pg_physical_replication_slot_advance(XLogRecPtr moveto)
 		 * crash, but this makes the data consistent after a clean shutdown.
 		 */
 		ReplicationSlotMarkDirty();
+
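+		/*
+		 * Wake up logical walsenders holding failover-enabled slots if this
+		 * physical slot is listed in standby_slot_names.
+		 */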
+		PhysicalWakeupLogicalWalSnd();
 	}
 
 	return retlsn;
@@ -504,6 +506,12 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
 											   .segment_close = wal_segment_close),
 									NULL, NULL, NULL);
 
+		/*
+		 * Wait for specified streaming replication standby servers (if any)
+		 * to confirm receipt of WAL up to moveto lsn.
+		 */
+		WaitForStandbyConfirmation(moveto);
+
 		/*
 		 * Start reading at the slot's restart_lsn, which we know to point to
 		 * a valid record.
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 13bc3e0aee..1394c353c5 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1728,27 +1728,77 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId
 		ProcessPendingWrites();
 }
 
+/*
+ * Wake up the logical walsender processes with failover enabled slots if the
+ * currently acquired physical slot is specified in standby_slot_names GUC.
+ */
+void
+PhysicalWakeupLogicalWalSnd(void)
+{
+	ListCell   *lc;
+	List	   *standby_slots;
+
+	Assert(MyReplicationSlot && SlotIsPhysical(MyReplicationSlot));
+
+	standby_slots = GetStandbySlotList(false);
+
+	foreach(lc, standby_slots)
+	{
+		char	   *name = lfirst(lc);
+
+		if (strcmp(name, NameStr(MyReplicationSlot->data.name)) == 0)
+		{
+			ConditionVariableBroadcast(&WalSndCtl->wal_confirm_rcv_cv);
+			return;
+		}
+	}
+}
+
 /*
  * Wait till WAL < loc is flushed to disk so it can be safely sent to client.
  *
- * Returns end LSN of flushed WAL.  Normally this will be >= loc, but
- * if we detect a shutdown request (either from postmaster or client)
- * we will return early, so caller must always check.
+ * If the walsender holds a logical slot that has enabled failover, we also
+ * wait for all the specified streaming replication standby servers to
+ * confirm receipt of WAL up to RecentFlushPtr.
+ *
+ * Returns end LSN of flushed WAL.  Normally this will be >= loc, but if we
+ * detect a shutdown request (either from postmaster or client) we will return
+ * early, so caller must always check.
  */
 static XLogRecPtr
 WalSndWaitForWal(XLogRecPtr loc)
 {
 	int			wakeEvents;
+	bool		wait_for_standby = false;
+	uint32		wait_event;
+	List	   *standby_slots = NIL;
 	static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
 
+	if (MyReplicationSlot->data.failover && replication_active)
+		standby_slots = GetStandbySlotList(true);
+
 	/*
-	 * Fast path to avoid acquiring the spinlock in case we already know we
-	 * have enough WAL available. This is particularly interesting if we're
-	 * far behind.
+	 * Check if all the standby servers have confirmed receipt of WAL up to
+	 * RecentFlushPtr even when we already know we have enough WAL available.
+	 *
+	 * Note that we cannot directly return without checking the status of
+	 * standby servers because the standby_slot_names may have changed, which
+	 * means there could be new standby slots in the list that have not yet
+	 * caught up to the RecentFlushPtr.
 	 */
-	if (RecentFlushPtr != InvalidXLogRecPtr &&
-		loc <= RecentFlushPtr)
-		return RecentFlushPtr;
+	if (!XLogRecPtrIsInvalid(RecentFlushPtr) && loc <= RecentFlushPtr)
+	{
+		FilterStandbySlots(RecentFlushPtr, &standby_slots);
+
+		/*
+		 * Fast path to avoid acquiring the spinlock in case we already know
+		 * we have enough WAL available and all the standby servers have
+		 * confirmed receipt of WAL up to RecentFlushPtr. This is particularly
+		 * interesting if we're far behind.
+		 */
+		if (standby_slots == NIL)
+			return RecentFlushPtr;
+	}
 
 	/* Get a more recent flush pointer. */
 	if (!RecoveryInProgress())
@@ -1770,6 +1820,10 @@ WalSndWaitForWal(XLogRecPtr loc)
 		{
 			ConfigReloadPending = false;
 			ProcessConfigFile(PGC_SIGHUP);
+
+			if (MyReplicationSlot->data.failover && replication_active)
+				standby_slots = GetStandbySlotList(true);
+
 			SyncRepInitConfig();
 		}
 
@@ -1784,8 +1838,18 @@ WalSndWaitForWal(XLogRecPtr loc)
 		if (got_STOPPING)
 			XLogBackgroundFlush();
 
+		/*
+		 * Update the list of standby slots that have not yet caught up to the
+		 * flushed position. It is better to wait only up to RecentFlushPtr and
+		 * then send the changes already covered by RecentFlushPtr to the
+		 * logical subscribers, rather than waiting for standby confirmation on
+		 * every change.
+		 */
+		if (wait_for_standby)
+			FilterStandbySlots(RecentFlushPtr, &standby_slots);
+
 		/* Update our idea of the currently flushed position. */
-		if (!RecoveryInProgress())
+		else if (!RecoveryInProgress())
 			RecentFlushPtr = GetFlushRecPtr(NULL);
 		else
 			RecentFlushPtr = GetXLogReplayRecPtr(NULL);
@@ -1813,9 +1877,18 @@ WalSndWaitForWal(XLogRecPtr loc)
 			!waiting_for_ping_response)
 			WalSndKeepalive(false, InvalidXLogRecPtr);
 
-		/* check whether we're done */
-		if (loc <= RecentFlushPtr)
+		if (loc > RecentFlushPtr)
+			wait_event = WAIT_EVENT_WAL_SENDER_WAIT_FOR_WAL;
+		else if (standby_slots)
+		{
+			wait_event = WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION;
+			wait_for_standby = true;
+		}
+		else
+		{
+			/* Already caught up and doesn't need to wait for standby_slots. */
 			break;
+		}
 
 		/* Waiting for new WAL. Since we need to wait, we're now caught up. */
 		WalSndCaughtUp = true;
@@ -1855,9 +1928,11 @@ WalSndWaitForWal(XLogRecPtr loc)
 		if (pq_is_send_pending())
 			wakeEvents |= WL_SOCKET_WRITEABLE;
 
-		WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_WAIT_FOR_WAL);
+		WalSndWait(wakeEvents, sleeptime, wait_event);
 	}
 
+	list_free(standby_slots);
+
 	/* reactivate latch so WalSndLoop knows to continue */
 	SetLatch(MyLatch);
 	return RecentFlushPtr;
@@ -2265,6 +2340,7 @@ PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
 	{
 		ReplicationSlotMarkDirty();
 		ReplicationSlotsComputeRequiredLSN();
+		PhysicalWakeupLogicalWalSnd();
 	}
 
 	/*
@@ -3538,6 +3614,7 @@ WalSndShmemInit(void)
 
 		ConditionVariableInit(&WalSndCtl->wal_flush_cv);
 		ConditionVariableInit(&WalSndCtl->wal_replay_cv);
+		ConditionVariableInit(&WalSndCtl->wal_confirm_rcv_cv);
 	}
 }
 
@@ -3607,8 +3684,14 @@ WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
 	 *
 	 * And, we use separate shared memory CVs for physical and logical
 	 * walsenders for selective wake ups, see WalSndWakeup() for more details.
+	 *
+	 * If the wait event is WAIT_FOR_STANDBY_CONFIRMATION, wait on another CV
+	 * until awakened by physical walsenders after the walreceiver confirms
+	 * the receipt of the LSN.
 	 */
-	if (MyWalSnd->kind == REPLICATION_KIND_PHYSICAL)
+	if (wait_event == WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION)
+		ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv);
+	else if (MyWalSnd->kind == REPLICATION_KIND_PHYSICAL)
 		ConditionVariablePrepareToSleep(&WalSndCtl->wal_flush_cv);
 	else if (MyWalSnd->kind == REPLICATION_KIND_LOGICAL)
 		ConditionVariablePrepareToSleep(&WalSndCtl->wal_replay_cv);
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index 4fffb46625..2d1c700543 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -78,6 +78,7 @@ GSS_OPEN_SERVER	"Waiting to read data from the client while establishing a GSSAP
 LIBPQWALRECEIVER_CONNECT	"Waiting in WAL receiver to establish connection to remote server."
 LIBPQWALRECEIVER_RECEIVE	"Waiting in WAL receiver to receive data from remote server."
 SSL_OPEN_SERVER	"Waiting for SSL while attempting connection."
+WAIT_FOR_STANDBY_CONFIRMATION	"Waiting for the WAL to be received by the physical standby."
 WAL_SENDER_WAIT_FOR_WAL	"Waiting for WAL to be flushed in WAL sender process."
 WAL_SENDER_WRITE_DATA	"Waiting for any activity when processing replies from WAL receiver in WAL sender process."
 
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 527a2b2734..2eea26b5d5 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -4639,6 +4639,20 @@ struct config_string ConfigureNamesString[] =
 		check_debug_io_direct, assign_debug_io_direct, NULL
 	},
 
+	{
+		{"standby_slot_names", PGC_SIGHUP, REPLICATION_PRIMARY,
+			gettext_noop("Lists streaming replication standby server slot "
+						 "names that logical WAL sender processes will wait for."),
+			gettext_noop("Decoded changes are sent out to plugins by logical "
+						 "WAL sender processes only after specified "
+						 "replication slots confirm receiving WAL."),
+			GUC_LIST_INPUT | GUC_LIST_QUOTE
+		},
+		&standby_slot_names,
+		"",
+		check_standby_slot_names, assign_standby_slot_names, NULL
+	},
+
 	/* End-of-list marker */
 	{
 		{NULL, 0, 0, NULL, NULL}, NULL, NULL, NULL, NULL, NULL
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index c97f9a25f0..cadfe10958 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -334,6 +334,8 @@
 				# method to choose sync standbys, number of sync standbys,
 				# and comma-separated list of application_name
 				# from standby(s); '*' = all
+#standby_slot_names = '' # streaming replication standby server slot names that
+				# logical walsender processes will wait for
 
 # - Standby Servers -
 
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index acbf567150..04a2717138 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -226,6 +226,7 @@ extern PGDLLIMPORT ReplicationSlot *MyReplicationSlot;
 
 /* GUCs */
 extern PGDLLIMPORT int max_replication_slots;
+extern PGDLLIMPORT char *standby_slot_names;
 
 /* shmem initialization functions */
 extern Size ReplicationSlotsShmemSize(void);
@@ -274,4 +275,9 @@ extern void CheckSlotPermissions(void);
 extern ReplicationSlotInvalidationCause
 			GetSlotInvalidationCause(const char *conflict_reason);
 
+extern List *GetStandbySlotList(bool copy);
+extern void FilterStandbySlots(XLogRecPtr wait_for_lsn,
+							   List **standby_slots);
+extern void WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn);
+
 #endif							/* SLOT_H */
diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h
index 0c3996e926..f2d8297f01 100644
--- a/src/include/replication/walsender.h
+++ b/src/include/replication/walsender.h
@@ -39,6 +39,7 @@ extern void InitWalSender(void);
 extern bool exec_replication_command(const char *cmd_string);
 extern void WalSndErrorCleanup(void);
 extern void WalSndResourceCleanup(bool isCommit);
+extern void PhysicalWakeupLogicalWalSnd(void);
 extern XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *tli);
 extern void WalSndSignals(void);
 extern Size WalSndShmemSize(void);
diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h
index 3113e9ea47..3c134c8edf 100644
--- a/src/include/replication/walsender_private.h
+++ b/src/include/replication/walsender_private.h
@@ -113,6 +113,13 @@ typedef struct
 	ConditionVariable wal_flush_cv;
 	ConditionVariable wal_replay_cv;
 
+	/*
+	 * Used by physical walsenders holding slots specified in
+	 * standby_slot_names to wake up logical walsenders holding failover
+	 * enabled slots when a walreceiver confirms the receipt of LSN.
+	 */
+	ConditionVariable wal_confirm_rcv_cv;
+
 	WalSnd		walsnds[FLEXIBLE_ARRAY_MEMBER];
 } WalSndCtlData;
 
diff --git a/src/include/utils/guc_hooks.h b/src/include/utils/guc_hooks.h
index 339c490300..dcf9a040d1 100644
--- a/src/include/utils/guc_hooks.h
+++ b/src/include/utils/guc_hooks.h
@@ -163,5 +163,8 @@ extern bool check_wal_consistency_checking(char **newval, void **extra,
 extern void assign_wal_consistency_checking(const char *newval, void *extra);
 extern bool check_wal_segment_size(int *newval, void **extra, GucSource source);
 extern void assign_wal_sync_method(int new_wal_sync_method, void *extra);
+extern bool check_standby_slot_names(char **newval, void **extra,
+									 GucSource source);
+extern void assign_standby_slot_names(const char *newval, void *extra);
 
 #endif							/* GUC_HOOKS_H */
diff --git a/src/test/recovery/t/006_logical_decoding.pl b/src/test/recovery/t/006_logical_decoding.pl
index 5c7b4ca5e3..85f019774c 100644
--- a/src/test/recovery/t/006_logical_decoding.pl
+++ b/src/test/recovery/t/006_logical_decoding.pl
@@ -172,9 +172,10 @@ is($node_primary->slot('otherdb_slot')->{'slot_name'},
 	undef, 'logical slot was actually dropped with DB');
 
 # Test logical slot advancing and its durability.
+# Pass failover=true (the last argument); it should not affect advancing.
 my $logical_slot = 'logical_slot';
 $node_primary->safe_psql('postgres',
-	"SELECT pg_create_logical_replication_slot('$logical_slot', 'test_decoding', false);"
+	"SELECT pg_create_logical_replication_slot('$logical_slot', 'test_decoding', false, false, true);"
 );
 $node_primary->psql(
 	'postgres', "
diff --git a/src/test/recovery/t/040_standby_failover_slots_sync.pl b/src/test/recovery/t/040_standby_failover_slots_sync.pl
index 968aa7b05b..53480f5acf 100644
--- a/src/test/recovery/t/040_standby_failover_slots_sync.pl
+++ b/src/test/recovery/t/040_standby_failover_slots_sync.pl
@@ -446,11 +446,195 @@ ok( $standby1->poll_query_until(
 		"SELECT '$primary_restart_lsn' = restart_lsn AND '$primary_flush_lsn' = confirmed_flush_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot' AND synced AND NOT temporary;"),
 	'restart_lsn and confirmed_flush_lsn of slot lsub1_slot synced to standby');
 
+##################################################
+# Test that the primary disallows the specified logical replication slots from
+# getting ahead of the specified physical replication slots. It uses the
+# following setup:
+#
+#				| ----> standby1 (primary_slot_name = sb1_slot)
+#				| ----> standby2 (primary_slot_name = sb2_slot)
+# primary -----	|
+#				| ----> subscriber1 (failover = true)
+#				| ----> subscriber2 (failover = false)
+#
+# standby_slot_names = 'sb1_slot'
+#
+# The setup is configured in such a way that the logical slot of subscriber1
+# has failover enabled, thus the primary will wait for the physical slot of
+# standby1 (sb1_slot) to catch up before sending decoded changes to
+# subscriber1.
+##################################################
+
+$backup_name = 'backup3';
+
+$primary->psql('postgres',
+	q{SELECT pg_create_physical_replication_slot('sb2_slot');});
+
+$primary->backup($backup_name);
+
+# Create another standby
+my $standby2 = PostgreSQL::Test::Cluster->new('standby2');
+$standby2->init_from_backup(
+	$primary, $backup_name,
+	has_streaming => 1,
+	has_restoring => 1);
+$standby2->append_conf(
+	'postgresql.conf', qq(
+primary_slot_name = 'sb2_slot'
+));
+$standby2->start;
+$primary->wait_for_replay_catchup($standby2);
+
+# Configure the primary to disallow any logical slots that have failover
+# enabled from getting ahead of the specified physical replication slot
+# (sb1_slot).
+$primary->append_conf(
+	'postgresql.conf', qq(
+standby_slot_names = 'sb1_slot'
+));
+$primary->reload;
+
+# Create another subscriber node without enabling failover, wait for sync to
+# complete
+my $subscriber2 = PostgreSQL::Test::Cluster->new('subscriber2');
+$subscriber2->init;
+$subscriber2->start;
+$subscriber2->safe_psql(
+	'postgres', qq[
+	CREATE TABLE tab_int (a int PRIMARY KEY);
+	CREATE SUBSCRIPTION regress_mysub2 CONNECTION '$publisher_connstr' PUBLICATION regress_mypub WITH (slot_name = lsub2_slot);
+]);
+
+$subscriber1->wait_for_subscription_sync;
+
+$subscriber1->safe_psql('postgres', "ALTER SUBSCRIPTION regress_mysub1 ENABLE");
+
+# Stop the standby associated with the specified physical replication slot so
+# that the logical replication slot won't receive changes until the standby
+# comes up.
+$standby1->stop;
+
+# Create some data on the primary
+my $primary_row_count = 20;
+$primary->safe_psql('postgres',
+	"INSERT INTO tab_int SELECT generate_series(11, $primary_row_count);");
+
+# Wait until the standby that is up and running gets the data from the
+# primary.
+$primary->wait_for_replay_catchup($standby2);
+$result = $standby2->safe_psql('postgres',
+	"SELECT count(*) = $primary_row_count FROM tab_int;");
+is($result, 't', "standby2 gets data from primary");
+
+# Wait for the subscription that is up and running and is not enabled for
+# failover. It gets the data from the primary without waiting for any standbys.
+$primary->wait_for_catchup('regress_mysub2');
+$result = $subscriber2->safe_psql('postgres',
+	"SELECT count(*) = $primary_row_count FROM tab_int;");
+is($result, 't', "subscriber2 gets data from primary");
+
+# The subscription that is up and running and enabled for failover doesn't get
+# the data from the primary and keeps waiting for the standby specified in
+# standby_slot_names.
+$result =
+  $subscriber1->safe_psql('postgres', "SELECT count(*) <> $primary_row_count FROM tab_int;");
+is($result, 't',
+	"subscriber1 doesn't get data from primary until standby1 acknowledges changes"
+);
+
+# Start the standby specified in standby_slot_names and wait for it to catch
+# up with the primary.
+$standby1->start;
+$primary->wait_for_replay_catchup($standby1);
+$result = $standby1->safe_psql('postgres',
+	"SELECT count(*) = $primary_row_count FROM tab_int;");
+is($result, 't', "standby1 gets data from primary");
+
+# Now that the standby specified in standby_slot_names is up and running, the
+# primary must send the decoded changes to the subscription enabled for
+# failover. While the standby was down, this subscriber didn't receive any
+# data from the primary, i.e. the primary didn't allow it to get ahead of the
+# standby.
+$primary->wait_for_catchup('regress_mysub1');
+$result = $subscriber1->safe_psql('postgres',
+	"SELECT count(*) = $primary_row_count FROM tab_int;");
+is($result, 't',
+	"subscriber1 gets data from primary after standby1 acknowledges changes");
+
+# Stop the standby associated with the specified physical replication slot so
+# that the logical replication slot won't receive changes until the standby
+# slot's restart_lsn is advanced or the slot is removed from the
+# standby_slot_names list.
+$primary->safe_psql('postgres', "TRUNCATE tab_int;");
+$primary->wait_for_catchup('regress_mysub1');
+$standby1->stop;
+
+##################################################
+# Verify that when using pg_logical_slot_get_changes to consume changes from a
+# logical slot with failover enabled, it will also wait for the slots specified
+# in standby_slot_names to catch up.
+##################################################
+
+# Create a logical 'test_decoding' replication slot with failover enabled
+$primary->safe_psql('postgres',
+	"SELECT pg_create_logical_replication_slot('test_slot', 'test_decoding', false, false, true);"
+);
+
+my $back_q = $primary->background_psql('postgres', on_error_stop => 0);
+my $pid = $back_q->query('SELECT pg_backend_pid()');
+
+# Try to get changes from the logical slot with failover enabled.
+my $offset = -s $primary->logfile;
+$back_q->query_until(qr//,
+	"SELECT pg_logical_slot_get_changes('test_slot', NULL, NULL);\n");
+
+# Wait until the primary server logs a warning indicating that it is waiting
+# for the sb1_slot to catch up.
+$primary->wait_for_log(
+	qr/replication slot \"sb1_slot\" specified in parameter standby_slot_names does not have active_pid/,
+	$offset);
+
+ok($primary->safe_psql('postgres', "SELECT pg_cancel_backend($pid)"),
+	"cancelling pg_logical_slot_get_changes command");
+
+$back_q->quit;
+
+$primary->safe_psql('postgres',
+	"SELECT pg_drop_replication_slot('test_slot');"
+);
+
+##################################################
+# Test that logical replication will wait for the user-created inactive
+# physical slot to catch up until we remove the slot from standby_slot_names.
+##################################################
+
+# Create some data on the primary
+$primary_row_count = 10;
+$primary->safe_psql('postgres',
+	"INSERT INTO tab_int SELECT generate_series(1, $primary_row_count);");
+
+$result =
+  $subscriber1->safe_psql('postgres', "SELECT count(*) = 0 FROM tab_int;");
+is($result, 't',
+	"subscriber1 doesn't get data as the sb1_slot doesn't catch up");
+
+# Remove the standby from the standby_slot_names list and reload the
+# configuration.
+$primary->adjust_conf('postgresql.conf', 'standby_slot_names', "''");
+$primary->reload;
+
+# Since there are no slots in standby_slot_names, the primary server should now
+# send the decoded changes to the subscription.
+$primary->wait_for_catchup('regress_mysub1');
+$result = $subscriber1->safe_psql('postgres',
+	"SELECT count(*) = $primary_row_count FROM tab_int;");
+is($result, 't',
+	"subscriber1 gets data from primary after standby1 is removed from the standby_slot_names list"
+);
+
 ##################################################
 # Promote the standby1 to primary. Confirm that:
 # a) the slot 'lsub1_slot' is retained on the new primary
 # b) logical replication for regress_mysub1 is resumed successfully after failover
 ##################################################
+$standby1->start;
+$primary->wait_for_replay_catchup($standby1);
+
 $standby1->promote;
 
 # Update subscription with the new primary's connection info
-- 
2.30.0.windows.2

