From ad9e4b1484ed93afd86ef3017528d3bcb1b826c3 Mon Sep 17 00:00:00 2001
From: Zhijie Hou <houzj.fnst@fujitsu.com>
Date: Tue, 11 Nov 2025 18:12:53 +0800
Subject: [PATCH v4] Fix a race condition when updating
 procArray->replication_slot_xmin.

Previously, ReplicationSlotsComputeRequiredXmin() computed the oldest
xmin across all slots without holding ProcArrayLock when already_locked
was false, and acquired the ProcArrayLock only just before updating the
replication slot xmin.

This could lead to a race condition: if a backend created a new slot and
attempted to initialize the slot's xmin while another backend concurrently
invoked ReplicationSlotsComputeRequiredXmin() with already_locked set to
false, the global slot xmin could first be updated to reflect the newly
created slot, only to be overwritten afterwards by the backend running
ReplicationSlotsComputeRequiredXmin() with an invalid or newer xid value.
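
For illustration, one problematic interleaving in the unpatched code,
with backend A creating a slot and backend B recomputing the horizon
(a simplified sketch of the functions changed below):

    B: ReplicationSlotsComputeRequiredXmin(false)
         computes agg_xmin across all slots; A's slot xmin is not set
         yet, so agg_xmin may be InvalidTransactionId
    A: LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE)
       xmin_horizon = GetOldestSafeDecodingTransactionId(...)
       slot->effective_xmin = xmin_horizon
       ReplicationSlotsComputeRequiredXmin(true)
         installs A's xmin as procArray->replication_slot_xmin
       LWLockRelease(ProcArrayLock)
    B: ProcArraySetReplicationSlotXmin(agg_xmin, ..., false)
         overwrites procArray->replication_slot_xmin with the stale
         (possibly invalid) agg_xmin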

In the reported failure, a walsender for an apply worker computed
InvalidTransactionId as the oldest xmin and overwrote a valid
replication slot xmin that had been computed by a walsender for a
tablesync worker. The tablesync worker's walsender then computed a
transaction ID via GetOldestSafeDecodingTransactionId() without the
replication slot xmin being taken into account. That led to the error
"cannot build an initial slot snapshot as oldest safe xid %u follows
snapshot's xmin %u", which was an assertion failure prior to 240e0dbacd3.

To address the bug, we acquire the ReplicationSlotControlLock exclusively
during slot creation while doing the initial update of the slot xmin. In
ReplicationSlotsComputeRequiredXmin(), we hold the ReplicationSlotControlLock
in shared mode until the global slot xmin has been updated in
ProcArraySetReplicationSlotXmin(). This prevents other backends from
concurrently computing and installing new xmin horizons while the initial
slot xmin is being set, while still permitting concurrent calls to
ReplicationSlotsComputeRequiredXmin().
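
With the fix, every call site performing the initial slot xmin update
(launcher.c, logical.c, and slotsync.c below) follows the same locking
pattern, condensed here as a sketch; error handling is omitted and the
exact xmin fields set under the spinlock vary per call site:

    LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE); /* first */
    LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);

    xmin_horizon = GetOldestSafeDecodingTransactionId(...);

    /* initialize the new slot's xmin under the slot's spinlock */
    SpinLockAcquire(&slot->mutex);
    slot->effective_catalog_xmin = xmin_horizon;
    slot->data.catalog_xmin = xmin_horizon;
    SpinLockRelease(&slot->mutex);

    /* already_locked = true: both locks are held exclusively here */
    ReplicationSlotsComputeRequiredXmin(true);

    LWLockRelease(ProcArrayLock);
    LWLockRelease(ReplicationSlotControlLock);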
---
 src/backend/replication/logical/launcher.c |  2 ++
 src/backend/replication/logical/logical.c  | 12 +++++----
 src/backend/replication/logical/slotsync.c |  2 ++
 src/backend/replication/slot.c             | 31 ++++++++++++++++++----
 4 files changed, 37 insertions(+), 10 deletions(-)

diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index fdf1ccad462..5592223a52b 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -1540,6 +1540,7 @@ init_conflict_slot_xmin(void)
 	Assert(MyReplicationSlot &&
 		   !TransactionIdIsValid(MyReplicationSlot->data.xmin));
 
+	LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
 	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
 
 	xmin_horizon = GetOldestSafeDecodingTransactionId(false);
@@ -1552,6 +1553,7 @@ init_conflict_slot_xmin(void)
 	ReplicationSlotsComputeRequiredXmin(true);
 
 	LWLockRelease(ProcArrayLock);
+	LWLockRelease(ReplicationSlotControlLock);
 
 	/* Write this slot to disk */
 	ReplicationSlotMarkDirty();
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 866f92cf799..e9a07e67a73 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -405,11 +405,11 @@ CreateInitDecodingContext(const char *plugin,
 	 * without further interlock its return value might immediately be out of
 	 * date.
 	 *
-	 * So we have to acquire the ProcArrayLock to prevent computation of new
-	 * xmin horizons by other backends, get the safe decoding xid, and inform
-	 * the slot machinery about the new limit. Once that's done the
-	 * ProcArrayLock can be released as the slot machinery now is
-	 * protecting against vacuum.
+	 * So we have to acquire both the ReplicationSlotControlLock and the
+	 * ProcArrayLock to prevent concurrent computation and update of new xmin
+	 * horizons by other backends, get the safe decoding xid, and inform the
+	 * slot machinery about the new limit. Once that's done, both locks
+	 * can be released as the slot machinery is now protecting against vacuum.
 	 *
 	 * Note that, temporarily, the data, not just the catalog, xmin has to be
 	 * reserved if a data snapshot is to be exported.  Otherwise the initial
@@ -422,6 +422,7 @@ CreateInitDecodingContext(const char *plugin,
 	 *
 	 * ----
 	 */
+	LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
 	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
 
 	xmin_horizon = GetOldestSafeDecodingTransactionId(!need_full_snapshot);
@@ -436,6 +437,7 @@ CreateInitDecodingContext(const char *plugin,
 	ReplicationSlotsComputeRequiredXmin(true);
 
 	LWLockRelease(ProcArrayLock);
+	LWLockRelease(ReplicationSlotControlLock);
 
 	ReplicationSlotMarkDirty();
 	ReplicationSlotSave();
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 8b4afd87dc9..84f1d62b572 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -775,6 +775,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 
 		reserve_wal_for_local_slot(remote_slot->restart_lsn);
 
+		LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
 		LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
 		xmin_horizon = GetOldestSafeDecodingTransactionId(true);
 		SpinLockAcquire(&slot->mutex);
@@ -783,6 +784,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 		SpinLockRelease(&slot->mutex);
 		ReplicationSlotsComputeRequiredXmin(true);
 		LWLockRelease(ProcArrayLock);
+		LWLockRelease(ReplicationSlotControlLock);
 
 		update_and_persist_local_synced_slot(remote_slot, remote_dbid);
 
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 1ec1e997b27..61c575fdd4b 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1170,8 +1170,11 @@ ReplicationSlotPersist(void)
 /*
  * Compute the oldest xmin across all slots and store it in the ProcArray.
  *
- * If already_locked is true, ProcArrayLock has already been acquired
- * exclusively.
+ * If already_locked is true, both the ReplicationSlotControlLock and
+ * the ProcArrayLock have already been acquired exclusively.
+ *
+ * Note that the ReplicationSlotControlLock must be acquired before the
+ * ProcArrayLock to avoid deadlocks.
  */
 void
 ReplicationSlotsComputeRequiredXmin(bool already_locked)
@@ -1181,8 +1184,25 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked)
 	TransactionId agg_catalog_xmin = InvalidTransactionId;
 
 	Assert(ReplicationSlotCtl != NULL);
+	Assert(!already_locked ||
+		   (LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_EXCLUSIVE) &&
+			LWLockHeldByMeInMode(ProcArrayLock, LW_EXCLUSIVE)));
 
-	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+	/*
+	 * Hold the ReplicationSlotControlLock until after updating the slot xmin
+	 * values, so that no backend can concurrently update the initial xmin of
+	 * a newly created slot. A shared lock is used here to minimize lock
+	 * contention, especially when many slots exist and advancements occur
+	 * frequently. This is safe since an exclusive lock is taken during the
+	 * initial slot xmin update at slot creation.
+	 *
+	 * One might think that we could instead hold the ProcArrayLock
+	 * exclusively while updating the slot xmin values, but that would
+	 * increase lock contention on the ProcArrayLock, which is not great
+	 * since this function can be called at non-negligible frequency.
+	 */
+	if (!already_locked)
+		LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
 
 	for (i = 0; i < max_replication_slots; i++)
 	{
@@ -1217,9 +1237,10 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked)
 			agg_catalog_xmin = effective_catalog_xmin;
 	}
 
-	LWLockRelease(ReplicationSlotControlLock);
-
 	ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
+
+	if (!already_locked)
+		LWLockRelease(ReplicationSlotControlLock);
 }
 
 /*
-- 
2.51.1.windows.1

