From ccc2e4c76779d765a104b0de69f0154ddb71e36a Mon Sep 17 00:00:00 2001
From: Zhijie Hou <houzj.fnst@fujitsu.com>
Date: Tue, 11 Nov 2025 18:12:53 +0800
Subject: [PATCH v3] Fix a race condition when updating
 procArray->replication_slot_xmin.

Previously, ReplicationSlotsComputeRequiredXmin() computed the oldest
xmin across all slots without holding ProcArrayLock when already_locked
was false, and acquired ProcArrayLock only just before updating the
replication slot xmin. Therefore, if a process called
ReplicationSlotsComputeRequiredXmin() with already_locked being false
and another process updated the replication slot xmin before the first
process acquired the lock, the slot xmin could be overwritten with a
stale value.
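
To illustrate, here is a standalone, deliberately simplified sketch of
the problematic ordering (this is not PostgreSQL code; the names
slot_xmins, compute_oldest_xmin, walsender_main and the xid 742 are
invented for illustration). Each "walsender" thread computes the
oldest xmin from shared per-slot state without holding the lock and
only takes the lock to store the result, so a stale result can clobber
a newer one:

#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

#define INVALID_XID 0
#define NUM_SLOTS 2

static pthread_mutex_t proc_array_lock = PTHREAD_MUTEX_INITIALIZER;
static unsigned int slot_xmins[NUM_SLOTS] = {INVALID_XID, INVALID_XID};
static unsigned int replication_slot_xmin = INVALID_XID;

struct walsender
{
	int			slot;			/* slot owned by this walsender */
	unsigned int set_xmin;		/* xmin it reserves, or INVALID_XID */
	unsigned int delay_us;		/* delay between compute and store */
};

/* oldest valid xmin across all slots, or INVALID_XID if none */
static unsigned int
compute_oldest_xmin(void)
{
	unsigned int oldest = INVALID_XID;

	for (int i = 0; i < NUM_SLOTS; i++)
	{
		if (slot_xmins[i] != INVALID_XID &&
			(oldest == INVALID_XID || slot_xmins[i] < oldest))
			oldest = slot_xmins[i];
	}
	return oldest;
}

/* problematic ordering: compute outside the lock, store under it */
static void *
walsender_main(void *arg)
{
	struct walsender *ws = arg;
	unsigned int oldest;

	slot_xmins[ws->slot] = ws->set_xmin;

	oldest = compute_oldest_xmin();	/* computed WITHOUT the lock */
	usleep(ws->delay_us);			/* others may update meanwhile */

	pthread_mutex_lock(&proc_array_lock);
	replication_slot_xmin = oldest;	/* possibly stale by now */
	pthread_mutex_unlock(&proc_array_lock);
	return NULL;
}

int
main(void)
{
	/* "apply worker" walsender: no xmin reserved, slow to store */
	struct walsender apply = {0, INVALID_XID, 200000};
	/* "tablesync worker" walsender: reserves xmin 742 */
	struct walsender sync = {1, 742, 0};
	pthread_t	t_apply, t_sync;

	pthread_create(&t_apply, NULL, walsender_main, &apply);
	usleep(50000);		/* let "apply" compute its stale result first */
	pthread_create(&t_sync, NULL, walsender_main, &sync);
	pthread_join(t_apply, NULL);
	pthread_join(t_sync, NULL);

	/* typically prints 0: the valid 742 was clobbered by a stale value */
	printf("replication_slot_xmin = %u\n", replication_slot_xmin);
	return 0;
}

Built with "cc -pthread", this typically prints replication_slot_xmin =
0, i.e. the valid value stored by the "tablesync" thread is overwritten
with a stale InvalidTransactionId, which is the shape of the failure
described below.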

In the reported failure, a walsender for an apply worker computed
InvalidTransactionId as the oldest xmin and overwrote the valid
replication slot xmin that a walsender for a tablesync worker had
computed. The walsender for the tablesync worker then ended up
computing the transaction ID via GetOldestSafeDecodingTransactionId()
without the replication slot xmin being taken into account. That led
to the error "cannot build an initial slot snapshot as oldest safe xid
%u follows snapshot's xmin %u", which was an assertion failure prior
to 240e0dbacd3.

This commit changes ReplicationSlotsComputeRequiredXmin() so that it
computes the oldest xmin while holding ReplicationSlotControlLock in
exclusive mode, preventing other backends from concurrently computing
and updating the value. We keep the already_locked parameter of
ProcArraySetReplicationSlotXmin() on back-branches so as not to break
ABI compatibility.
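
Under the same simplified model as the sketch above (reusing its
definitions; slot_control_lock is an additional mutex playing the role
of ReplicationSlotControlLock), the fix corresponds to reserving the
slot's xmin, computing the oldest xmin and storing it all while holding
that lock, so the computation always sees the latest slot state and no
concurrent store can interleave:

static pthread_mutex_t slot_control_lock = PTHREAD_MUTEX_INITIALIZER;

/* fixed ordering: reserve, compute and store under the serializing lock */
static void *
walsender_main_fixed(void *arg)
{
	struct walsender *ws = arg;
	unsigned int oldest;

	pthread_mutex_lock(&slot_control_lock);

	slot_xmins[ws->slot] = ws->set_xmin;
	oldest = compute_oldest_xmin();	/* sees the latest slot state */
	usleep(ws->delay_us);			/* no store can interleave here */

	pthread_mutex_lock(&proc_array_lock);
	replication_slot_xmin = oldest;
	pthread_mutex_unlock(&proc_array_lock);

	pthread_mutex_unlock(&slot_control_lock);
	return NULL;
}

With this ordering the example always ends with replication_slot_xmin =
742: whichever thread stores last has either reserved the valid xmin
itself or computed its result after that xmin became visible.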
---
 src/backend/replication/logical/launcher.c |  2 ++
 src/backend/replication/logical/logical.c  | 12 ++++----
 src/backend/replication/logical/slotsync.c |  2 ++
 src/backend/replication/slot.c             | 32 ++++++++++++++++++----
 4 files changed, 38 insertions(+), 10 deletions(-)

diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 6214028eda9..86aced9bdf5 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -1540,6 +1540,7 @@ init_conflict_slot_xmin(void)
 	Assert(MyReplicationSlot &&
 		   !TransactionIdIsValid(MyReplicationSlot->data.xmin));
 
+	LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
 	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
 
 	xmin_horizon = GetOldestSafeDecodingTransactionId(false);
@@ -1552,6 +1553,7 @@ init_conflict_slot_xmin(void)
 	ReplicationSlotsComputeRequiredXmin(true);
 
 	LWLockRelease(ProcArrayLock);
+	LWLockRelease(ReplicationSlotControlLock);
 
 	/* Write this slot to disk */
 	ReplicationSlotMarkDirty();
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 866f92cf799..e9a07e67a73 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -405,11 +405,11 @@ CreateInitDecodingContext(const char *plugin,
 	 * without further interlock its return value might immediately be out of
 	 * date.
 	 *
-	 * So we have to acquire the ProcArrayLock to prevent computation of new
-	 * xmin horizons by other backends, get the safe decoding xid, and inform
-	 * the slot machinery about the new limit. Once that's done the
-	 * ProcArrayLock can be released as the slot machinery now is
-	 * protecting against vacuum.
+	 * So we have to acquire both the ReplicationSlotControlLock and the
+	 * ProcArrayLock to prevent other backends from concurrently computing
+	 * and updating new xmin horizons, get the safe decoding xid, and inform
+	 * the slot machinery about the new limit. Once that's done, both locks
+	 * can be released as the slot machinery is now protecting against vacuum.
 	 *
 	 * Note that, temporarily, the data, not just the catalog, xmin has to be
 	 * reserved if a data snapshot is to be exported.  Otherwise the initial
@@ -422,6 +422,7 @@ CreateInitDecodingContext(const char *plugin,
 	 *
 	 * ----
 	 */
+	LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
 	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
 
 	xmin_horizon = GetOldestSafeDecodingTransactionId(!need_full_snapshot);
@@ -436,6 +437,7 @@ CreateInitDecodingContext(const char *plugin,
 	ReplicationSlotsComputeRequiredXmin(true);
 
 	LWLockRelease(ProcArrayLock);
+	LWLockRelease(ReplicationSlotControlLock);
 
 	ReplicationSlotMarkDirty();
 	ReplicationSlotSave();
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 8b4afd87dc9..84f1d62b572 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -775,6 +775,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 
 		reserve_wal_for_local_slot(remote_slot->restart_lsn);
 
+		LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
 		LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
 		xmin_horizon = GetOldestSafeDecodingTransactionId(true);
 		SpinLockAcquire(&slot->mutex);
@@ -783,6 +784,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 		SpinLockRelease(&slot->mutex);
 		ReplicationSlotsComputeRequiredXmin(true);
 		LWLockRelease(ProcArrayLock);
+		LWLockRelease(ReplicationSlotControlLock);
 
 		update_and_persist_local_synced_slot(remote_slot, remote_dbid);
 
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 1ec1e997b27..6712c6ee7c7 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -1170,8 +1170,11 @@ ReplicationSlotPersist(void)
 /*
  * Compute the oldest xmin across all slots and store it in the ProcArray.
  *
- * If already_locked is true, ProcArrayLock has already been acquired
- * exclusively.
+ * If already_locked is true, both the ReplicationSlotControlLock and
+ * the ProcArrayLock have already been acquired exclusively.
+ *
+ * Note that the ReplicationSlotControlLock must be acquired before the
+ * ProcArrayLock to avoid deadlocks.
  */
 void
 ReplicationSlotsComputeRequiredXmin(bool already_locked)
@@ -1181,8 +1184,26 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked)
 	TransactionId agg_catalog_xmin = InvalidTransactionId;
 
 	Assert(ReplicationSlotCtl != NULL);
+	Assert(!already_locked ||
+		   (LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_EXCLUSIVE) &&
+			LWLockHeldByMeInMode(ProcArrayLock, LW_EXCLUSIVE)));
 
-	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+	/*
+	 * Hold the ReplicationSlotControlLock exclusively until after updating
+	 * the slot xmin values, so that no other backend can concurrently
+	 * compute and update a new value.
+	 *
+	 * One might think we could instead hold the ProcArrayLock exclusively,
+	 * compute the xmin values while holding the ReplicationSlotControlLock
+	 * in shared mode, and then update the slot xmin values, but that would
+	 * increase lock contention on the ProcArrayLock, which is undesirable
+	 * since this function can be called at non-negligible frequency.
+	 *
+	 * Instead, we accept increased lock contention on the
+	 * ReplicationSlotControlLock, which should be less harmful.
+	 */
+	if (!already_locked)
+		LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
 
 	for (i = 0; i < max_replication_slots; i++)
 	{
@@ -1217,9 +1238,10 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked)
 			agg_catalog_xmin = effective_catalog_xmin;
 	}
 
-	LWLockRelease(ReplicationSlotControlLock);
-
 	ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
+
+	if (!already_locked)
+		LWLockRelease(ReplicationSlotControlLock);
 }
 
 /*
-- 
2.31.1

