From 50bbe34aea2ca665d21137dde53dc136af8df5ad Mon Sep 17 00:00:00 2001
From: Shveta Malik <shveta.malik@gmail.com>
Date: Thu, 4 Apr 2024 14:18:51 +0530
Subject: [PATCH v6] Fix the handling of LSN and xmin in slot sync.

Previously, an ERROR was reported when the local restart_lsn exceeded the
remote restart_lsn, because we directly update slot info without logical
decoding. But now we use fast forward logical decoding to advance the slot
which can lead to the advancement of the standby's restart_lsn ahead of the
remote slot, which is OK. To prevent unnecessary ERROR reporting in such cases,
this update modifies the comparison to consider confirmed_lsn rather than
restart_lsn.

Moreover, the syncing of slot information now occurs only when the
confirmed_lsn, restart_lsn, or catalog_xmin of the remote slot is ahead of the
local slot.

While on it, ensure that when updating the catalog_xmin of the synced slots, it
is first written to disk before changing the in-memory value
(effective_catalog_xmin). This is to prevent a scenario where the in-memory
value change triggers vacuum to remove catalog tuples before the catalog_xmin
is written to disk, In the event of a crash before the catalog_xmin is
persisted, we would not know that some catalog tuples have been removed.
---
 src/backend/replication/logical/slotsync.c | 158 +++++++++++++++------
 1 file changed, 114 insertions(+), 44 deletions(-)

diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 97440cb6bf..05c2af373e 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -162,22 +162,73 @@ static void update_synced_slots_inactive_since(void);
  * *found_consistent_snapshot will be true iff the remote slot's LSN or xmin is
  * modified, and decoding from the corresponding LSN's can reach a
  * consistent snapshot.
+ *
+ * *remote_slot_precedes will be true if the remote slot's LSN and xmin didn't
+ * catch up to locally reserved position.
  */
 static bool
 update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
-						 bool *found_consistent_snapshot)
+						 bool *found_consistent_snapshot,
+						 bool *remote_slot_precedes)
 {
 	ReplicationSlot *slot = MyReplicationSlot;
-	bool		slot_updated = false;
+	bool		updated_xmin_or_lsn = false;
+	bool		updated_config = false;
 
 	Assert(slot->data.invalidated == RS_INVAL_NONE);
 
 	if (found_consistent_snapshot)
 		*found_consistent_snapshot = false;
 
-	if (remote_slot->confirmed_lsn != slot->data.confirmed_flush ||
-		remote_slot->restart_lsn != slot->data.restart_lsn ||
-		remote_slot->catalog_xmin != slot->data.catalog_xmin)
+	if (remote_slot_precedes)
+		*remote_slot_precedes = false;
+
+	/*
+	 * Don't overwrite if we already have a newer catalog_xmin and
+	 * restart_lsn.
+	 */
+	if (remote_slot->restart_lsn < slot->data.restart_lsn ||
+		TransactionIdPrecedes(remote_slot->catalog_xmin,
+							  slot->data.catalog_xmin))
+	{
+		/*
+		 * If the slot is temporary, it means the remote slot didn't catch up
+		 * to locally reserved position.
+		 *
+		 * If the slot is persistent, restart_lsn of the synced slot could
+		 * still be ahead of the remote slot. Since we use advance slot
+		 * functionality to reach to a consistent state while syncing a slot,
+		 * there is a possibility that for a synced slot, while consuming
+		 * changes from remote's slot restart_lsn till confirmed_flush, slot
+		 * sync find running xacts record after reaching consistent point, for
+		 * which it serializes the snapshot and thus end up advancing
+		 * restart_lsn of standby ahead of remote slot.
+		 *
+		 * We report a LOG message if the slot is temporary as it can help
+		 * user to understand why the slot is not sync-ready. In the case of a
+		 * persistent slot, we use DEBUG1 elevel since it's a common case.
+		 */
+		ereport(slot->data.persistency == RS_TEMPORARY ? LOG : DEBUG1,
+				errmsg("could not sync slot \"%s\" as remote slot precedes local slot",
+					   remote_slot->name),
+				errdetail("Remote slot has LSN %X/%X and catalog xmin %u, but local slot has LSN %X/%X and catalog xmin %u.",
+						  LSN_FORMAT_ARGS(remote_slot->restart_lsn),
+						  remote_slot->catalog_xmin,
+						  LSN_FORMAT_ARGS(slot->data.restart_lsn),
+						  slot->data.catalog_xmin));
+
+		if (remote_slot_precedes)
+			*remote_slot_precedes = true;
+	}
+
+	/*
+	 * Attempt to sync LSNs and xmins only if remote slot is ahead of local
+	 * slot.
+	 */
+	else if (remote_slot->confirmed_lsn > slot->data.confirmed_flush ||
+			 remote_slot->restart_lsn > slot->data.restart_lsn ||
+			 TransactionIdFollows(remote_slot->catalog_xmin,
+								  slot->data.catalog_xmin))
 	{
 		/*
 		 * We can't directly copy the remote slot's LSN or xmin unless there
@@ -198,7 +249,6 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
 			slot->data.restart_lsn = remote_slot->restart_lsn;
 			slot->data.confirmed_flush = remote_slot->confirmed_lsn;
 			slot->data.catalog_xmin = remote_slot->catalog_xmin;
-			slot->effective_catalog_xmin = remote_slot->catalog_xmin;
 			SpinLockRelease(&slot->mutex);
 
 			if (found_consistent_snapshot)
@@ -208,12 +258,18 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
 		{
 			LogicalSlotAdvanceAndCheckSnapState(remote_slot->confirmed_lsn,
 												found_consistent_snapshot);
-		}
 
-		ReplicationSlotsComputeRequiredXmin(false);
-		ReplicationSlotsComputeRequiredLSN();
+			/* Sanity check */
+			if (slot->data.confirmed_flush != remote_slot->confirmed_lsn)
+				ereport(ERROR,
+						errmsg_internal("synchronized confirmed_flush for slot \"%s\" differs from remote slot",
+										remote_slot->name),
+						errdetail_internal("Remote slot has LSN %X/%X but local slot has LSN %X/%X.",
+										   LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
+										   LSN_FORMAT_ARGS(slot->data.confirmed_flush)));
+		}
 
-		slot_updated = true;
+		updated_xmin_or_lsn = true;
 	}
 
 	if (remote_dbid != slot->data.database ||
@@ -233,10 +289,37 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
 		slot->data.failover = remote_slot->failover;
 		SpinLockRelease(&slot->mutex);
 
-		slot_updated = true;
+		updated_config = true;
 	}
 
-	return slot_updated;
+	/*
+	 * We have to write the changed xmin to disk *before* we change the
+	 * in-memory value, otherwise after a crash we wouldn't know that some
+	 * catalog tuples might have been removed already.
+	 */
+	if (updated_config || updated_xmin_or_lsn)
+	{
+		ReplicationSlotMarkDirty();
+		ReplicationSlotSave();
+	}
+
+	/*
+	 * Now the new xmin is safely on disk, we can let the global value
+	 * advance. We do not take ProcArrayLock or similar since we only advance
+	 * xmin here and there's not much harm done by a concurrent computation
+	 * missing that.
+	 */
+	if (updated_xmin_or_lsn)
+	{
+		SpinLockAcquire(&slot->mutex);
+		slot->effective_catalog_xmin = remote_slot->catalog_xmin;
+		SpinLockRelease(&slot->mutex);
+
+		ReplicationSlotsComputeRequiredXmin(false);
+		ReplicationSlotsComputeRequiredLSN();
+	}
+
+	return updated_config || updated_xmin_or_lsn;
 }
 
 /*
@@ -460,14 +543,17 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 {
 	ReplicationSlot *slot = MyReplicationSlot;
 	bool		found_consistent_snapshot = false;
+	bool		remote_slot_precedes = false;
+
+	(void) update_local_synced_slot(remote_slot, remote_dbid,
+									&found_consistent_snapshot,
+									&remote_slot_precedes);
 
 	/*
 	 * Check if the primary server has caught up. Refer to the comment atop
 	 * the file for details on this check.
 	 */
-	if (remote_slot->restart_lsn < slot->data.restart_lsn ||
-		TransactionIdPrecedes(remote_slot->catalog_xmin,
-							  slot->data.catalog_xmin))
+	if (remote_slot_precedes)
 	{
 		/*
 		 * The remote slot didn't catch up to locally reserved position.
@@ -476,23 +562,10 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 		 * current location when recreating the slot in the next cycle. It may
 		 * take more time to create such a slot. Therefore, we keep this slot
 		 * and attempt the synchronization in the next cycle.
-		 *
-		 * XXX should this be changed to elog(DEBUG1) perhaps?
 		 */
-		ereport(LOG,
-				errmsg("could not sync slot \"%s\" as remote slot precedes local slot",
-					   remote_slot->name),
-				errdetail("Remote slot has LSN %X/%X and catalog xmin %u, but local slot has LSN %X/%X and catalog xmin %u.",
-						  LSN_FORMAT_ARGS(remote_slot->restart_lsn),
-						  remote_slot->catalog_xmin,
-						  LSN_FORMAT_ARGS(slot->data.restart_lsn),
-						  slot->data.catalog_xmin));
 		return false;
 	}
 
-	(void) update_local_synced_slot(remote_slot, remote_dbid,
-									&found_consistent_snapshot);
-
 	/*
 	 * Don't persist the slot if it cannot reach the consistent point from the
 	 * restart_lsn. See comments atop this file.
@@ -633,23 +706,20 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 			/*
 			 * Sanity check: As long as the invalidations are handled
 			 * appropriately as above, this should never happen.
+			 *
+			 * We do not intend to check restart_lsn here. See the comments in
+			 * update_local_synced_slot() for details.
 			 */
-			if (remote_slot->restart_lsn < slot->data.restart_lsn)
-				elog(ERROR,
-					 "cannot synchronize local slot \"%s\" LSN(%X/%X)"
-					 " to remote slot's LSN(%X/%X) as synchronization"
-					 " would move it backwards", remote_slot->name,
-					 LSN_FORMAT_ARGS(slot->data.restart_lsn),
-					 LSN_FORMAT_ARGS(remote_slot->restart_lsn));
-
-			/* Make sure the slot changes persist across server restart */
-			if (update_local_synced_slot(remote_slot, remote_dbid, NULL))
-			{
-				ReplicationSlotMarkDirty();
-				ReplicationSlotSave();
-
-				slot_updated = true;
-			}
+			if (remote_slot->confirmed_lsn < slot->data.confirmed_flush)
+				ereport(ERROR,
+						errmsg_internal("cannot synchronize local slot \"%s\"",
+										remote_slot->name),
+						errdetail_internal("Local slot's start streaming location LSN(%X/%X) is ahead of remote slot's LSN(%X/%X).",
+										   LSN_FORMAT_ARGS(slot->data.confirmed_flush),
+										   LSN_FORMAT_ARGS(remote_slot->confirmed_lsn)));
+
+			slot_updated = update_local_synced_slot(remote_slot, remote_dbid,
+													NULL, NULL);
 		}
 	}
 	/* Otherwise create the slot first. */
-- 
2.30.0.windows.2

