From a7bd2863fd2c15c3acc5221252673a7715025124 Mon Sep 17 00:00:00 2001
From: Zhijie Hou <houzj.fnst@fujitsu.com>
Date: Wed, 27 Aug 2025 18:11:38 +0800
Subject: [PATCH v1] Avoid retaining conflict-related data when no tables are
 subscribed

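This commit fixes an issue where conflict-related data was unnecessarily
retained when the subscription has no tables.

AllTablesyncsReady() returned false for a subscription without tables, so
wait_for_local_flush() kept returning early in that case. Add a
'ready_if_no_tables' parameter so that wait_for_local_flush() can treat
such a subscription as ready, while all other callers pass false and keep
the previous behavior.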
---
 .../replication/logical/applyparallelworker.c  |  2 +-
 src/backend/replication/logical/tablesync.c    | 18 +++++++++++++-----
 src/backend/replication/logical/worker.c       | 14 ++++++++++++--
 src/include/replication/worker_internal.h      |  2 +-
 4 files changed, 27 insertions(+), 9 deletions(-)

diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c
index 31a92d1a24a..5b4fb6a08c4 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -309,7 +309,7 @@ pa_can_start(void)
 	 * should_apply_changes_for_rel) as we won't know remote_final_lsn by that
 	 * time. So, we don't start the new parallel apply worker in this case.
 	 */
-	if (!AllTablesyncsReady())
+	if (!AllTablesyncsReady(false))
 		return false;
 
 	return true;
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index d3356bc84ee..d0dca0ebf5e 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -664,7 +664,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
 		if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING)
 		{
 			CommandCounterIncrement();	/* make updates visible */
-			if (AllTablesyncsReady())
+			if (AllTablesyncsReady(false))
 			{
 				ereport(LOG,
 						(errmsg("logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
@@ -1759,15 +1759,19 @@ TablesyncWorkerMain(Datum main_arg)
 }
 
 /*
- * If the subscription has no tables then return false.
+ * Check if all tablesyncs are READY for the current subscription.
  *
- * Otherwise, are all tablesyncs READY?
+ * If the subscription has no tables, return the value determined by
+ * 'ready_if_no_tables'.
+ *
+ * Otherwise, return whether all the tables for the subscription are in the
+ * READY state.
  *
  * Note: This function is not suitable to be called from outside of apply or
  * tablesync workers because MySubscription needs to be already initialized.
  */
 bool
-AllTablesyncsReady(void)
+AllTablesyncsReady(bool ready_if_no_tables)
 {
 	bool		started_tx = false;
 	bool		has_subrels = false;
@@ -1781,11 +1785,15 @@ AllTablesyncsReady(void)
 		pgstat_report_stat(true);
 	}
 
+	/* If there are no tables, decide readiness based on the parameter */
+	if (!has_subrels)
+		return ready_if_no_tables;
+
 	/*
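-	 * Return false when there are no tables in subscription or not all tables
-	 * are in ready state; true otherwise.
+	 * The subscription has at least one table at this point, so return true
+	 * only if all of its tables are in ready state.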
 	 */
-	return has_subrels && (table_states_not_ready == NIL);
+	return table_states_not_ready == NIL;
 }
 
 /*
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 22ad9051db3..136584d4569 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -4547,8 +4547,18 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data)
 	 * It is safe to add new tables with initial states to the subscription
 	 * after this check because any changes applied to these tables should
 	 * have a WAL position greater than the rdt_data->remote_lsn.
+	 *
+	 * Advancing the transaction ID is also necessary when no tables are
+	 * subscribed, as it prevents unnecessary retention of dead tuples. Although
+	 * it seems feasible to skip all phases and directly assign candidate_xid to
+	 * oldest_nonremovable_xid in the RDT_GET_CANDIDATE_XID phase when no tables
+	 * are currently subscribed, this approach is unsafe. This is because new
+	 * tables may be added to the subscription after the initial table check,
+	 * requiring tuples deleted before candidate_xid for conflict detection in
+	 * upcoming transactions. Therefore, it remains necessary to wait for all
+	 * concurrent transactions to be fully applied.
 	 */
-	if (!AllTablesyncsReady())
+	if (!AllTablesyncsReady(true))
 		return;
 
 	/*
@@ -5345,7 +5355,7 @@ run_apply_worker()
 	 * work.
 	 */
 	if (MySubscription->twophasestate == LOGICALREP_TWOPHASE_STATE_PENDING &&
-		AllTablesyncsReady())
+		AllTablesyncsReady(false))
 	{
 		/* Start streaming with two_phase enabled */
 		options.proto.logical.twophase = true;
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 7c0204dd6f4..08a8dcaba20 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -268,7 +268,7 @@ extern int	logicalrep_sync_worker_count(Oid subid);
 extern void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
 											   char *originname, Size szoriginname);
 
-extern bool AllTablesyncsReady(void);
+extern bool AllTablesyncsReady(bool ready_if_no_tables);
 extern void UpdateTwoPhaseState(Oid suboid, char new_state);
 
 extern void process_syncing_tables(XLogRecPtr current_lsn);
-- 
2.51.0.windows.1

