From c95d010d878de6a3433722269759ffc08bcd7aef Mon Sep 17 00:00:00 2001
From: Nisha Moond <nisha.moond412@gmail.com>
Date: Mon, 18 Nov 2024 16:13:26 +0530
Subject: [PATCH v57 1/2] Enhance replication slot error handling, slot
 invalidation, and inactive_since setting logic

In ReplicationSlotAcquire(), raise an error for invalid slots if the
caller specifies error_if_invalid=true.

Add check if the slot is already acquired, then mark it invalidated directly.

Ensure same inactive_since time for all slots in update_synced_slots_inactive_since()
and RestoreSlotFromDisk().
---
 .../replication/logical/logicalfuncs.c        |  2 +-
 src/backend/replication/logical/slotsync.c    | 13 ++--
 src/backend/replication/slot.c                | 69 ++++++++++++++++---
 src/backend/replication/slotfuncs.c           |  2 +-
 src/backend/replication/walsender.c           |  4 +-
 src/backend/utils/adt/pg_upgrade_support.c    |  2 +-
 src/include/replication/slot.h                |  3 +-
 src/test/recovery/t/019_replslot_limit.pl     |  2 +-
 8 files changed, 75 insertions(+), 22 deletions(-)

diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index b4dd5cce75..56fc1a45a9 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -197,7 +197,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
 	else
 		end_of_wal = GetXLogReplayRecPtr(NULL);
 
-	ReplicationSlotAcquire(NameStr(*name), true);
+	ReplicationSlotAcquire(NameStr(*name), true, true);
 
 	PG_TRY();
 	{
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index f4f80b2312..e3645aea53 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -446,7 +446,7 @@ drop_local_obsolete_slots(List *remote_slot_list)
 
 			if (synced_slot)
 			{
-				ReplicationSlotAcquire(NameStr(local_slot->data.name), true);
+				ReplicationSlotAcquire(NameStr(local_slot->data.name), true, false);
 				ReplicationSlotDropAcquired();
 			}
 
@@ -665,7 +665,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 		 * pre-check to ensure that at least one of the slot properties is
 		 * changed before acquiring the slot.
 		 */
-		ReplicationSlotAcquire(remote_slot->name, true);
+		ReplicationSlotAcquire(remote_slot->name, true, false);
 
 		Assert(slot == MyReplicationSlot);
 
@@ -1508,7 +1508,7 @@ ReplSlotSyncWorkerMain(char *startup_data, size_t startup_data_len)
 static void
 update_synced_slots_inactive_since(void)
 {
-	TimestampTz now = 0;
+	TimestampTz now;
 
 	/*
 	 * We need to update inactive_since only when we are promoting standby to
@@ -1523,6 +1523,9 @@ update_synced_slots_inactive_since(void)
 	/* The slot sync worker or SQL function mustn't be running by now */
 	Assert((SlotSyncCtx->pid == InvalidPid) && !SlotSyncCtx->syncing);
 
+	/* Use same inactive_since time for all slots */
+	now = GetCurrentTimestamp();
+
 	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
 
 	for (int i = 0; i < max_replication_slots; i++)
@@ -1537,10 +1540,6 @@ update_synced_slots_inactive_since(void)
 			/* The slot must not be acquired by any process */
 			Assert(s->active_pid == 0);
 
-			/* Use the same inactive_since time for all the slots. */
-			if (now == 0)
-				now = GetCurrentTimestamp();
-
 			SpinLockAcquire(&s->mutex);
 			s->inactive_since = now;
 			SpinLockRelease(&s->mutex);
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 4a206f9527..2a99c1f053 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -163,6 +163,7 @@ static void ReplicationSlotDropPtr(ReplicationSlot *slot);
 static void RestoreSlotFromDisk(const char *name);
 static void CreateSlotOnDisk(ReplicationSlot *slot);
 static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel);
+static void RaiseSlotInvalidationError(ReplicationSlot *slot);
 
 /*
  * Report shared-memory space needed by ReplicationSlotsShmemInit.
@@ -535,9 +536,12 @@ ReplicationSlotName(int index, Name name)
  *
  * An error is raised if nowait is true and the slot is currently in use. If
  * nowait is false, we sleep until the slot is released by the owning process.
+ *
+ * An error is raised if error_if_invalid is true and the slot is found to
+ * be invalid.
  */
 void
-ReplicationSlotAcquire(const char *name, bool nowait)
+ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
 {
 	ReplicationSlot *s;
 	int			active_pid;
@@ -615,6 +619,13 @@ retry:
 	/* We made this slot active, so it's ours now. */
 	MyReplicationSlot = s;
 
+	/*
+	 * An error is raised if error_if_invalid is true and the slot is found to
+	 * be invalid.
+	 */
+	if (error_if_invalid && s->data.invalidated != RS_INVAL_NONE)
+		RaiseSlotInvalidationError(s);
+
 	/*
 	 * The call to pgstat_acquire_replslot() protects against stats for a
 	 * different slot, from before a restart or such, being present during
@@ -785,7 +796,7 @@ ReplicationSlotDrop(const char *name, bool nowait)
 {
 	Assert(MyReplicationSlot == NULL);
 
-	ReplicationSlotAcquire(name, nowait);
+	ReplicationSlotAcquire(name, nowait, false);
 
 	/*
 	 * Do not allow users to drop the slots which are currently being synced
@@ -812,7 +823,7 @@ ReplicationSlotAlter(const char *name, const bool *failover,
 	Assert(MyReplicationSlot == NULL);
 	Assert(failover || two_phase);
 
-	ReplicationSlotAcquire(name, false);
+	ReplicationSlotAcquire(name, false, false);
 
 	if (SlotIsPhysical(MyReplicationSlot))
 		ereport(ERROR,
@@ -1676,11 +1687,12 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,
 		active_pid = s->active_pid;
 
 		/*
-		 * If the slot can be acquired, do so and mark it invalidated
-		 * immediately.  Otherwise we'll signal the owning process, below, and
-		 * retry.
+		 * If the slot can be acquired, do so and mark it as invalidated. If
+		 * the slot is already ours, mark it as invalidated. Otherwise, we'll
+		 * signal the owning process below and retry.
 		 */
-		if (active_pid == 0)
+		if (active_pid == 0 ||
+			(MyReplicationSlot == s && active_pid == MyProcPid))
 		{
 			MyReplicationSlot = s;
 			s->active_pid = MyProcPid;
@@ -2208,6 +2220,7 @@ RestoreSlotFromDisk(const char *name)
 	bool		restored = false;
 	int			readBytes;
 	pg_crc32c	checksum;
+	TimestampTz now;
 
 	/* no need to lock here, no concurrent access allowed yet */
 
@@ -2368,6 +2381,9 @@ RestoreSlotFromDisk(const char *name)
 						NameStr(cp.slotdata.name)),
 				 errhint("Change \"wal_level\" to be \"replica\" or higher.")));
 
+	/* Use same inactive_since time for all slots */
+	now = GetCurrentTimestamp();
+
 	/* nothing can be active yet, don't lock anything */
 	for (i = 0; i < max_replication_slots; i++)
 	{
@@ -2400,7 +2416,7 @@ RestoreSlotFromDisk(const char *name)
 		 * slot from the disk into memory. Whoever acquires the slot i.e.
 		 * makes the slot active will reset it.
 		 */
-		slot->inactive_since = GetCurrentTimestamp();
+		slot->inactive_since = now;
 
 		restored = true;
 		break;
@@ -2793,3 +2809,40 @@ WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
 
 	ConditionVariableCancelSleep();
 }
+
+/*
+ * Raise an error based on the invalidation cause of the slot.
+ */
+static void
+RaiseSlotInvalidationError(ReplicationSlot *slot)
+{
+	StringInfoData err_detail;
+
+	initStringInfo(&err_detail);
+
+	switch (slot->data.invalidated)
+	{
+		case RS_INVAL_WAL_REMOVED:
+			appendStringInfo(&err_detail, _("This slot has been invalidated because the required WAL has been removed."));
+			break;
+
+		case RS_INVAL_HORIZON:
+			appendStringInfo(&err_detail, _("This slot has been invalidated because the required rows have been removed."));
+			break;
+
+		case RS_INVAL_WAL_LEVEL:
+			/* translator: %s is a GUC variable name */
+			appendStringInfo(&err_detail, _("This slot has been invalidated because \"%s\" is insufficient for slot."),
+							 "wal_level");
+			break;
+
+		case RS_INVAL_NONE:
+			pg_unreachable();
+	}
+
+	ereport(ERROR,
+			errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+			errmsg("can no longer get changes from replication slot \"%s\"",
+				   NameStr(slot->data.name)),
+			errdetail_internal("%s", err_detail.data));
+}
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 488a161b3e..578cff64c8 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -536,7 +536,7 @@ pg_replication_slot_advance(PG_FUNCTION_ARGS)
 		moveto = Min(moveto, GetXLogReplayRecPtr(NULL));
 
 	/* Acquire the slot so we "own" it */
-	ReplicationSlotAcquire(NameStr(*slotname), true);
+	ReplicationSlotAcquire(NameStr(*slotname), true, true);
 
 	/* A slot whose restart_lsn has never been reserved cannot be advanced */
 	if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 371eef3ddd..b36ae90b2c 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -816,7 +816,7 @@ StartReplication(StartReplicationCmd *cmd)
 
 	if (cmd->slotname)
 	{
-		ReplicationSlotAcquire(cmd->slotname, true);
+		ReplicationSlotAcquire(cmd->slotname, true, true);
 		if (SlotIsLogical(MyReplicationSlot))
 			ereport(ERROR,
 					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -1434,7 +1434,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
 
 	Assert(!MyReplicationSlot);
 
-	ReplicationSlotAcquire(cmd->slotname, true);
+	ReplicationSlotAcquire(cmd->slotname, true, true);
 
 	/*
 	 * Force a disconnect, so that the decoding code doesn't need to care
diff --git a/src/backend/utils/adt/pg_upgrade_support.c b/src/backend/utils/adt/pg_upgrade_support.c
index 8a45b5827e..e8bc986c07 100644
--- a/src/backend/utils/adt/pg_upgrade_support.c
+++ b/src/backend/utils/adt/pg_upgrade_support.c
@@ -298,7 +298,7 @@ binary_upgrade_logical_slot_has_caught_up(PG_FUNCTION_ARGS)
 	slot_name = PG_GETARG_NAME(0);
 
 	/* Acquire the given slot */
-	ReplicationSlotAcquire(NameStr(*slot_name), true);
+	ReplicationSlotAcquire(NameStr(*slot_name), true, true);
 
 	Assert(SlotIsLogical(MyReplicationSlot));
 
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index d2cf786fd5..f5f2d22163 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -253,7 +253,8 @@ extern void ReplicationSlotDropAcquired(void);
 extern void ReplicationSlotAlter(const char *name, const bool *failover,
 								 const bool *two_phase);
 
-extern void ReplicationSlotAcquire(const char *name, bool nowait);
+extern void ReplicationSlotAcquire(const char *name, bool nowait,
+								   bool error_if_invalid);
 extern void ReplicationSlotRelease(void);
 extern void ReplicationSlotCleanup(bool synced_only);
 extern void ReplicationSlotSave(void);
diff --git a/src/test/recovery/t/019_replslot_limit.pl b/src/test/recovery/t/019_replslot_limit.pl
index efb4ba3af1..333e040e7f 100644
--- a/src/test/recovery/t/019_replslot_limit.pl
+++ b/src/test/recovery/t/019_replslot_limit.pl
@@ -234,7 +234,7 @@ my $failed = 0;
 for (my $i = 0; $i < 10 * $PostgreSQL::Test::Utils::timeout_default; $i++)
 {
 	if ($node_standby->log_contains(
-			"requested WAL segment [0-9A-F]+ has already been removed",
+			"This slot has been invalidated because the required WAL has been removed",
 			$logstart))
 	{
 		$failed = 1;
-- 
2.34.1

