From bf7594e1aea108560aaa8f0858246cf126890bbb Mon Sep 17 00:00:00 2001
From: Zhijie Hou <houzj.fnst@fujitsu.com>
Date: Wed, 27 Aug 2025 18:11:38 +0800
Subject: [PATCH v3] Post-commit review fixes for 228c370

This commit fixes two issues:

1) When a disabled subscription is created with retain_dead_tuples set to true,
the launcher is not woken up immediately, which may lead to delays in creating
the conflict detection slot.

Creating the conflict detection slot is essential even when the subscription is
not enabled. This ensures that dead tuples are retained, which is necessary for
accurately identifying the type of conflict during replication.

2) Dead tuples kept for conflict detection were unnecessarily retained when the
subscription has no tables.
---
 src/backend/commands/subscriptioncmds.c     | 12 ++++++++-
 src/backend/replication/logical/tablesync.c | 26 ++++++++++++++++++
 src/backend/replication/logical/worker.c    | 25 +++++++++++++++---
 src/include/replication/worker_internal.h   |  1 +
 src/test/subscription/t/035_conflicts.pl    | 29 +++++++++++++++++++++
 5 files changed, 88 insertions(+), 5 deletions(-)

diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 82cf65fae73..750d262fcca 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -854,7 +854,17 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 
 	pgstat_create_subscription(subid);
 
-	if (opts.enabled)
+	/*
+	 * Notify the launcher to start the apply worker if the subscription is
+	 * enabled, or to create the conflict detection slot if retain_dead_tuples
+	 * is enabled.
+	 *
+	 * Creating the conflict detection slot is essential even when the
+	 * subscription is not enabled. This ensures that dead tuples are
+	 * retained, which is necessary for accurately identifying the type of
+	 * conflict during replication.
+	 */
+	if (opts.enabled || opts.retaindeadtuples)
 		ApplyLauncherWakeupAtCommit();
 
 	ObjectAddressSet(myself, SubscriptionRelationId, subid);
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index d3356bc84ee..e6da4028d39 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -1788,6 +1788,32 @@ AllTablesyncsReady(void)
 	return has_subrels && (table_states_not_ready == NIL);
 }
 
+/*
+ * Return whether the subscription currently has any relations.
+ *
+ * Note: Unlike HasSubscriptionRelations(), this function relies on cached
+ * information for subscription relations. It must only be invoked from apply
+ * or tablesync workers, as MySubscription must be initialized first.
+ */
+bool
+HasSubscriptionRelationsCached(void)
+{
+	bool		started_tx = false;
+	bool		has_subrels;
+
+	/* We need up-to-date subscription tables info here */
+	has_subrels = FetchTableStates(&started_tx);
+
+	if (started_tx)
+	{
+		/* A transaction was started while fetching the states; finish it. */
+		CommitTransactionCommand();
+		pgstat_report_stat(true);
+	}
+
+	return has_subrels;
+}
+
 /*
  * Update the two_phase state of the specified subscription in pg_subscription.
  */
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index f1ebd63e792..c0f6bef5c28 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -4595,11 +4595,28 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data)
 	 * workers is complex and not worth the effort, so we simply return if not
 	 * all tables are in the READY state.
 	 *
-	 * It is safe to add new tables with initial states to the subscription
-	 * after this check because any changes applied to these tables should
-	 * have a WAL position greater than the rdt_data->remote_lsn.
+	 * Advancing the transaction ID is necessary even when no tables are
+	 * currently subscribed, to avoid retaining dead tuples unnecessarily.
+	 * While it might seem safe to skip all phases and directly assign
+	 * candidate_xid to oldest_nonremovable_xid during the
+	 * RDT_GET_CANDIDATE_XID phase in such cases, this is unsafe. If users
+	 * concurrently add tables to the subscription, the apply worker may not
+	 * process invalidations in time. Consequently,
+	 * HasSubscriptionRelationsCached() might miss the new tables, leading to
+	 * premature advancement of oldest_nonremovable_xid.
+	 *
+	 * Performing the check during RDT_WAIT_FOR_LOCAL_FLUSH is safe, as
+	 * invalidations are guaranteed to be processed before applying changes
+	 * from newly added tables while waiting for the local flush to reach
+	 * remote_lsn.
+	 *
+	 * Additionally, even if we check for subscription tables during
+	 * RDT_GET_CANDIDATE_XID, they might be dropped before reaching
+	 * RDT_WAIT_FOR_LOCAL_FLUSH. Therefore, it's still necessary to verify
+	 * subscription tables at this stage to prevent unnecessary tuple
+	 * retention.
 	 */
-	if (!AllTablesyncsReady())
+	if (HasSubscriptionRelationsCached() && !AllTablesyncsReady())
 	{
 		TimestampTz now;
 
diff --git a/src/include/replication/worker_internal.h b/src/include/replication/worker_internal.h
index 62ea1a00580..de003802612 100644
--- a/src/include/replication/worker_internal.h
+++ b/src/include/replication/worker_internal.h
@@ -272,6 +272,7 @@ extern void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
 											   char *originname, Size szoriginname);
 
 extern bool AllTablesyncsReady(void);
+extern bool HasSubscriptionRelationsCached(void);
 extern void UpdateTwoPhaseState(Oid suboid, char new_state);
 
 extern void process_syncing_tables(XLogRecPtr current_lsn);
diff --git a/src/test/subscription/t/035_conflicts.pl b/src/test/subscription/t/035_conflicts.pl
index 51b23a39fa9..e06429c288f 100644
--- a/src/test/subscription/t/035_conflicts.pl
+++ b/src/test/subscription/t/035_conflicts.pl
@@ -386,6 +386,35 @@ ok( $logfile =~
 .*Remote row \(2, 4\); replica identity full \(2, 2\)/,
 	'update target row was deleted in tab');
 
+###############################################################################
+# Check that the xmin value of the conflict detection slot can be advanced when
+# the subscription has no tables.
+###############################################################################
+
+# Remove the table from the publication
+$node_B->safe_psql('postgres', "ALTER PUBLICATION tap_pub_B DROP TABLE tab");
+
+$node_A->safe_psql('postgres',
+	"ALTER SUBSCRIPTION $subname_AB REFRESH PUBLICATION");
+
+# Remember the next transaction ID to be assigned
+$next_xid = $node_A->safe_psql('postgres', "SELECT txid_current() + 1;");
+
+# Confirm that the xmin value is advanced to the latest nextXid. If no
+# transactions are running, the apply worker selects nextXid as the candidate
+# for the non-removable xid. See GetOldestActiveTransactionId().
+ok( $node_A->poll_query_until(
+		'postgres',
+		"SELECT xmin = $next_xid from pg_replication_slots WHERE slot_name = 'pg_conflict_detection'"
+	),
+	"the xmin value of slot 'pg_conflict_detection' is updated on Node A");
+
+# Re-add the table to the publication for further tests
+$node_B->safe_psql('postgres', "ALTER PUBLICATION tap_pub_B ADD TABLE tab");
+
+$node_A->safe_psql('postgres',
+	"ALTER SUBSCRIPTION $subname_AB REFRESH PUBLICATION WITH (copy_data = false)");
+
 ###############################################################################
 # Check that dead tuple retention stops due to the wait time surpassing
 # max_retention_duration.
-- 
2.31.1

