From 5a32d4fb3a4aff772de9653b5f40f2d6c9c71636 Mon Sep 17 00:00:00 2001
From: Zhijie Hou <houzj.fnst@fujitsu.com>
Date: Fri, 21 Nov 2025 13:21:42 +0800
Subject: [PATCH v6 3/3] Fix race conditions causing invalidation of newly
 synced slots

When initially syncing a replication slot to the standby server, the remote
restart_lsn queried from the publisher is used as the initial restart_lsn for
newly synced slots. As slots are synced asynchronously, the synced restart_lsn
may lag behind the standby's redo pointer. Due to the lack of interlock between
WAL reservation and checkpoints, this creates a race condition where
checkpoints might remove required WALs and invalidate the newly synced slots.

To address this issue, this commit changes the WAL reservation to compare the
remote restart_lsn with both minimum slot LSN and redo pointer. If either local
LSN is less than the remote restart_lsn, we update the local slot with the
remote value. Otherwise, we use the minimum of the two local LSNs.

Additionally, we acquire both ReplicationSlotAllocationLock and
ReplicationSlotControlLock during reservation to prevent the checkpoint from
updating the redo pointer and other backend processes from updating the minimum
slot LSN concurrently.
---
 src/backend/access/transam/xlog.c          |  3 +-
 src/backend/replication/logical/slotsync.c | 97 ++++++++++++----------
 src/include/access/xlog.h                  |  1 +
 3 files changed, 53 insertions(+), 48 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 22d0a2e8c3a..17324259cff 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -668,7 +668,6 @@ static XLogRecPtr CreateOverwriteContrecordRecord(XLogRecPtr aborted_lsn,
 												  TimeLineID newTLI);
 static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags);
 static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo);
-static XLogRecPtr XLogGetReplicationSlotMinimumLSN(void);
 
 static void AdvanceXLInsertBuffer(XLogRecPtr upto, TimeLineID tli,
 								  bool opportunistic);
@@ -2678,7 +2677,7 @@ XLogSetReplicationSlotMinimumLSN(XLogRecPtr lsn)
  * Return the oldest LSN we must retain to satisfy the needs of some
  * replication slot.
  */
-static XLogRecPtr
+XLogRecPtr
 XLogGetReplicationSlotMinimumLSN(void)
 {
 	XLogRecPtr	retval;
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 8b4afd87dc9..de404f47381 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -49,6 +49,7 @@
 
 #include <time.h>
 
+#include "access/xlog.h"
 #include "access/xlog_internal.h"
 #include "access/xlogrecovery.h"
 #include "catalog/pg_database.h"
@@ -482,70 +483,74 @@ drop_local_obsolete_slots(List *remote_slot_list)
  * Reserve WAL for the currently active local slot using the specified WAL
  * location (restart_lsn).
  *
- * If the given WAL location has been removed, reserve WAL using the oldest
- * existing WAL segment.
+ * If the given WAL location has been removed or is at risk of being removed,
+ * reserve WAL using the oldest segment that is non-removable.
  */
 static void
 reserve_wal_for_local_slot(XLogRecPtr restart_lsn)
 {
-	XLogSegNo	oldest_segno;
+	XLogRecPtr	slot_min_lsn;
+	XLogRecPtr	min_safe_lsn;
 	XLogSegNo	segno;
 	ReplicationSlot *slot = MyReplicationSlot;
 
 	Assert(slot != NULL);
 	Assert(!XLogRecPtrIsValid(slot->data.restart_lsn));
 
-	while (true)
-	{
-		SpinLockAcquire(&slot->mutex);
-		slot->data.restart_lsn = restart_lsn;
-		SpinLockRelease(&slot->mutex);
+	/*
+	 * Acquire an exclusive lock to prevent the checkpoint process from
+	 * concurrently calculating the minimum slot LSN (see
+	 * CheckPointReplicationSlots), ensuring that if WAL reservation occurs
+	 * first, the checkpoint must wait for the restart_lsn update before
+	 * calculating the minimum LSN.
+	 *
+	 * Note: Unlike in ReplicationSlotReserveWal(), this lock does not protect
+	 * a newly synced slot if a checkpoint occurs before the WAL reservation
+	 * here. The initial restart_lsn passed here may be outdated, preceding
+	 * the redo pointer. Therefore, when initializing a new slot below, we
+	 * might use the redo pointer or the minimum slot LSN instead of relying
+	 * solely on the passed value.
+	 */
+	LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
 
-		/* Prevent WAL removal as fast as possible */
-		ReplicationSlotsComputeRequiredLSN();
+	/*
+	 * Acquire a lock to prevent other backends from updating the minimum slot
+	 * LSN concurrently, ensuring that WALs prior to the minimum LSN are not
+	 * removable during this lock period.
+	 */
+	LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
 
-		XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
+	/*
+	 * Select the safe non-removable LSN by evaluating the redo pointer
+	 * alongside the minimum slot LSN.
+	 */
+	min_safe_lsn = GetRedoRecPtr();
+	slot_min_lsn = XLogGetReplicationSlotMinimumLSN();
 
-		/*
-		 * Find the oldest existing WAL segment file.
-		 *
-		 * Normally, we can determine it by using the last removed segment
-		 * number. However, if no WAL segment files have been removed by a
-		 * checkpoint since startup, we need to search for the oldest segment
-		 * file from the current timeline existing in XLOGDIR.
-		 *
-		 * XXX: Currently, we are searching for the oldest segment in the
-		 * current timeline as there is less chance of the slot's restart_lsn
-		 * from being some prior timeline, and even if it happens, in the
-		 * worst case, we will wait to sync till the slot's restart_lsn moved
-		 * to the current timeline.
-		 */
-		oldest_segno = XLogGetLastRemovedSegno() + 1;
+	if (XLogRecPtrIsValid(slot_min_lsn) && min_safe_lsn > slot_min_lsn)
+		min_safe_lsn = slot_min_lsn;
 
-		if (oldest_segno == 1)
-		{
-			TimeLineID	cur_timeline;
+	/*
+	 * If the minimum safe LSN is greater than the given restart_lsn, use it as
+	 * the initial restart_lsn for the newly synced slot.
+	 */
+	if (min_safe_lsn > restart_lsn)
+		restart_lsn = min_safe_lsn;
 
-			GetWalRcvFlushRecPtr(NULL, &cur_timeline);
-			oldest_segno = XLogGetOldestSegno(cur_timeline);
-		}
+	SpinLockAcquire(&slot->mutex);
+	slot->data.restart_lsn = restart_lsn;
+	SpinLockRelease(&slot->mutex);
 
-		elog(DEBUG1, "segno: " UINT64_FORMAT " of purposed restart_lsn for the synced slot, oldest_segno: " UINT64_FORMAT " available",
-			 segno, oldest_segno);
+	LWLockRelease(ReplicationSlotControlLock);
 
-		/*
-		 * If all required WAL is still there, great, otherwise retry. The
-		 * slot should prevent further removal of WAL, unless there's a
-		 * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
-		 * the new restart_lsn above, so normally we should never need to loop
-		 * more than twice.
-		 */
-		if (segno >= oldest_segno)
-			break;
+	ReplicationSlotsComputeRequiredLSN();
 
-		/* Retry using the location of the oldest wal segment */
-		XLogSegNoOffsetToRecPtr(oldest_segno, 0, wal_segment_size, restart_lsn);
-	}
+	XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
+	if (XLogGetLastRemovedSegno() >= segno)
+		elog(ERROR, "WAL required by replication slot %s has been removed concurrently",
+			 NameStr(slot->data.name));
+
+	LWLockRelease(ReplicationSlotAllocationLock);
 }
 
 /*
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 605280ed8fb..935d3df8758 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -215,6 +215,7 @@ extern XLogSegNo XLogGetLastRemovedSegno(void);
 extern XLogSegNo XLogGetOldestSegno(TimeLineID tli);
 extern void XLogSetAsyncXactLSN(XLogRecPtr asyncXactLSN);
 extern void XLogSetReplicationSlotMinimumLSN(XLogRecPtr lsn);
+extern XLogRecPtr XLogGetReplicationSlotMinimumLSN(void);
 
 extern void xlog_redo(struct XLogReaderState *record);
 extern void xlog_desc(StringInfo buf, struct XLogReaderState *record);
-- 
2.51.1.windows.1

