From 86d7a437f737e3976ad28dcf8056b08ec64c87b4 Mon Sep 17 00:00:00 2001
From: Zhijie Hou <houzj.fnst@fujitsu.com>
Date: Wed, 6 Aug 2025 19:02:56 +0800
Subject: [PATCH v58 2/2] Resume retaining the information for conflict
 detection

The patch allows the launcher to re-initialized invalidated slot, if at
least one apply worker has confirmed that the retention duration is now within
the max_conflict_retention_duration.
---
 doc/src/sgml/config.sgml                   |   7 +-
 src/backend/replication/logical/launcher.c |  83 +++++++++++++----
 src/backend/replication/logical/worker.c   | 100 ++++++++++++++++-----
 src/include/replication/worker_internal.h  |   7 ++
 4 files changed, 158 insertions(+), 39 deletions(-)

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 5a9aee66e3f..6cd0b48c8dd 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -5421,8 +5421,11 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
         <literal>retain_dead_tuples</literal> is enabled, confirm that the
         retention duration exceeded the
         <literal>max_conflict_retention_duration</literal>. To re-enable
-        retention, you can disable <literal>retain_dead_tuples</literal> and
-        re-enable it after confirming this replication slot has been dropped.
+        retention manually, you can disable <literal>retain_dead_tuples</literal>
+        and re-enable it after confirming this replication slot has been dropped.
+        Alternatively, the retention will be automatically resumed
+        once at least one apply worker confirms that the retention duration is
+        within the specified limit.
        </para>
        <para>
         This option is effective only if a subscription with
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 03aa5b2b7a9..c3ffff0fdb2 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -103,9 +103,12 @@ static int	logicalrep_pa_worker_count(Oid subid);
 static void logicalrep_launcher_attach_dshmem(void);
 static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time);
 static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid);
-static void compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin);
+static void compute_min_nonremovable_xid(LogicalRepWorker *worker,
+										 bool can_advance_xmin,
+										 TransactionId *xmin);
 static bool acquire_conflict_slot_if_exists(void);
 static void update_conflict_slot_xmin(TransactionId new_xmin);
+static void init_conflict_slot_xmin(void);
 
 
 /*
@@ -465,6 +468,8 @@ retry:
 	worker->stream_fileset = NULL;
 	worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
 	worker->parallel_apply = is_parallel_apply_worker;
+	worker->stop_conflict_info_retention = (retain_dead_tuples &&
+											!TransactionIdIsValid(MyReplicationSlot->data.xmin));
 	worker->oldest_nonremovable_xid = retain_dead_tuples
 		? MyReplicationSlot->data.xmin
 		: InvalidTransactionId;
@@ -1259,8 +1264,8 @@ ApplyLauncherMain(Datum main_arg)
 				 * required for conflict detection among all running apply
 				 * workers that enables retain_dead_tuples.
 				 */
-				if (sub->retaindeadtuples && can_advance_xmin)
-					compute_min_nonremovable_xid(w, &xmin);
+				if (sub->retaindeadtuples)
+					compute_min_nonremovable_xid(w, can_advance_xmin, &xmin);
 
 				/* worker is running already */
 				continue;
@@ -1370,9 +1375,11 @@ ApplyLauncherMain(Datum main_arg)
  * in *xmin.
  */
 static void
-compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin)
+compute_min_nonremovable_xid(LogicalRepWorker *worker, bool can_advance_xmin,
+							 TransactionId *xmin)
 {
 	TransactionId nonremovable_xid;
+	bool		stop_retention;
 
 	Assert(worker != NULL);
 
@@ -1384,13 +1391,43 @@ compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin)
 
 	SpinLockAcquire(&worker->relmutex);
 	nonremovable_xid = worker->oldest_nonremovable_xid;
+	stop_retention = worker->stop_conflict_info_retention;
 	SpinLockRelease(&worker->relmutex);
 
 	/*
 	 * Skip collecting oldest_nonremovable_xid for workers that have stopped
 	 * conflict retention.
 	 */
+	if (stop_retention)
+		return;
+
+	/*
+	 * Initialize slot.xmin as a apply worker resumes retention of information
+	 * critical for conflict detection.
+	 */
+	if (!TransactionIdIsValid(MyReplicationSlot->data.xmin))
+		init_conflict_slot_xmin();
+
+	/*
+	 * Assign slot.xmin to the apply worker's oldest_nonremovable_xid if the
+	 * latter is invalid. This ensures the apply worker continues to maintain
+	 * the oldest_nonremovable_xid (see get_candidate_xid).
+	 */
 	if (!TransactionIdIsValid(nonremovable_xid))
+	{
+		nonremovable_xid = MyReplicationSlot->data.xmin;
+
+		SpinLockAcquire(&worker->relmutex);
+		worker->oldest_nonremovable_xid = nonremovable_xid;
+		SpinLockRelease(&worker->relmutex);
+
+		/* Notify the apply worker to start the next cycle of management */
+		LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
+		logicalrep_worker_wakeup_ptr(worker);
+		LWLockRelease(LogicalRepWorkerLock);
+	}
+
+	if (!can_advance_xmin)
 		return;
 
 	if (!TransactionIdIsValid(*xmin) ||
@@ -1452,23 +1489,15 @@ update_conflict_slot_xmin(TransactionId new_xmin)
 }
 
 /*
- * Create and acquire the replication slot used to retain information for
- * conflict detection, if not yet.
+ * Initialize the xmin for the conflict detection slot.
  */
-void
-CreateConflictDetectionSlot(void)
+static void
+init_conflict_slot_xmin(void)
 {
 	TransactionId xmin_horizon;
 
-	/* Exit early, if the replication slot is already created and acquired */
-	if (MyReplicationSlot)
-		return;
-
-	ereport(LOG,
-			errmsg("creating replication conflict detection slot"));
-
-	ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, RS_PERSISTENT, false,
-						  false, false);
+	Assert(MyReplicationSlot &&
+		   !TransactionIdIsValid(MyReplicationSlot->data.xmin));
 
 	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
 
@@ -1488,6 +1517,26 @@ CreateConflictDetectionSlot(void)
 	ReplicationSlotSave();
 }
 
+/*
+ * Create and acquire the replication slot used to retain information for
+ * conflict detection, if not yet.
+ */
+void
+CreateConflictDetectionSlot(void)
+{
+	/* Exit early, if the replication slot is already created and acquired */
+	if (MyReplicationSlot)
+		return;
+
+	ereport(LOG,
+			errmsg("creating replication conflict detection slot"));
+
+	ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, RS_PERSISTENT, false,
+						  false, false);
+
+	init_conflict_slot_xmin();
+}
+
 /*
  * Is current process the logical replication launcher?
  */
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index e327ffe8e00..72deb83c228 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -418,6 +418,10 @@ typedef struct RetainDeadTuplesData
 	long		table_sync_wait_time;	/* time spent waiting for table sync
 										 * to finish */
 
+	bool		wait_for_initial_xid;	/* wait for the launcher to initialize
+										 * the apply worker's
+										 * oldest_nonremovable_xid */
+
 	/*
 	 * The following fields are used to determine the timing for the next
 	 * round of transaction ID advancement.
@@ -4375,10 +4379,6 @@ can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data)
 	if (!MySubscription->retaindeadtuples)
 		return false;
 
-	/* No need to advance if we have already stopped retaining */
-	if (!TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid))
-		return false;
-
 	return true;
 }
 
@@ -4416,6 +4416,33 @@ get_candidate_xid(RetainDeadTuplesData *rdt_data)
 	TransactionId oldest_running_xid;
 	TimestampTz now;
 
+	/*
+	 * No need to advance if the apply worker has resumed retention but the
+	 * launcher has not yet initialized slot.xmin and assigned it to
+	 * oldest_nonremovable_xid.
+	 *
+	 * It might seem feasible to directly check the conflict detection
+	 * slot.xmin instead of relying on the launcher to assign the worker's
+	 * oldest_nonremovable_xid; however, that could lead to a race condition
+	 * where slot.xmin is set to InvalidTransactionId immediately after the
+	 * check. In such cases, oldest_nonremovable_xid would no longer be
+	 * protected by a replication slot and could become unreliable if a
+	 * wraparound occurs.
+	 */
+	if (rdt_data->wait_for_initial_xid)
+	{
+		TransactionId nonremovable_xid;
+
+		SpinLockAcquire(&MyLogicalRepWorker->relmutex);
+		nonremovable_xid = MyLogicalRepWorker->oldest_nonremovable_xid;
+		SpinLockRelease(&MyLogicalRepWorker->relmutex);
+
+		if (!TransactionIdIsValid(nonremovable_xid))
+			return;
+
+		rdt_data->wait_for_initial_xid = false;
+	}
+
 	/*
 	 * Use last_recv_time when applying changes in the loop to avoid
 	 * unnecessary system time retrieval. If last_recv_time is not available,
@@ -4660,13 +4687,38 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data)
 	if (last_flushpos < rdt_data->remote_lsn)
 		return;
 
+	/*
+	 * If conflict info retention was previously stopped due to a timeout, and
+	 * the time required to advance the non-removable transaction ID has now
+	 * decreased to within acceptable limits, log a message. The next step is
+	 * to wait for the launcher to initialize the oldest_nonremovable_xid.
+	 */
+	if (MyLogicalRepWorker->stop_conflict_info_retention)
+	{
+		ereport(LOG,
+				errmsg("logical replication worker for subscription \"%s\" resumes retaining the information for detecting conflicts",
+					   MySubscription->name),
+				errdetail("The time spent applying changes up to LSN %X/%X is now within the maximum limit of %u ms.",
+						  LSN_FORMAT_ARGS(rdt_data->remote_lsn),
+						  max_conflict_retention_duration));
+
+		rdt_data->wait_for_initial_xid = true;
+	}
+
 	/*
 	 * Reaching here means the remote WAL position has been received, and all
 	 * transactions up to that position on the publisher have been applied and
 	 * flushed locally. So, we can advance the non-removable transaction ID.
+	 *
+	 * However, if oldest_nonremovable_xid is invalid, indicating that
+	 * retention was stopped and is now being resumed, refrain from updating
+	 * oldest_nonremovable_xid until the launcher provides an initial value
+	 * (see get_candidate_xid() for details).
 	 */
 	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
-	MyLogicalRepWorker->oldest_nonremovable_xid = rdt_data->candidate_xid;
+	if (TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid))
+		MyLogicalRepWorker->oldest_nonremovable_xid = rdt_data->candidate_xid;
+	MyLogicalRepWorker->stop_conflict_info_retention = false;
 	SpinLockRelease(&MyLogicalRepWorker->relmutex);
 
 	elog(DEBUG2, "confirmed flush up to remote lsn %X/%X: new oldest_nonremovable_xid %u",
@@ -4709,9 +4761,8 @@ reset_retention_data_fields(RetainDeadTuplesData *rdt_data)
  * to InvalidTransactionId, notify the launcher to set the slot.xmin to
  * InvalidTransactionId as well, and return true. Return false otherwise.
  *
- * Currently, the retention will not resume automatically unless user manually
- * disables retain_dead_tuples and re-enables it after confirming that the
- * replication slot has been dropped.
+ * The retention will resume automatically if the worker has confirmed that the
+ * retention duration is now within the max_conflict_retention_duration.
  */
 static bool
 should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
@@ -4741,18 +4792,26 @@ should_stop_conflict_info_retention(RetainDeadTuplesData *rdt_data)
 									rdt_data->table_sync_wait_time))
 		return false;
 
-	ereport(LOG,
-			errmsg("logical replication worker for subscription \"%s\" will stop retaining conflict information",
-				   MySubscription->name),
-			errdetail("The time spent advancing the non-removable transaction ID has exceeded the maximum limit of %u ms.",
-					  max_conflict_retention_duration));
-
-	SpinLockAcquire(&MyLogicalRepWorker->relmutex);
-	MyLogicalRepWorker->oldest_nonremovable_xid = InvalidTransactionId;
-	SpinLockRelease(&MyLogicalRepWorker->relmutex);
-
-	/* Notify launcher to update the conflict slot */
-	ApplyLauncherWakeup();
+	/*
+	 * Log a message and reset relevant data when the worker is about to stop
+	 * retaining conflict information.
+	 */
+	if (!MyLogicalRepWorker->stop_conflict_info_retention)
+	{
+		ereport(LOG,
+				errmsg("logical replication worker for subscription \"%s\" will stop retaining the information for detecting conflicts",
+					   MySubscription->name),
+				errdetail("The time spent advancing the non-removable transaction ID has exceeded the maximum limit of %u ms.",
+						  max_conflict_retention_duration));
+
+		SpinLockAcquire(&MyLogicalRepWorker->relmutex);
+		MyLogicalRepWorker->oldest_nonremovable_xid = InvalidTransactionId;
+		MyLogicalRepWorker->stop_conflict_info_retention = true;
+		SpinLockRelease(&MyLogicalRepWorker->relmutex);
+
+		/* Notify launcher to update the conflict slot */
+		ApplyLauncherWakeup();
+	}
 
 	reset_retention_data_fields(rdt_data);
 
@@ -5607,6 +5666,7 @@ InitializeLogRepWorker(void)
 	 */
 	if (am_leader_apply_worker() &&
 		MySubscription->retaindeadtuples &&
+		!MyLogicalRepWorker->stop_conflict_info_retention &&
 		!TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid))
 	{
 		ereport(LOG,
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index b86c759394f..54a55e7c1bd 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -100,6 +100,13 @@ typedef struct LogicalRepWorker
 	 */
 	TransactionId oldest_nonremovable_xid;
 
+	/*
+	 * Indicates whether the apply worker has stopped retaining information
+	 * useful for conflict detection. This is used only when
+	 * retain_dead_tuples is enabled.
+	 */
+	bool		stop_conflict_info_retention;
+
 	/* Stats. */
 	XLogRecPtr	last_lsn;
 	TimestampTz last_send_time;
-- 
2.50.1.windows.1

