From 08761e6371fdb3e1485b9127948d7ae444749421 Mon Sep 17 00:00:00 2001
From: Hou Zhijie <houzj.fnst@cn.fujitsu.com>
Date: Thu, 28 Mar 2024 20:19:22 +0800
Subject: [PATCH v2] advance the restart_lsn of synced slots using logical
 decoding

---
 src/backend/replication/logical/logical.c     |  5 +-
 src/backend/replication/logical/slotsync.c    | 65 ++++++++++++-------
 src/backend/replication/slotfuncs.c           | 16 ++++-
 src/include/replication/slot.h                |  3 +
 .../t/040_standby_failover_slots_sync.pl      |  8 +--
 5 files changed, 64 insertions(+), 33 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 51ffb623c0..2a691e95e5 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -36,6 +36,7 @@
 #include "replication/decode.h"
 #include "replication/logical.h"
 #include "replication/reorderbuffer.h"
+#include "replication/slotsync.h"
 #include "replication/snapbuild.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
@@ -516,7 +517,7 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("cannot use physical replication slot for logical decoding")));
 
-	if (slot->data.database != MyDatabaseId)
+	if (slot->data.database != MyDatabaseId && !fast_forward)
 		ereport(ERROR,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				 errmsg("replication slot \"%s\" was not created in this database",
@@ -526,7 +527,7 @@ CreateDecodingContext(XLogRecPtr start_lsn,
 	 * Do not allow consumption of a "synchronized" slot until the standby
 	 * gets promoted.
 	 */
-	if (RecoveryInProgress() && slot->data.synced)
+	if (RecoveryInProgress() && slot->data.synced && !IsSyncingReplicationSlots())
 		ereport(ERROR,
 				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
 				errmsg("cannot use replication slot \"%s\" for logical decoding",
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 30480960c5..7c6ec06e13 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -25,6 +25,12 @@
  * which slot sync worker can perform the sync periodically or user can call
  * pg_sync_replication_slots() periodically to perform the syncs.
  *
+ * If synchronized slots fail to build a consistent snapshot from the
+ * restart_lsn, they would become unreliable after promotion due to potential
+ * data loss from changes before reaching a consistent point. So, we mark such
+ * slots as RS_TEMPORARY. Once they successfully reach the consistent point,
+ * they will be marked as RS_PERSISTENT.
+ *
  * The slot sync worker waits for some time before the next synchronization,
  * with the duration varying based on whether any slots were updated during
  * the last cycle. Refer to the comments above wait_for_slot_activity() for
@@ -149,26 +155,36 @@ static void slotsync_failure_callback(int code, Datum arg);
  * local slot) return false, otherwise true.
  */
 static bool
-update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
+update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
+						 bool *found_consistent_point)
 {
 	ReplicationSlot *slot = MyReplicationSlot;
-	bool		xmin_changed;
-	bool		restart_lsn_changed;
 	NameData	plugin_name;
+	bool		updated_lsn = false;
 
 	Assert(slot->data.invalidated == RS_INVAL_NONE);
 
-	xmin_changed = (remote_slot->catalog_xmin != slot->data.catalog_xmin);
-	restart_lsn_changed = (remote_slot->restart_lsn != slot->data.restart_lsn);
+	if (remote_slot->confirmed_lsn != slot->data.confirmed_flush)
+	{
+		/*
+		 * By advancing the restart_lsn, confirmed_lsn, and xmin using
+		 * fast-forward logical decoding, we can verify whether a consistent
+		 * snapshot can be built. Decoding also saves the necessary snapshots
+		 * to disk along the way, so that logical decoding can efficiently
+		 * reach a consistent point at the restart_lsn without potentially
+		 * losing data while the snapshot is being built.
+		 */
+		pg_logical_replication_slot_advance(remote_slot->confirmed_lsn,
+											found_consistent_point);
+		ReplicationSlotsComputeRequiredLSN();
+		updated_lsn = true;
+	}
 
-	if (!xmin_changed &&
-		!restart_lsn_changed &&
-		remote_dbid == slot->data.database &&
+	if (remote_dbid == slot->data.database &&
 		remote_slot->two_phase == slot->data.two_phase &&
 		remote_slot->failover == slot->data.failover &&
-		remote_slot->confirmed_lsn == slot->data.confirmed_flush &&
 		strcmp(remote_slot->plugin, NameStr(slot->data.plugin)) == 0)
-		return false;
+		return updated_lsn;
 
 	/* Avoid expensive operations while holding a spinlock. */
 	namestrcpy(&plugin_name, remote_slot->plugin);
@@ -178,18 +194,8 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 	slot->data.database = remote_dbid;
 	slot->data.two_phase = remote_slot->two_phase;
 	slot->data.failover = remote_slot->failover;
-	slot->data.restart_lsn = remote_slot->restart_lsn;
-	slot->data.confirmed_flush = remote_slot->confirmed_lsn;
-	slot->data.catalog_xmin = remote_slot->catalog_xmin;
-	slot->effective_catalog_xmin = remote_slot->catalog_xmin;
 	SpinLockRelease(&slot->mutex);
 
-	if (xmin_changed)
-		ReplicationSlotsComputeRequiredXmin(false);
-
-	if (restart_lsn_changed)
-		ReplicationSlotsComputeRequiredLSN();
-
 	return true;
 }
 
@@ -413,6 +419,7 @@ static bool
 update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 {
 	ReplicationSlot *slot = MyReplicationSlot;
+	bool	found_consistent_point = false;
 
 	/*
 	 * Check if the primary server has caught up. Refer to the comment atop
@@ -443,9 +450,19 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 		return false;
 	}
 
-	/* First time slot update, the function must return true */
-	if (!update_local_synced_slot(remote_slot, remote_dbid))
-		elog(ERROR, "failed to update slot");
+	(void) update_local_synced_slot(remote_slot, remote_dbid,
+									&found_consistent_point);
+
+	/*
+	 * Don't persist the slot if it cannot reach the consistent point from the
+	 * restart_lsn.
+	 */
+	if (!found_consistent_point)
+	{
+		elog(DEBUG1, "The synced slot could not find consistent point from %X/%X",
+			 LSN_FORMAT_ARGS(slot->data.restart_lsn));
+		return false;
+	}
 
 	ReplicationSlotPersist();
 
@@ -578,7 +595,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 					 LSN_FORMAT_ARGS(remote_slot->restart_lsn));
 
 			/* Make sure the slot changes persist across server restart */
-			if (update_local_synced_slot(remote_slot, remote_dbid))
+			if (update_local_synced_slot(remote_slot, remote_dbid, NULL))
 			{
 				ReplicationSlotMarkDirty();
 				ReplicationSlotSave();
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index da57177c25..de4c471429 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -501,9 +501,13 @@ pg_physical_replication_slot_advance(XLogRecPtr moveto)
  * because we need to digest WAL to advance restart_lsn allowing to recycle
  * WAL and removal of old catalog tuples.  As decoding is done in fast_forward
  * mode, no changes are generated anyway.
+ *
+ * If found_consistent_point is not NULL, *found_consistent_point is set to
+ * true if logical decoding reaches a consistent point, and to false otherwise.
  */
-static XLogRecPtr
-pg_logical_replication_slot_advance(XLogRecPtr moveto)
+XLogRecPtr
+pg_logical_replication_slot_advance(XLogRecPtr moveto,
+									bool *found_consistent_point)
 {
 	LogicalDecodingContext *ctx;
 	ResourceOwner old_resowner = CurrentResourceOwner;
@@ -511,6 +515,9 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
 
 	Assert(moveto != InvalidXLogRecPtr);
 
+	if (found_consistent_point)
+		*found_consistent_point = false;
+
 	PG_TRY();
 	{
 		/*
@@ -567,6 +574,9 @@ pg_logical_replication_slot_advance(XLogRecPtr moveto)
 			CHECK_FOR_INTERRUPTS();
 		}
 
+		if (DecodingContextReady(ctx) && found_consistent_point)
+			*found_consistent_point = true;
+
 		/*
 		 * Logical decoding could have clobbered CurrentResourceOwner during
 		 * transaction management, so restore the executor's value.  (This is
@@ -680,7 +690,7 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
 
 	/* Do the actual slot update, depending on the slot type */
 	if (OidIsValid(MyReplicationSlot->data.database))
-		endlsn = pg_logical_replication_slot_advance(moveto);
+		endlsn = pg_logical_replication_slot_advance(moveto, NULL);
 	else
 		endlsn = pg_physical_replication_slot_advance(moveto);
 
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 7b937d1a0c..534f1301b2 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -282,4 +282,7 @@ extern bool SlotExistsInStandbySlotNames(const char *slot_name);
 extern bool StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel);
 extern void WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn);
 
+extern XLogRecPtr pg_logical_replication_slot_advance(XLogRecPtr moveto,
+													  bool *found_consistent_point);
+
 #endif							/* SLOT_H */
diff --git a/src/test/recovery/t/040_standby_failover_slots_sync.pl b/src/test/recovery/t/040_standby_failover_slots_sync.pl
index f47bfd78eb..0e70bcc075 100644
--- a/src/test/recovery/t/040_standby_failover_slots_sync.pl
+++ b/src/test/recovery/t/040_standby_failover_slots_sync.pl
@@ -280,11 +280,11 @@ is( $standby1->safe_psql(
 	'logical slot is re-synced');
 
 # Reset the log_min_messages to the default value.
-$primary->append_conf('postgresql.conf', "log_min_messages = 'warning'");
-$primary->reload;
+#$primary->append_conf('postgresql.conf', "log_min_messages = 'warning'");
+#$primary->reload;
 
-$standby1->append_conf('postgresql.conf', "log_min_messages = 'warning'");
-$standby1->reload;
+#$standby1->append_conf('postgresql.conf', "log_min_messages = 'warning'");
+#$standby1->reload;
 
 ##################################################
 # Test that a synchronized slot can not be decoded, altered or dropped by the
-- 
2.30.0.windows.2

