From cf737220af5e5a8fdad77f3a8abfb29a44f88be3 Mon Sep 17 00:00:00 2001
From: Amit Kapila <akapila@postgresql.org>
Date: Wed, 3 Apr 2024 09:21:51 +0530
Subject: [PATCH v9] Ensure that the sync slots reach a consistent state after
 promotion without losing data.

We were directly copying the LSN locations while syncing the slots on the
standby. Now, it is possible that at some particular restart_lsn there are
some running xacts, which means if we start reading the WAL from that
location after promotion, we won't reach a consistent snapshot state at
that point. However, on the primary, we would have been already a
consistent snapshot state at that restart_lsn so we would have just
serialized the existing snapshot.

To avoid this problem we will use the advance_slot functionality unless
the snapshot already exists at the synced restart_lsn location. This will
help us to ensure that snapbuilder/slot statuses are updated properly
without generating any changes. Note that the synced slot will remain as
RS_TEMPORARY till the decoding from corresponding restart_lsn can reach a
consistent snapshot state after which they will be marked as
RS_PERSISTENT.

Per buildfarm

Author: Hou Zhijie
Reviewed-by: Bertrand Drouvot, Shveta Malik, Bharath Rupireddy, Amit Kapila
Discussion: https://postgr.es/m/OS0PR01MB5716B3942AE49F3F725ACA92943B2@OS0PR01MB5716.jpnprd01.prod.outlook.com
---
 src/backend/replication/logical/logical.c     | 147 +++++++++++++++++-
 src/backend/replication/logical/slotsync.c    | 133 +++++++++++-----
 src/backend/replication/logical/snapbuild.c   |  23 +++
 src/backend/replication/slotfuncs.c           | 118 +-------------
 src/include/replication/logical.h             |   2 +
 src/include/replication/snapbuild.h           |   2 +
 .../t/040_standby_failover_slots_sync.pl      |  97 ++++++++++--
 7 files changed, 351 insertions(+), 171 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 51ffb623c0..97a4d99c4e 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -36,6 +36,7 @@
 #include "replication/decode.h"
 #include "replication/logical.h"
 #include "replication/reorderbuffer.h"
+#include "replication/slotsync.h"
 #include "replication/snapbuild.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
@@ -516,17 +517,23 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("cannot use physical replication slot for logical decoding")));
 
-	if (slot->data.database != MyDatabaseId)
+	/*
+	 * We need to access the system tables during decoding to build the
+	 * logical changes unless we are in fast_forward mode where no changes are
+	 * generated.
+	 */
+	if (slot->data.database != MyDatabaseId && !fast_forward)
 		ereport(ERROR,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("replication slot \"%s\" was not created in this database",
 						NameStr(slot->data.name))));
 
 	/*
-	 * Do not allow consumption of a "synchronized" slot until the standby
-	 * gets promoted.
+	 * The slots being synced from the primary can't be used for decoding as
+	 * they are used after failover. However, we do allow advancing the LSNs
+	 * during the synchronization of slots. See update_local_synced_slot.
 	 */
-	if (RecoveryInProgress() && slot->data.synced)
+	if (RecoveryInProgress() && slot->data.synced && !IsSyncingReplicationSlots())
 		ereport(ERROR,
 				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				errmsg("cannot use replication slot \"%s\" for logical decoding",
@@ -2034,3 +2041,135 @@ LogicalReplicationSlotHasPendingWal(XLogRecPtr end_of_wal)
 
 	return has_pending_wal;
 }
+
+/*
+ * Helper function for advancing our logical replication slot forward.
+ *
+ * The slot's restart_lsn is used as start point for reading records, while
+ * confirmed_flush is used as base point for the decoding context.
+ *
+ * We cannot just do LogicalConfirmReceivedLocation to update confirmed_flush,
+ * because we need to digest WAL to advance restart_lsn allowing to recycle
+ * WAL and removal of old catalog tuples.  As decoding is done in fast_forward
+ * mode, no changes are generated anyway.
+ *
+ * *found_consistent_snapshot will be true if the initial decoding snapshot has
+ * been built; Otherwise, it will be false.
+ */
+XLogRecPtr
+LogicalSlotAdvanceAndCheckSnapState(XLogRecPtr moveto,
+									bool *found_consistent_snapshot)
+{
+	LogicalDecodingContext *ctx;
+	ResourceOwner old_resowner = CurrentResourceOwner;
+	XLogRecPtr	retlsn;
+
+	Assert(moveto != InvalidXLogRecPtr);
+
+	if (found_consistent_snapshot)
+		*found_consistent_snapshot = false;
+
+	PG_TRY();
+	{
+		/*
+		 * Create our decoding context in fast_forward mode, passing start_lsn
+		 * as InvalidXLogRecPtr, so that we start processing from my slot's
+		 * confirmed_flush.
+		 */
+		ctx = CreateDecodingContext(InvalidXLogRecPtr,
+									NIL,
+									true,	/* fast_forward */
+									XL_ROUTINE(.page_read = read_local_xlog_page,
+											   .segment_open = wal_segment_open,
+											   .segment_close = wal_segment_close),
+									NULL, NULL, NULL);
+
+		/*
+		 * Wait for specified streaming replication standby servers (if any)
+		 * to confirm receipt of WAL up to moveto lsn.
+		 */
+		WaitForStandbyConfirmation(moveto);
+
+		/*
+		 * Start reading at the slot's restart_lsn, which we know to point to
+		 * a valid record.
+		 */
+		XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn);
+
+		/* invalidate non-timetravel entries */
+		InvalidateSystemCaches();
+
+		/* Decode records until we reach the requested target */
+		while (ctx->reader->EndRecPtr < moveto)
+		{
+			char	   *errm = NULL;
+			XLogRecord *record;
+
+			/*
+			 * Read records.  No changes are generated in fast_forward mode,
+			 * but snapbuilder/slot statuses are updated properly.
+			 */
+			record = XLogReadRecord(ctx->reader, &errm);
+			if (errm)
+				elog(ERROR, "could not find record while advancing replication slot: %s",
+					 errm);
+
+			/*
+			 * Process the record.  Storage-level changes are ignored in
+			 * fast_forward mode, but other modules (such as snapbuilder)
+			 * might still have critical updates to do.
+			 */
+			if (record)
+				LogicalDecodingProcessRecord(ctx, ctx->reader);
+
+			CHECK_FOR_INTERRUPTS();
+		}
+
+		if (found_consistent_snapshot && DecodingContextReady(ctx))
+			*found_consistent_snapshot = true;
+
+		/*
+		 * Logical decoding could have clobbered CurrentResourceOwner during
+		 * transaction management, so restore the executor's value.  (This is
+		 * a kluge, but it's not worth cleaning up right now.)
+		 */
+		CurrentResourceOwner = old_resowner;
+
+		if (ctx->reader->EndRecPtr != InvalidXLogRecPtr)
+		{
+			LogicalConfirmReceivedLocation(moveto);
+
+			/*
+			 * If only the confirmed_flush LSN has changed the slot won't get
+			 * marked as dirty by the above. Callers on the walsender
+			 * interface are expected to keep track of their own progress and
+			 * don't need it written out. But SQL-interface users cannot
+			 * specify their own start positions and it's harder for them to
+			 * keep track of their progress, so we should make more of an
+			 * effort to save it for them.
+			 *
+			 * Dirty the slot so it is written out at the next checkpoint. The
+			 * LSN position advanced to may still be lost on a crash but this
+			 * makes the data consistent after a clean shutdown.
+			 */
+			ReplicationSlotMarkDirty();
+		}
+
+		retlsn = MyReplicationSlot->data.confirmed_flush;
+
+		/* free context, call shutdown callback */
+		FreeDecodingContext(ctx);
+
+		InvalidateSystemCaches();
+	}
+	PG_CATCH();
+	{
+		/* clear all timetravel entries */
+		InvalidateSystemCaches();
+
+		PG_RE_THROW();
+	}
+	PG_END_TRY();
+
+	return retlsn;
+}
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 30480960c5..9ac847b780 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -25,6 +25,15 @@
  * which slot sync worker can perform the sync periodically or user can call
  * pg_sync_replication_slots() periodically to perform the syncs.
  *
+ * If synchronized slots fail to build a consistent snapshot from the
+ * restart_lsn before reaching confirmed_flush_lsn, they would become
+ * unreliable after promotion due to potential data loss from changes
+ * before reaching a consistent point. This can happen because the slots can
+ * be synced at some random time and we may not reach the consistent point
+ * at the same WAL location as the primary. So, we mark such slots as
+ * RS_TEMPORARY. Once the decoding from corresponding LSNs can reach a
+ * consistent point, they will be marked as RS_PERSISTENT.
+ *
  * The slot sync worker waits for some time before the next synchronization,
  * with the duration varying based on whether any slots were updated during
  * the last cycle. Refer to the comments above wait_for_slot_activity() for
@@ -49,8 +58,9 @@
 #include "postmaster/fork_process.h"
 #include "postmaster/interrupt.h"
 #include "postmaster/postmaster.h"
-#include "replication/slot.h"
+#include "replication/logical.h"
 #include "replication/slotsync.h"
+#include "replication/snapbuild.h"
 #include "storage/ipc.h"
 #include "storage/lmgr.h"
 #include "storage/proc.h"
@@ -147,50 +157,85 @@ static void slotsync_failure_callback(int code, Datum arg);
  *
  * If no update was needed (the data of the remote slot is the same as the
  * local slot) return false, otherwise true.
+ *
+ * *found_consistent_snapshot will be true iff the remote slot's LSN or xmin is
+ * modified, and decoding from the corresponding LSN's can reach a
+ * consistent snapshot.
  */
 static bool
-update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
+update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
+						 bool *found_consistent_snapshot)
 {
 	ReplicationSlot *slot = MyReplicationSlot;
-	bool		xmin_changed;
-	bool		restart_lsn_changed;
-	NameData	plugin_name;
+	bool		slot_updated = false;
 
 	Assert(slot->data.invalidated == RS_INVAL_NONE);
 
-	xmin_changed = (remote_slot->catalog_xmin != slot->data.catalog_xmin);
-	restart_lsn_changed = (remote_slot->restart_lsn != slot->data.restart_lsn);
+	if (found_consistent_snapshot)
+		*found_consistent_snapshot = false;
 
-	if (!xmin_changed &&
-		!restart_lsn_changed &&
-		remote_dbid == slot->data.database &&
-		remote_slot->two_phase == slot->data.two_phase &&
-		remote_slot->failover == slot->data.failover &&
-		remote_slot->confirmed_lsn == slot->data.confirmed_flush &&
-		strcmp(remote_slot->plugin, NameStr(slot->data.plugin)) == 0)
-		return false;
+	if (remote_slot->confirmed_lsn != slot->data.confirmed_flush ||
+		remote_slot->restart_lsn != slot->data.restart_lsn ||
+		remote_slot->catalog_xmin != slot->data.catalog_xmin)
+	{
+		/*
+		 * We can't directly copy the remote slot's LSN or xmin unless there
+		 * exists a consistent snapshot at that point. Otherwise, after
+		 * promotion, the slots may not reach a consistent point before the
+		 * confirmed_flush_lsn which can lead to a data loss. To avoid data
+		 * loss, we let slot machinery advance the slot which ensures that
+		 * snapbuilder/slot statuses are updated properly.
+		 */
+		if (SnapBuildSnapshotExists(remote_slot->restart_lsn))
+		{
+			/*
+			 * Update the slot info directly if there is a serialized snapshot
+			 * at the restart_lsn, as the slot can quickly reach consistency
+			 * at restart_lsn by restoring the snapshot.
+			 */
+			SpinLockAcquire(&slot->mutex);
+			slot->data.restart_lsn = remote_slot->restart_lsn;
+			slot->data.confirmed_flush = remote_slot->confirmed_lsn;
+			slot->data.catalog_xmin = remote_slot->catalog_xmin;
+			slot->effective_catalog_xmin = remote_slot->catalog_xmin;
+			SpinLockRelease(&slot->mutex);
 
-	/* Avoid expensive operations while holding a spinlock. */
-	namestrcpy(&plugin_name, remote_slot->plugin);
-
-	SpinLockAcquire(&slot->mutex);
-	slot->data.plugin = plugin_name;
-	slot->data.database = remote_dbid;
-	slot->data.two_phase = remote_slot->two_phase;
-	slot->data.failover = remote_slot->failover;
-	slot->data.restart_lsn = remote_slot->restart_lsn;
-	slot->data.confirmed_flush = remote_slot->confirmed_lsn;
-	slot->data.catalog_xmin = remote_slot->catalog_xmin;
-	slot->effective_catalog_xmin = remote_slot->catalog_xmin;
-	SpinLockRelease(&slot->mutex);
-
-	if (xmin_changed)
-		ReplicationSlotsComputeRequiredXmin(false);
+			if (found_consistent_snapshot)
+				*found_consistent_snapshot = true;
+		}
+		else
+		{
+			LogicalSlotAdvanceAndCheckSnapState(remote_slot->confirmed_lsn,
+												found_consistent_snapshot);
+		}
 
-	if (restart_lsn_changed)
+		ReplicationSlotsComputeRequiredXmin(false);
 		ReplicationSlotsComputeRequiredLSN();
 
-	return true;
+		slot_updated = true;
+	}
+
+	if (remote_dbid != slot->data.database ||
+		remote_slot->two_phase != slot->data.two_phase ||
+		remote_slot->failover != slot->data.failover ||
+		strcmp(remote_slot->plugin, NameStr(slot->data.plugin)) != 0)
+	{
+		NameData	plugin_name;
+
+		/* Avoid expensive operations while holding a spinlock. */
+		namestrcpy(&plugin_name, remote_slot->plugin);
+
+		SpinLockAcquire(&slot->mutex);
+		slot->data.plugin = plugin_name;
+		slot->data.database = remote_dbid;
+		slot->data.two_phase = remote_slot->two_phase;
+		slot->data.failover = remote_slot->failover;
+		SpinLockRelease(&slot->mutex);
+
+		slot_updated = true;
+	}
+
+	return slot_updated;
 }
 
 /*
@@ -413,6 +458,7 @@ static bool
 update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 {
 	ReplicationSlot *slot = MyReplicationSlot;
+	bool		found_consistent_snapshot = false;
 
 	/*
 	 * Check if the primary server has caught up. Refer to the comment atop
@@ -443,9 +489,22 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 		return false;
 	}
 
-	/* First time slot update, the function must return true */
-	if (!update_local_synced_slot(remote_slot, remote_dbid))
-		elog(ERROR, "failed to update slot");
+	(void) update_local_synced_slot(remote_slot, remote_dbid,
+									&found_consistent_snapshot);
+
+	/*
+	 * Don't persist the slot if it cannot reach the consistent point from the
+	 * restart_lsn. See comments atop this file.
+	 */
+	if (!found_consistent_snapshot)
+	{
+		ereport(LOG,
+				errmsg("could not sync slot \"%s\"", remote_slot->name),
+				errdetail("Logical decoding cannot find consistent point from local slot's LSN %X/%X.",
+						  LSN_FORMAT_ARGS(slot->data.restart_lsn)));
+
+		return false;
+	}
 
 	ReplicationSlotPersist();
 
@@ -578,7 +637,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 					 LSN_FORMAT_ARGS(remote_slot->restart_lsn));
 
 			/* Make sure the slot changes persist across server restart */
-			if (update_local_synced_slot(remote_slot, remote_dbid))
+			if (update_local_synced_slot(remote_slot, remote_dbid, NULL))
 			{
 				ReplicationSlotMarkDirty();
 				ReplicationSlotSave();
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index ac24b51860..e37e22f441 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -2134,3 +2134,26 @@ CheckPointSnapBuild(void)
 	}
 	FreeDir(snap_dir);
 }
+
+/*
+ * Check if a logical snapshot at the specified point has been serialized.
+ */
+bool
+SnapBuildSnapshotExists(XLogRecPtr lsn)
+{
+	char		path[MAXPGPATH];
+	int			ret;
+	struct stat stat_buf;
+
+	sprintf(path, "pg_logical/snapshots/%X-%X.snap",
+			LSN_FORMAT_ARGS(lsn));
+
+	ret = stat(path, &stat_buf);
+
+	if (ret != 0 && errno != ENOENT)
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not stat file \"%s\": %m", path)));
+
+	return ret == 0;
+}
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index da57177c25..dd6c1d5a7e 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -492,125 +492,13 @@ pg_physical_replication_slot_advance(XLogRecPtr moveto)
 }
 
 /*
- * Helper function for advancing our logical replication slot forward.
- *
- * The slot's restart_lsn is used as start point for reading records, while
- * confirmed_flush is used as base point for the decoding context.
- *
- * We cannot just do LogicalConfirmReceivedLocation to update confirmed_flush,
- * because we need to digest WAL to advance restart_lsn allowing to recycle
- * WAL and removal of old catalog tuples.  As decoding is done in fast_forward
- * mode, no changes are generated anyway.
+ * Advance our logical replication slot forward. See
+ * LogicalSlotAdvanceAndCheckSnapState for details.
  */
 static XLogRecPtr
 pg_logical_replication_slot_advance(XLogRecPtr moveto)
 {
-	LogicalDecodingContext *ctx;
-	ResourceOwner old_resowner = CurrentResourceOwner;
-	XLogRecPtr	retlsn;
-
-	Assert(moveto != InvalidXLogRecPtr);
-
-	PG_TRY();
-	{
-		/*
-		 * Create our decoding context in fast_forward mode, passing start_lsn
-		 * as InvalidXLogRecPtr, so that we start processing from my slot's
-		 * confirmed_flush.
-		 */
-		ctx = CreateDecodingContext(InvalidXLogRecPtr,
-									NIL,
-									true,	/* fast_forward */
-									XL_ROUTINE(.page_read = read_local_xlog_page,
-											   .segment_open = wal_segment_open,
-											   .segment_close = wal_segment_close),
-									NULL, NULL, NULL);
-
-		/*
-		 * Wait for specified streaming replication standby servers (if any)
-		 * to confirm receipt of WAL up to moveto lsn.
-		 */
-		WaitForStandbyConfirmation(moveto);
-
-		/*
-		 * Start reading at the slot's restart_lsn, which we know to point to
-		 * a valid record.
-		 */
-		XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn);
-
-		/* invalidate non-timetravel entries */
-		InvalidateSystemCaches();
-
-		/* Decode records until we reach the requested target */
-		while (ctx->reader->EndRecPtr < moveto)
-		{
-			char	   *errm = NULL;
-			XLogRecord *record;
-
-			/*
-			 * Read records.  No changes are generated in fast_forward mode,
-			 * but snapbuilder/slot statuses are updated properly.
-			 */
-			record = XLogReadRecord(ctx->reader, &errm);
-			if (errm)
-				elog(ERROR, "could not find record while advancing replication slot: %s",
-					 errm);
-
-			/*
-			 * Process the record.  Storage-level changes are ignored in
-			 * fast_forward mode, but other modules (such as snapbuilder)
-			 * might still have critical updates to do.
-			 */
-			if (record)
-				LogicalDecodingProcessRecord(ctx, ctx->reader);
-
-			CHECK_FOR_INTERRUPTS();
-		}
-
-		/*
-		 * Logical decoding could have clobbered CurrentResourceOwner during
-		 * transaction management, so restore the executor's value.  (This is
-		 * a kluge, but it's not worth cleaning up right now.)
-		 */
-		CurrentResourceOwner = old_resowner;
-
-		if (ctx->reader->EndRecPtr != InvalidXLogRecPtr)
-		{
-			LogicalConfirmReceivedLocation(moveto);
-
-			/*
-			 * If only the confirmed_flush LSN has changed the slot won't get
-			 * marked as dirty by the above. Callers on the walsender
-			 * interface are expected to keep track of their own progress and
-			 * don't need it written out. But SQL-interface users cannot
-			 * specify their own start positions and it's harder for them to
-			 * keep track of their progress, so we should make more of an
-			 * effort to save it for them.
-			 *
-			 * Dirty the slot so it is written out at the next checkpoint. The
-			 * LSN position advanced to may still be lost on a crash but this
-			 * makes the data consistent after a clean shutdown.
-			 */
-			ReplicationSlotMarkDirty();
-		}
-
-		retlsn = MyReplicationSlot->data.confirmed_flush;
-
-		/* free context, call shutdown callback */
-		FreeDecodingContext(ctx);
-
-		InvalidateSystemCaches();
-	}
-	PG_CATCH();
-	{
-		/* clear all timetravel entries */
-		InvalidateSystemCaches();
-
-		PG_RE_THROW();
-	}
-	PG_END_TRY();
-
-	return retlsn;
+	return LogicalSlotAdvanceAndCheckSnapState(moveto, NULL);
 }
 
 /*
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index dc2df4ce92..aff38e8d04 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -149,5 +149,7 @@ extern void ResetLogicalStreamingState(void);
 extern void UpdateDecodingStats(LogicalDecodingContext *ctx);
 
 extern bool LogicalReplicationSlotHasPendingWal(XLogRecPtr end_of_wal);
+extern XLogRecPtr LogicalSlotAdvanceAndCheckSnapState(XLogRecPtr moveto,
+													  bool *found_consistent_snapshot);
 
 #endif
diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h
index fbdf362396..a3360a1c5e 100644
--- a/src/include/replication/snapbuild.h
+++ b/src/include/replication/snapbuild.h
@@ -91,4 +91,6 @@ extern void SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn,
 										 struct xl_running_xacts *running);
 extern void SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn);
 
+extern bool SnapBuildSnapshotExists(XLogRecPtr lsn);
+
 #endif							/* SNAPBUILD_H */
diff --git a/src/test/recovery/t/040_standby_failover_slots_sync.pl b/src/test/recovery/t/040_standby_failover_slots_sync.pl
index f47bfd78eb..336cd4b764 100644
--- a/src/test/recovery/t/040_standby_failover_slots_sync.pl
+++ b/src/test/recovery/t/040_standby_failover_slots_sync.pl
@@ -365,6 +365,68 @@ ok( $stderr =~
 
 $cascading_standby->stop;
 
+##################################################
+# Create a failover slot and advance the restart_lsn to a position where a
+# running transaction exists. This setup is for testing that the synced slots
+# can achieve the consistent snapshot state starting from the restart_lsn
+# after promotion without losing any data that otherwise would have been
+# received from the primary.
+##################################################
+
+$primary->safe_psql('postgres',
+	"SELECT pg_create_logical_replication_slot('snap_test_slot', 'test_decoding', false, false, true);"
+);
+
+$standby1->safe_psql('postgres', "SELECT pg_sync_replication_slots();");
+
+# Two xl_running_xacts logs are generated here. When decoding the first log, it
+# only serializes the snapshot, without advancing the restart_lsn to the latest
+# position. This is because if a transaction is running, the restart_lsn can
+# only move to a position before that transaction. Hence, the second
+# xl_running_xacts log is needed, the decoding for which allows the restart_lsn
+# to advance to the last serialized snapshot's position (the first log).
+$primary->safe_psql(
+	'postgres', qq(
+		BEGIN;
+		SELECT txid_current();
+		SELECT pg_log_standby_snapshot();
+		COMMIT;
+		BEGIN;
+		SELECT txid_current();
+		SELECT pg_log_standby_snapshot();
+		COMMIT;
+));
+
+$primary->wait_for_replay_catchup($standby1);
+
+# Advance the restart_lsn to the position of the first xl_running_xacts log
+# generated above. Note that there might be concurrent xl_running_xacts logs
+# written by the bgwriter, which could cause the position to be advanced to an
+# unexpected point, but that would be a rare scenario and doesn't affect the
+# test results.
+$primary->safe_psql('postgres',
+	"SELECT pg_replication_slot_advance('snap_test_slot', pg_current_wal_lsn());"
+);
+
+# Log a message that will be consumed on the standby after promotion using the
+# synced slot. See the test where we promote standby (Promote the standby1 to
+# primary.)
+$primary->safe_psql('postgres',
+	"SELECT pg_logical_emit_message(false, 'test', 'test');"
+);
+
+# Get the confirmed_flush_lsn for the logical slot snap_test_slot on the primary
+my $confirmed_flush_lsn = $primary->safe_psql('postgres',
+	"SELECT confirmed_flush_lsn from pg_replication_slots WHERE slot_name = 'snap_test_slot';");
+
+$standby1->safe_psql('postgres', "SELECT pg_sync_replication_slots();");
+
+# Verify that confirmed_flush_lsn of snap_test_slot slot is synced to the standby
+ok( $standby1->poll_query_until(
+		'postgres',
+		"SELECT '$confirmed_flush_lsn' = confirmed_flush_lsn from pg_replication_slots WHERE slot_name = 'snap_test_slot' AND synced AND NOT temporary;"),
+	'confirmed_flush_lsn of slot snap_test_slot synced to standby');
+
 ##################################################
 # Test to confirm that the slot synchronization is protected from malicious
 # users.
@@ -458,8 +520,8 @@ $standby1->wait_for_log(qr/slot sync worker started/,
 	$log_offset);
 
 ##################################################
-# Test to confirm that restart_lsn and confirmed_flush_lsn of the logical slot
-# on the primary is synced to the standby via the slot sync worker.
+# Test to confirm that confirmed_flush_lsn of the logical slot on the primary
+# is synced to the standby via the slot sync worker.
 ##################################################
 
 # Insert data on the primary
@@ -479,8 +541,8 @@ $subscriber1->safe_psql(
 
 $subscriber1->wait_for_subscription_sync;
 
-# Do not allow any further advancement of the restart_lsn and
-# confirmed_flush_lsn for the lsub1_slot.
+# Do not allow any further advancement of the confirmed_flush_lsn for the
+# lsub1_slot.
 $subscriber1->safe_psql('postgres', "ALTER SUBSCRIPTION regress_mysub1 DISABLE");
 
 # Wait for the replication slot to become inactive on the publisher
@@ -489,20 +551,15 @@ $primary->poll_query_until(
 	"SELECT COUNT(*) FROM pg_catalog.pg_replication_slots WHERE slot_name = 'lsub1_slot' AND active='f'",
 	1);
 
-# Get the restart_lsn for the logical slot lsub1_slot on the primary
-my $primary_restart_lsn = $primary->safe_psql('postgres',
-	"SELECT restart_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot';");
-
 # Get the confirmed_flush_lsn for the logical slot lsub1_slot on the primary
 my $primary_flush_lsn = $primary->safe_psql('postgres',
 	"SELECT confirmed_flush_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot';");
 
-# Confirm that restart_lsn and confirmed_flush_lsn of lsub1_slot slot are synced
-# to the standby
+# Confirm that confirmed_flush_lsn of lsub1_slot slot are synced to the standby
 ok( $standby1->poll_query_until(
 		'postgres',
-		"SELECT '$primary_restart_lsn' = restart_lsn AND '$primary_flush_lsn' = confirmed_flush_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot' AND synced AND NOT temporary;"),
-	'restart_lsn and confirmed_flush_lsn of slot lsub1_slot synced to standby');
+		"SELECT '$primary_flush_lsn' = confirmed_flush_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot' AND synced AND NOT temporary;"),
+	'confirmed_flush_lsn of slot lsub1_slot synced to standby');
 
 ##################################################
 # Test that logical failover replication slots wait for the specified
@@ -744,8 +801,9 @@ $primary->reload;
 
 ##################################################
 # Promote the standby1 to primary. Confirm that:
-# a) the slot 'lsub1_slot' is retained on the new primary
+# a) the slot 'lsub1_slot' and 'snap_test_slot' are retained on the new primary
 # b) logical replication for regress_mysub1 is resumed successfully after failover
+# c) changes can be consumed from the synced slot 'snap_test_slot'
 ##################################################
 $standby1->start;
 $primary->wait_for_replay_catchup($standby1);
@@ -759,8 +817,8 @@ $subscriber1->safe_psql('postgres',
 
 # Confirm the synced slot 'lsub1_slot' is retained on the new primary
 is($standby1->safe_psql('postgres',
-	q{SELECT slot_name FROM pg_replication_slots WHERE slot_name = 'lsub1_slot' AND synced AND NOT temporary;}),
-	'lsub1_slot',
+	q{SELECT count(*) = 2 FROM pg_replication_slots WHERE slot_name IN ('lsub1_slot', 'snap_test_slot') AND synced AND NOT temporary;}),
+	't',
 	'synced slot retained on the new primary');
 
 # Insert data on the new primary
@@ -773,4 +831,13 @@ is( $subscriber1->safe_psql('postgres', q{SELECT count(*) FROM tab_int;}),
 	"20",
 	'data replicated from the new primary');
 
+# Consume the data from the snap_test_slot. The synced slot should reach a
+# consistent point by restoring the snapshot at the restart_lsn serialized
+# during slot synchronization.
+$result = $standby1->safe_psql('postgres',
+	"SELECT count(*) FROM pg_logical_slot_get_changes('snap_test_slot', NULL, NULL) WHERE data ~ 'message*';"
+);
+
+is($result, '1', "data can be consumed using snap_test_slot");
+
 done_testing();
-- 
2.28.0.windows.1

