From 662a25da53f05ae9833cbf6641c7256fa55e82fc Mon Sep 17 00:00:00 2001
From: Hou Zhijie <houzj.fnst@cn.fujitsu.com>
Date: Thu, 28 Mar 2024 20:19:22 +0800
Subject: [PATCH v6 1/2] advance the restart_lsn of synced slots using logical
 decoding

Since we don't perform logical decoding for the synced slots when syncing the
lsn/xmin of slot, no logical snapshots will be serialized to disk. So, when
user starts to use these synced slots after promotion, it needs to re-build the
consistent snapshot from the restart_lsn if the WAL(xl_running_xacts) at
restart_lsn position indicates that there are running transactions. This
however could cause the data that before the consistent point to be missed.

To address the issue, if there is no existing serialized snapshot at the
syncing restart_lsn, we progress the lsn/xmin of the slot on the standby using
fast-forward logical decoding. This approach allows us to determine if the
synced slot can reach a consistent state, ensuring that the required snapshot
is saved to disk during decoding. Slots that achieve consistency are marked as
sync-ready.
---
 src/backend/replication/logical/logical.c     | 146 +++++++++++++++++-
 src/backend/replication/logical/slotsync.c    | 130 +++++++++++-----
 src/backend/replication/logical/snapbuild.c   |  23 +++
 src/backend/replication/slotfuncs.c           | 118 +-------------
 src/include/replication/logical.h             |   2 +
 src/include/replication/snapbuild.h           |   2 +
 .../t/040_standby_failover_slots_sync.pl      |  19 +--
 7 files changed, 272 insertions(+), 168 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 51ffb623c0..f2f509b94f 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -36,6 +36,7 @@
 #include "replication/decode.h"
 #include "replication/logical.h"
 #include "replication/reorderbuffer.h"
+#include "replication/slotsync.h"
 #include "replication/snapbuild.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
@@ -516,17 +517,23 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("cannot use physical replication slot for logical decoding")));
 
-	if (slot->data.database != MyDatabaseId)
+	/*
+	 * We need to access the system tables during decoding to build the
+	 * logical changes unless we are in fast-forward mode where no changes are
+	 * generated.
+	 */
+	if (slot->data.database != MyDatabaseId && !fast_forward)
 		ereport(ERROR,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("replication slot \"%s\" was not created in this database",
 						NameStr(slot->data.name))));
 
 	/*
-	 * Do not allow consumption of a "synchronized" slot until the standby
-	 * gets promoted.
+	 * The slots being synced from the primary can't be used for decoding as
+	 * they are used after failover. However, we do allow advancing the LSNs
+	 * during the synchronization of slots. See update_local_synced_slot.
 	 */
-	if (RecoveryInProgress() && slot->data.synced)
+	if (RecoveryInProgress() && slot->data.synced && !IsSyncingReplicationSlots())
 		ereport(ERROR,
 				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				errmsg("cannot use replication slot \"%s\" for logical decoding",
@@ -2034,3 +2041,134 @@ LogicalReplicationSlotHasPendingWal(XLogRecPtr end_of_wal)
 
 	return has_pending_wal;
 }
+
+/*
+ * Helper function for advancing our logical replication slot forward.
+ *
+ * The slot's restart_lsn is used as start point for reading records, while
+ * confirmed_flush is used as base point for the decoding context.
+ *
+ * We cannot just do LogicalConfirmReceivedLocation to update confirmed_flush,
+ * because we need to digest WAL to advance restart_lsn allowing to recycle
+ * WAL and removal of old catalog tuples.  As decoding is done in fast_forward
+ * mode, no changes are generated anyway.
+ *
+ * *ready_for_decoding will be true if the initial decoding snapshot has
+ * been built; Otherwise, it will be false.
+ */
+XLogRecPtr
+LogicalSlotAdvanceAndCheckSnapState(XLogRecPtr moveto, bool *ready_for_decoding)
+{
+	LogicalDecodingContext *ctx;
+	ResourceOwner old_resowner = CurrentResourceOwner;
+	XLogRecPtr	retlsn;
+
+	Assert(moveto != InvalidXLogRecPtr);
+
+	if (ready_for_decoding)
+		*ready_for_decoding = false;
+
+	PG_TRY();
+	{
+		/*
+		 * Create our decoding context in fast_forward mode, passing start_lsn
+		 * as InvalidXLogRecPtr, so that we start processing from my slot's
+		 * confirmed_flush.
+		 */
+		ctx = CreateDecodingContext(InvalidXLogRecPtr,
+									NIL,
+									true,	/* fast_forward */
+									XL_ROUTINE(.page_read = read_local_xlog_page,
+											   .segment_open = wal_segment_open,
+											   .segment_close = wal_segment_close),
+									NULL, NULL, NULL);
+
+		/*
+		 * Wait for specified streaming replication standby servers (if any)
+		 * to confirm receipt of WAL up to moveto lsn.
+		 */
+		WaitForStandbyConfirmation(moveto);
+
+		/*
+		 * Start reading at the slot's restart_lsn, which we know to point to
+		 * a valid record.
+		 */
+		XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn);
+
+		/* invalidate non-timetravel entries */
+		InvalidateSystemCaches();
+
+		/* Decode records until we reach the requested target */
+		while (ctx->reader->EndRecPtr < moveto)
+		{
+			char	   *errm = NULL;
+			XLogRecord *record;
+
+			/*
+			 * Read records.  No changes are generated in fast_forward mode,
+			 * but snapbuilder/slot statuses are updated properly.
+			 */
+			record = XLogReadRecord(ctx->reader, &errm);
+			if (errm)
+				elog(ERROR, "could not find record while advancing replication slot: %s",
+					 errm);
+
+			/*
+			 * Process the record.  Storage-level changes are ignored in
+			 * fast_forward mode, but other modules (such as snapbuilder)
+			 * might still have critical updates to do.
+			 */
+			if (record)
+				LogicalDecodingProcessRecord(ctx, ctx->reader);
+
+			CHECK_FOR_INTERRUPTS();
+		}
+
+		if (DecodingContextReady(ctx) && ready_for_decoding)
+			*ready_for_decoding = true;
+
+		/*
+		 * Logical decoding could have clobbered CurrentResourceOwner during
+		 * transaction management, so restore the executor's value.  (This is
+		 * a kluge, but it's not worth cleaning up right now.)
+		 */
+		CurrentResourceOwner = old_resowner;
+
+		if (ctx->reader->EndRecPtr != InvalidXLogRecPtr)
+		{
+			LogicalConfirmReceivedLocation(moveto);
+
+			/*
+			 * If only the confirmed_flush LSN has changed the slot won't get
+			 * marked as dirty by the above. Callers on the walsender
+			 * interface are expected to keep track of their own progress and
+			 * don't need it written out. But SQL-interface users cannot
+			 * specify their own start positions and it's harder for them to
+			 * keep track of their progress, so we should make more of an
+			 * effort to save it for them.
+			 *
+			 * Dirty the slot so it is written out at the next checkpoint. The
+			 * LSN position advanced to may still be lost on a crash but this
+			 * makes the data consistent after a clean shutdown.
+			 */
+			ReplicationSlotMarkDirty();
+		}
+
+		retlsn = MyReplicationSlot->data.confirmed_flush;
+
+		/* free context, call shutdown callback */
+		FreeDecodingContext(ctx);
+
+		InvalidateSystemCaches();
+	}
+	PG_CATCH();
+	{
+		/* clear all timetravel entries */
+		InvalidateSystemCaches();
+
+		PG_RE_THROW();
+	}
+	PG_END_TRY();
+
+	return retlsn;
+}
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 30480960c5..2c16082c03 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -25,6 +25,15 @@
  * which slot sync worker can perform the sync periodically or user can call
  * pg_sync_replication_slots() periodically to perform the syncs.
  *
+ * If synchronized slots fail to build a consistent snapshot from the
+ * restart_lsn before reaching confirmed_flush_lsn, they would become
+ * unreliable after promotion due to potential data loss from changes
+ * before reaching a consistent point. This can happen because the slots can
+ * be synced at some random time and we may not reach the consistent point
+ * at the same WAL location as the primary. So, we mark such slots as
+ * RS_TEMPORARY. Once the decoding from corresponding LSNs can reach a
+ * consistent point, they will be marked as RS_PERSISTENT.
+ *
  * The slot sync worker waits for some time before the next synchronization,
  * with the duration varying based on whether any slots were updated during
  * the last cycle. Refer to the comments above wait_for_slot_activity() for
@@ -49,8 +58,9 @@
 #include "postmaster/fork_process.h"
 #include "postmaster/interrupt.h"
 #include "postmaster/postmaster.h"
-#include "replication/slot.h"
+#include "replication/logical.h"
 #include "replication/slotsync.h"
+#include "replication/snapbuild.h"
 #include "storage/ipc.h"
 #include "storage/lmgr.h"
 #include "storage/proc.h"
@@ -147,50 +157,85 @@ static void slotsync_failure_callback(int code, Datum arg);
  *
  * If no update was needed (the data of the remote slot is the same as the
  * local slot) return false, otherwise true.
+ *
+ * *ready_for_decoding will be true iff the remote slot's LSN or xmin is
+ * modified, and decoding from the corresponding LSN's can reach a
+ * consistent snapshot.
  */
 static bool
-update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
+update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
+						 bool *ready_for_decoding)
 {
 	ReplicationSlot *slot = MyReplicationSlot;
-	bool		xmin_changed;
-	bool		restart_lsn_changed;
-	NameData	plugin_name;
+	bool		slot_updated = false;
 
 	Assert(slot->data.invalidated == RS_INVAL_NONE);
 
-	xmin_changed = (remote_slot->catalog_xmin != slot->data.catalog_xmin);
-	restart_lsn_changed = (remote_slot->restart_lsn != slot->data.restart_lsn);
+	if (ready_for_decoding)
+		*ready_for_decoding = false;
 
-	if (!xmin_changed &&
-		!restart_lsn_changed &&
-		remote_dbid == slot->data.database &&
-		remote_slot->two_phase == slot->data.two_phase &&
-		remote_slot->failover == slot->data.failover &&
-		remote_slot->confirmed_lsn == slot->data.confirmed_flush &&
-		strcmp(remote_slot->plugin, NameStr(slot->data.plugin)) == 0)
-		return false;
+	if (remote_slot->confirmed_lsn != slot->data.confirmed_flush ||
+		remote_slot->restart_lsn != slot->data.restart_lsn ||
+		remote_slot->catalog_xmin != slot->data.catalog_xmin)
+	{
+		/*
+		 * We can't directly copy the remote slot's LSN or xmin unless there
+		 * exists a consistent snapshot at that point. Otherwise, after
+		 * promotion, the slots may not reach a consistent point before the
+		 * confirmed_flush_lsn which can lead to a data loss. To avoid data
+		 * loss, we let slot machinery advance the slot which ensures that
+		 * snapbuilder/slot statuses are updated properly.
+		 */
+		if (SnapBuildSnapshotExists(remote_slot->restart_lsn))
+		{
+			/*
+			 * Update the slot info directly if there is a serialized snapshot
+			 * at the restart_lsn, as the slot can quickly reach consistency
+			 * at restart_lsn by restoring the snapshot.
+			 */
+			SpinLockAcquire(&slot->mutex);
+			slot->data.restart_lsn = remote_slot->restart_lsn;
+			slot->data.confirmed_flush = remote_slot->confirmed_lsn;
+			slot->data.catalog_xmin = remote_slot->catalog_xmin;
+			slot->effective_catalog_xmin = remote_slot->catalog_xmin;
+			SpinLockRelease(&slot->mutex);
 
-	/* Avoid expensive operations while holding a spinlock. */
-	namestrcpy(&plugin_name, remote_slot->plugin);
-
-	SpinLockAcquire(&slot->mutex);
-	slot->data.plugin = plugin_name;
-	slot->data.database = remote_dbid;
-	slot->data.two_phase = remote_slot->two_phase;
-	slot->data.failover = remote_slot->failover;
-	slot->data.restart_lsn = remote_slot->restart_lsn;
-	slot->data.confirmed_flush = remote_slot->confirmed_lsn;
-	slot->data.catalog_xmin = remote_slot->catalog_xmin;
-	slot->effective_catalog_xmin = remote_slot->catalog_xmin;
-	SpinLockRelease(&slot->mutex);
-
-	if (xmin_changed)
-		ReplicationSlotsComputeRequiredXmin(false);
+			if (ready_for_decoding)
+				*ready_for_decoding = true;
+		}
+		else
+		{
+			LogicalSlotAdvanceAndCheckSnapState(remote_slot->confirmed_lsn,
+												ready_for_decoding);
+		}
 
-	if (restart_lsn_changed)
+		ReplicationSlotsComputeRequiredXmin(false);
 		ReplicationSlotsComputeRequiredLSN();
 
-	return true;
+		slot_updated = true;
+	}
+
+	if (remote_dbid != slot->data.database ||
+		remote_slot->two_phase != slot->data.two_phase ||
+		remote_slot->failover != slot->data.failover ||
+		strcmp(remote_slot->plugin, NameStr(slot->data.plugin)) != 0)
+	{
+		NameData	plugin_name;
+
+		/* Avoid expensive operations while holding a spinlock. */
+		namestrcpy(&plugin_name, remote_slot->plugin);
+
+		SpinLockAcquire(&slot->mutex);
+		slot->data.plugin = plugin_name;
+		slot->data.database = remote_dbid;
+		slot->data.two_phase = remote_slot->two_phase;
+		slot->data.failover = remote_slot->failover;
+		SpinLockRelease(&slot->mutex);
+
+		slot_updated = true;
+	}
+
+	return slot_updated;
 }
 
 /*
@@ -413,6 +458,7 @@ static bool
 update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 {
 	ReplicationSlot *slot = MyReplicationSlot;
+	bool		ready_for_decoding = false;
 
 	/*
 	 * Check if the primary server has caught up. Refer to the comment atop
@@ -443,9 +489,19 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 		return false;
 	}
 
-	/* First time slot update, the function must return true */
-	if (!update_local_synced_slot(remote_slot, remote_dbid))
-		elog(ERROR, "failed to update slot");
+	(void) update_local_synced_slot(remote_slot, remote_dbid,
+									&ready_for_decoding);
+
+	/*
+	 * Don't persist the slot if it cannot reach the consistent point from the
+	 * restart_lsn. See comments atop this file.
+	 */
+	if (!ready_for_decoding)
+	{
+		elog(DEBUG1, "could not find consistent point for synced slot; restart_lsn = %X/%X",
+			 LSN_FORMAT_ARGS(slot->data.restart_lsn));
+		return false;
+	}
 
 	ReplicationSlotPersist();
 
@@ -578,7 +634,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 					 LSN_FORMAT_ARGS(remote_slot->restart_lsn));
 
 			/* Make sure the slot changes persist across server restart */
-			if (update_local_synced_slot(remote_slot, remote_dbid))
+			if (update_local_synced_slot(remote_slot, remote_dbid, NULL))
 			{
 				ReplicationSlotMarkDirty();
 				ReplicationSlotSave();
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index ac24b51860..e37e22f441 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -2134,3 +2134,26 @@ CheckPointSnapBuild(void)
 	}
 	FreeDir(snap_dir);
 }
+
+/*
+ * Check if a logical snapshot at the specified point has been serialized.
+ */
+bool
+SnapBuildSnapshotExists(XLogRecPtr lsn)
+{
+	char		path[MAXPGPATH];
+	int			ret;
+	struct stat stat_buf;
+
+	sprintf(path, "pg_logical/snapshots/%X-%X.snap",
+			LSN_FORMAT_ARGS(lsn));
+
+	ret = stat(path, &stat_buf);
+
+	if (ret != 0 && errno != ENOENT)
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not stat file \"%s\": %m", path)));
+
+	return ret == 0;
+}
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index da57177c25..dd6c1d5a7e 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -492,125 +492,13 @@ pg_physical_replication_slot_advance(XLogRecPtr moveto)
 }
 
 /*
- * Helper function for advancing our logical replication slot forward.
- *
- * The slot's restart_lsn is used as start point for reading records, while
- * confirmed_flush is used as base point for the decoding context.
- *
- * We cannot just do LogicalConfirmReceivedLocation to update confirmed_flush,
- * because we need to digest WAL to advance restart_lsn allowing to recycle
- * WAL and removal of old catalog tuples.  As decoding is done in fast_forward
- * mode, no changes are generated anyway.
+ * Advance our logical replication slot forward. See
+ * LogicalSlotAdvanceAndCheckSnapState for details.
  */
 static XLogRecPtr
 pg_logical_replication_slot_advance(XLogRecPtr moveto)
 {
-	LogicalDecodingContext *ctx;
-	ResourceOwner old_resowner = CurrentResourceOwner;
-	XLogRecPtr	retlsn;
-
-	Assert(moveto != InvalidXLogRecPtr);
-
-	PG_TRY();
-	{
-		/*
-		 * Create our decoding context in fast_forward mode, passing start_lsn
-		 * as InvalidXLogRecPtr, so that we start processing from my slot's
-		 * confirmed_flush.
-		 */
-		ctx = CreateDecodingContext(InvalidXLogRecPtr,
-									NIL,
-									true,	/* fast_forward */
-									XL_ROUTINE(.page_read = read_local_xlog_page,
-											   .segment_open = wal_segment_open,
-											   .segment_close = wal_segment_close),
-									NULL, NULL, NULL);
-
-		/*
-		 * Wait for specified streaming replication standby servers (if any)
-		 * to confirm receipt of WAL up to moveto lsn.
-		 */
-		WaitForStandbyConfirmation(moveto);
-
-		/*
-		 * Start reading at the slot's restart_lsn, which we know to point to
-		 * a valid record.
-		 */
-		XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn);
-
-		/* invalidate non-timetravel entries */
-		InvalidateSystemCaches();
-
-		/* Decode records until we reach the requested target */
-		while (ctx->reader->EndRecPtr < moveto)
-		{
-			char	   *errm = NULL;
-			XLogRecord *record;
-
-			/*
-			 * Read records.  No changes are generated in fast_forward mode,
-			 * but snapbuilder/slot statuses are updated properly.
-			 */
-			record = XLogReadRecord(ctx->reader, &errm);
-			if (errm)
-				elog(ERROR, "could not find record while advancing replication slot: %s",
-					 errm);
-
-			/*
-			 * Process the record.  Storage-level changes are ignored in
-			 * fast_forward mode, but other modules (such as snapbuilder)
-			 * might still have critical updates to do.
-			 */
-			if (record)
-				LogicalDecodingProcessRecord(ctx, ctx->reader);
-
-			CHECK_FOR_INTERRUPTS();
-		}
-
-		/*
-		 * Logical decoding could have clobbered CurrentResourceOwner during
-		 * transaction management, so restore the executor's value.  (This is
-		 * a kluge, but it's not worth cleaning up right now.)
-		 */
-		CurrentResourceOwner = old_resowner;
-
-		if (ctx->reader->EndRecPtr != InvalidXLogRecPtr)
-		{
-			LogicalConfirmReceivedLocation(moveto);
-
-			/*
-			 * If only the confirmed_flush LSN has changed the slot won't get
-			 * marked as dirty by the above. Callers on the walsender
-			 * interface are expected to keep track of their own progress and
-			 * don't need it written out. But SQL-interface users cannot
-			 * specify their own start positions and it's harder for them to
-			 * keep track of their progress, so we should make more of an
-			 * effort to save it for them.
-			 *
-			 * Dirty the slot so it is written out at the next checkpoint. The
-			 * LSN position advanced to may still be lost on a crash but this
-			 * makes the data consistent after a clean shutdown.
-			 */
-			ReplicationSlotMarkDirty();
-		}
-
-		retlsn = MyReplicationSlot->data.confirmed_flush;
-
-		/* free context, call shutdown callback */
-		FreeDecodingContext(ctx);
-
-		InvalidateSystemCaches();
-	}
-	PG_CATCH();
-	{
-		/* clear all timetravel entries */
-		InvalidateSystemCaches();
-
-		PG_RE_THROW();
-	}
-	PG_END_TRY();
-
-	return retlsn;
+	return LogicalSlotAdvanceAndCheckSnapState(moveto, NULL);
 }
 
 /*
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index dc2df4ce92..b53b766f3d 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -149,5 +149,7 @@ extern void ResetLogicalStreamingState(void);
 extern void UpdateDecodingStats(LogicalDecodingContext *ctx);
 
 extern bool LogicalReplicationSlotHasPendingWal(XLogRecPtr end_of_wal);
+extern XLogRecPtr LogicalSlotAdvanceAndCheckSnapState(XLogRecPtr moveto,
+													  bool *ready_for_decoding);
 
 #endif
diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h
index fbdf362396..a3360a1c5e 100644
--- a/src/include/replication/snapbuild.h
+++ b/src/include/replication/snapbuild.h
@@ -91,4 +91,6 @@ extern void SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn,
 										 struct xl_running_xacts *running);
 extern void SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn);
 
+extern bool SnapBuildSnapshotExists(XLogRecPtr lsn);
+
 #endif							/* SNAPBUILD_H */
diff --git a/src/test/recovery/t/040_standby_failover_slots_sync.pl b/src/test/recovery/t/040_standby_failover_slots_sync.pl
index f47bfd78eb..0818c3c068 100644
--- a/src/test/recovery/t/040_standby_failover_slots_sync.pl
+++ b/src/test/recovery/t/040_standby_failover_slots_sync.pl
@@ -458,8 +458,8 @@ $standby1->wait_for_log(qr/slot sync worker started/,
 	$log_offset);
 
 ##################################################
-# Test to confirm that restart_lsn and confirmed_flush_lsn of the logical slot
-# on the primary is synced to the standby via the slot sync worker.
+# Test to confirm that confirmed_flush_lsn of the logical slot on the primary
+# is synced to the standby via the slot sync worker.
 ##################################################
 
 # Insert data on the primary
@@ -479,8 +479,8 @@ $subscriber1->safe_psql(
 
 $subscriber1->wait_for_subscription_sync;
 
-# Do not allow any further advancement of the restart_lsn and
-# confirmed_flush_lsn for the lsub1_slot.
+# Do not allow any further advancement of the confirmed_flush_lsn for the
+# lsub1_slot.
 $subscriber1->safe_psql('postgres', "ALTER SUBSCRIPTION regress_mysub1 DISABLE");
 
 # Wait for the replication slot to become inactive on the publisher
@@ -489,20 +489,15 @@ $primary->poll_query_until(
 	"SELECT COUNT(*) FROM pg_catalog.pg_replication_slots WHERE slot_name = 'lsub1_slot' AND active='f'",
 	1);
 
-# Get the restart_lsn for the logical slot lsub1_slot on the primary
-my $primary_restart_lsn = $primary->safe_psql('postgres',
-	"SELECT restart_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot';");
-
 # Get the confirmed_flush_lsn for the logical slot lsub1_slot on the primary
 my $primary_flush_lsn = $primary->safe_psql('postgres',
 	"SELECT confirmed_flush_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot';");
 
-# Confirm that restart_lsn and confirmed_flush_lsn of lsub1_slot slot are synced
-# to the standby
+# Confirm that confirmed_flush_lsn of lsub1_slot slot are synced to the standby
 ok( $standby1->poll_query_until(
 		'postgres',
-		"SELECT '$primary_restart_lsn' = restart_lsn AND '$primary_flush_lsn' = confirmed_flush_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot' AND synced AND NOT temporary;"),
-	'restart_lsn and confirmed_flush_lsn of slot lsub1_slot synced to standby');
+		"SELECT '$primary_flush_lsn' = confirmed_flush_lsn from pg_replication_slots WHERE slot_name = 'lsub1_slot' AND synced AND NOT temporary;"),
+	'confirmed_flush_lsn of slot lsub1_slot synced to standby');
 
 ##################################################
 # Test that logical failover replication slots wait for the specified
-- 
2.30.0.windows.2

