From ba51fa5bbf865addb34a75aec35882c61d66f344 Mon Sep 17 00:00:00 2001
From: alterego665 <824662526@qq.com>
Date: Mon, 7 Jul 2025 21:19:21 +0800
Subject: [PATCH v5] Replace polling with waiting in XactLockTableWait on 
 standby servers

Previously, XactLockTableWait() and ConditionalXactLockTableWait() used
inefficient polling when waiting for transactions to complete on standby servers.
This caused excessive CPU usage and poor response latency during transaction waits.

This polling occurred because standby servers do not maintain the primary's
lock table, so these functions fall back to repeatedly checking
TransactionIdIsInProgress() until the target transaction completes.
This situation commonly arises during logical replication slot creation on hot standby.

Replace the polling mechanism with per-XID event-driven waiting using a hash table
of condition variables. When transactions complete during KnownAssignedXids updates,
WakeXidWaiters() immediately wakes only the processes waiting for specific XIDs.
This eliminates polling entirely while providing precise, low-latency notifications.

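The changes include a new XACT_COMPLETE wait event, a partitioned XidWaitHash table
mapping transaction IDs to condition variables, and new functions StandbyXidWait(),
WakeXidWaiters(), and WakeAllXidWaiters(). XactLockTableWait() now uses per-XID
waiting on standby servers instead of polling, while ConditionalXactLockTableWait()
on a standby checks TransactionIdIsInProgress() once and returns without waiting.
WakeAllXidWaiters() is used where KnownAssignedXids entries are expired in bulk.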
---
 src/backend/storage/ipc/procarray.c           | 235 ++++++++++++++++++
 src/backend/storage/lmgr/lmgr.c               |  34 +++
 .../utils/activity/wait_event_names.txt       |   1 +
 src/include/storage/procarray.h               |   5 +
 4 files changed, 275 insertions(+)

diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index e5b945a9ee3..4a7f2c6cfa8 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -306,6 +306,41 @@ static GlobalVisState GlobalVisTempRels;
  */
 static TransactionId ComputeXidHorizonsResultLastXmin;
 
+/*
+ * XID waiter hash table configuration
+ *
+ * NUM_XID_WAIT_PARTITIONS is passed to ShmemInitHash() as num_partitions
+ * together with HASH_PARTITION; dynahash.c requires that value to be a
+ * power of 2.  NOTE(review): nothing added alongside this definition
+ * takes a per-partition lock around XidWaitHash accesses -- confirm intent.
+ */
+#define NUM_XID_WAIT_PARTITIONS  16   /* Must be power of 2 */
+
+/*
+ * Hash table entry for per-XID waiting on standby servers.
+ *
+ * One entry exists per XID that currently has waiters.  StandbyXidWait()
+ * creates the entry on first use and removes it when waiter_count drops
+ * to zero; WakeXidWaiters()/WakeAllXidWaiters() broadcast on its cv.
+ */
+typedef struct XidWaitEntry
+{
+	TransactionId xid;				/* hash key: XID being waited for */
+	ConditionVariable cv;			/* waiters sleep here; wakers broadcast */
+	pg_atomic_uint32 waiter_count;	/* backends currently in StandbyXidWait */
+	bool		initialized;		/* set last, after cv and count are set up */
+} XidWaitEntry;
+
+/*
+ * Global hash table for XID waiting.
+ *
+ * Maps a TransactionId to its XidWaitEntry.  Created in shared memory by
+ * ProcArrayShmemInit(); it stays NULL if that code path is not run, so
+ * every user checks for NULL before touching it.
+ * NOTE(review): no lock is taken around accesses to this table -- verify.
+ */
+static HTAB *XidWaitHash = NULL;
+
 #ifdef XIDCACHE_DEBUG
 
 /* counters for XidCache measurement */
@@ -369,6 +404,32 @@ static inline FullTransactionId FullXidRelativeTo(FullTransactionId rel,
 												  TransactionId xid);
 static void GlobalVisUpdateApply(ComputeXidHorizonsResult *horizons);
 
+/*
+ * XidWaitShmemSize
+ *		Shared memory needed by the XID waiter hash table.
+ *
+ * The estimate covers MaxBackends * 2 entries, the same figure that
+ * ProcArrayShmemInit() passes to ShmemInitHash() as max_size.
+ *
+ * Returns 0 when hot standby is not enabled, since no memory is set
+ * aside for the table in that case.
+ */
+static Size
+XidWaitShmemSize(void)
+{
+	Size		size = 0;
+
+	if (EnableHotStandby)
+	{
+		/* two entries per backend, as described above */
+		int			max_entries = MaxBackends * 2;
+
+		size = hash_estimate_size(max_entries, sizeof(XidWaitEntry));
+	}
+
+	return size;
+}
+
 /*
  * Report shared-memory space needed by ProcArrayShmemInit
  */
@@ -406,6 +467,7 @@ ProcArrayShmemSize(void)
 								 TOTAL_MAX_CACHED_SUBXIDS));
 		size = add_size(size,
 						mul_size(sizeof(bool), TOTAL_MAX_CACHED_SUBXIDS));
+		size = add_size(size, XidWaitShmemSize());
 	}
 
 	return size;
@@ -458,6 +520,24 @@ ProcArrayShmemInit(void)
 			ShmemInitStruct("KnownAssignedXidsValid",
 							mul_size(sizeof(bool), TOTAL_MAX_CACHED_SUBXIDS),
 							&found);
+
+		/* Initialize XID waiter hash table for standby XID waiting */
+		{
+			HASHCTL info;
+			long max_xid_waiters;
+
+			max_xid_waiters = MaxBackends * 2;
+
+			info.keysize = sizeof(TransactionId);
+			info.entrysize = sizeof(XidWaitEntry);
+			info.num_partitions = NUM_XID_WAIT_PARTITIONS;
+
+			XidWaitHash = ShmemInitHash("XID Wait Hash",
+									   max_xid_waiters / 2,  /* init_size */
+									   max_xid_waiters,      /* max_size */
+									   &info,
+									   HASH_ELEM | HASH_BLOBS | HASH_PARTITION);
+		}
 	}
 }
 
@@ -1370,6 +1450,10 @@ ProcArrayApplyXidAssignment(TransactionId topxid,
 		procArray->lastOverflowedXid = max_xid;
 
 	LWLockRelease(ProcArrayLock);
+
+	/* Wake up waiters for expired subtransactions */
+	for (i = 0; i < nsubxids; i++)
+		WakeXidWaiters(subxids[i]);
 }
 
 /*
@@ -4488,6 +4572,11 @@ ExpireTreeKnownAssignedTransactionIds(TransactionId xid, int nsubxids,
 	TransamVariables->xactCompletionCount++;
 
 	LWLockRelease(ProcArrayLock);
+
+	/* Wake up per-XID waiters */
+	WakeXidWaiters(xid);
+	for (int i = 0; i < nsubxids; i++)
+		WakeXidWaiters(subxids[i]);
 }
 
 /*
@@ -4521,6 +4610,9 @@ ExpireAllKnownAssignedTransactionIds(void)
 	 */
 	procArray->lastOverflowedXid = InvalidTransactionId;
 	LWLockRelease(ProcArrayLock);
+
+	/* Wake all XID waiters since all transactions are being expired */
+	WakeAllXidWaiters();
 }
 
 /*
@@ -4553,6 +4645,9 @@ ExpireOldKnownAssignedTransactionIds(TransactionId xid)
 		procArray->lastOverflowedXid = InvalidTransactionId;
 	KnownAssignedXidsRemovePreceding(xid);
 	LWLockRelease(ProcArrayLock);
+
+	/* Wake all XID waiters since we may have expired transactions they're waiting for */
+	WakeAllXidWaiters();
 }
 
 /*
@@ -5265,3 +5360,143 @@ KnownAssignedXidsReset(void)
 
 	LWLockRelease(ProcArrayLock);
 }
+
+/*
+ * StandbyXidWait
+ *		Sleep until 'xid' is no longer in progress, using a per-XID
+ *		condition variable instead of polling.
+ *
+ * Meant for backends on a hot standby; WakeXidWaiters() and
+ * WakeAllXidWaiters() broadcast on the entry's CV so we re-check the XID.
+ *
+ * Returns true if we went through the wait path (the XID is complete on
+ * return), false if no wait was needed or possible: the server is not in
+ * recovery, the hash table does not exist, or the XID already finished.
+ *
+ * NOTE(review): no lock is held around the hash_search calls here or in
+ * the wake functions, so entry creation, lookup and removal can race.
+ * That needs a (partition) lock, which is not added here -- confirm design.
+ */
+bool
+StandbyXidWait(TransactionId xid)
+{
+	XidWaitEntry *entry;
+	bool		found;
+	uint32		hashcode;
+
+	/*
+	 * Use RecoveryInProgress(), not InHotStandby: InHotStandby is derived
+	 * from standbyState, which is only maintained in the startup process
+	 * and reads as false in the backends that call this function.
+	 */
+	if (!RecoveryInProgress() || !XidWaitHash)
+		return false;
+
+	/* Quick exit if transaction already complete */
+	if (!TransactionIdIsInProgress(xid))
+		return false;
+
+	/* Find or create the entry for this XID */
+	hashcode = get_hash_value(XidWaitHash, &xid);
+	entry = hash_search_with_hash_value(XidWaitHash, &xid, hashcode,
+									   HASH_ENTER, &found);
+	if (!found)
+	{
+		/* First waiter: set up the entry; 'initialized' is set last */
+		entry->xid = xid;
+		ConditionVariableInit(&entry->cv);
+		pg_atomic_init_u32(&entry->waiter_count, 0);
+		entry->initialized = true;
+	}
+
+	pg_atomic_fetch_add_u32(&entry->waiter_count, 1);
+
+	/*
+	 * Sleep until the XID is gone.  ConditionVariableSleep() can throw (it
+	 * checks for interrupts), so drop our reference in PG_FINALLY; without
+	 * that a cancelled wait would leave waiter_count elevated forever and
+	 * the entry could never be removed.
+	 */
+	PG_TRY();
+	{
+		ConditionVariablePrepareToSleep(&entry->cv);
+		while (TransactionIdIsInProgress(xid))
+			ConditionVariableSleep(&entry->cv, WAIT_EVENT_XACT_COMPLETE);
+	}
+	PG_FINALLY();
+	{
+		/* Leave the CV first, then drop our reference; last one out removes */
+		ConditionVariableCancelSleep();
+		if (pg_atomic_fetch_sub_u32(&entry->waiter_count, 1) == 1)
+			hash_search_with_hash_value(XidWaitHash, &xid, hashcode,
+									   HASH_REMOVE, NULL);
+	}
+	PG_END_TRY();
+
+	return true;
+}
+
+/*
+ * WakeXidWaiters
+ *		Wake every backend sleeping in StandbyXidWait() on 'xid'.
+ *
+ * Called by ExpireTreeKnownAssignedTransactionIds() and
+ * ProcArrayApplyXidAssignment() once they have released ProcArrayLock.
+ * Woken backends re-check the XID themselves, so a wakeup for an XID
+ * that is still running is harmless.
+ *
+ * Does nothing when not in hot standby, when the hash table does not
+ * exist, or when nobody is waiting on 'xid' (no hash entry is found).
+ * The entry's CV is only touched once the entry is marked initialized,
+ * and all waiters on it are woken with a single broadcast.
+ */
+void
+WakeXidWaiters(TransactionId xid)
+{
+	XidWaitEntry *waiters;
+
+	/* Nothing to do outside hot standby or without the hash table */
+	if (InHotStandby && XidWaitHash)
+	{
+		uint32		key_hash = get_hash_value(XidWaitHash, &xid);
+
+		waiters = hash_search_with_hash_value(XidWaitHash, &xid, key_hash,
+											  HASH_FIND, NULL);
+
+		/* No entry means no waiters; skip half-built entries as well */
+		if (waiters == NULL || !waiters->initialized)
+			return;
+
+		ConditionVariableBroadcast(&waiters->cv);
+	}
+}
+
+/*
+ * WakeAllXidWaiters
+ *		Wake every backend sleeping in StandbyXidWait(), whatever its XID.
+ *
+ * Used where XIDs leave KnownAssignedXids in bulk rather than one by one:
+ * ExpireAllKnownAssignedTransactionIds() and
+ * ExpireOldKnownAssignedTransactionIds().  Woken backends re-check their
+ * own XID and go back to sleep if it is still in progress.
+ *
+ * This scans the whole hash table, so its cost grows with the number of
+ * XIDs that currently have waiters.
+ */
+void
+WakeAllXidWaiters(void)
+{
+	HASH_SEQ_STATUS status;
+	XidWaitEntry *entry;
+
+	/* Skip if not in hot standby or hash table not initialized */
+	if (!InHotStandby || !XidWaitHash)
+		return;
+
+	hash_seq_init(&status, XidWaitHash);
+	while ((entry = (XidWaitEntry *) hash_seq_search(&status)) != NULL)
+	{
+		if (entry->initialized)
+			ConditionVariableBroadcast(&entry->cv);
+	}
+}
diff --git a/src/backend/storage/lmgr/lmgr.c b/src/backend/storage/lmgr/lmgr.c
index 3f6bf70bd3c..c230914e27c 100644
--- a/src/backend/storage/lmgr/lmgr.c
+++ b/src/backend/storage/lmgr/lmgr.c
@@ -652,6 +652,10 @@ XactLockTableDelete(TransactionId xid)
  * is specified, an error context callback is set up.  If 'oper' is passed as
  * None, no error context callback is set up.
  *
+ * On standby servers, uses efficient per-XID condition variable waiting
+ * instead of traditional lock acquisition.  On primary servers, uses the
+ * standard lock table approach.
+ *
  * Note that this does the right thing for subtransactions: if we wait on a
  * subtransaction, we will exit as soon as it aborts or its top parent commits.
  * It takes some extra work to ensure this, because to save on shared memory
@@ -687,6 +691,20 @@ XactLockTableWait(TransactionId xid, Relation rel, ItemPointer ctid,
 		error_context_stack = &callback;
 	}
 
+	/* Try efficient per-XID wait first on standby */
+	if (unlikely(RecoveryInProgress()))
+	{
+		Assert(TransactionIdIsValid(xid));
+		Assert(!TransactionIdEquals(xid, GetTopTransactionIdIfAny()));
+
+		StandbyXidWait(xid);
+
+		if (oper != XLTW_None)
+			error_context_stack = callback.previous;
+
+		return;
+	}
+
 	for (;;)
 	{
 		Assert(TransactionIdIsValid(xid));
@@ -734,6 +752,10 @@ XactLockTableWait(TransactionId xid, Relation rel, ItemPointer ctid,
  *
  * As above, but only lock if we can get the lock without blocking.
  * Returns true if the lock was acquired.
+ *
+ * On standby servers, returns false if the transaction is still in progress
+ * (since condition variable waiting would block).  On primary servers, uses
+ * conditional lock acquisition.
  */
 bool
 ConditionalXactLockTableWait(TransactionId xid, bool logLockFailure)
@@ -741,6 +763,18 @@ ConditionalXactLockTableWait(TransactionId xid, bool logLockFailure)
 	LOCKTAG		tag;
 	bool		first = true;
 
+	/* On standby, never wait: only report whether the xid has completed */
+	if (unlikely(RecoveryInProgress()))
+	{
+		Assert(TransactionIdIsValid(xid));
+		Assert(!TransactionIdEquals(xid, GetTopTransactionIdIfAny()));
+
+		if (!TransactionIdIsInProgress(xid))
+			return true;
+
+		return false;
+	}
+
 	for (;;)
 	{
 		Assert(TransactionIdIsValid(xid));
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index 4da68312b5f..fdf3652cf46 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -160,6 +160,7 @@ WAL_BUFFER_INIT	"Waiting on WAL buffer to be initialized."
 WAL_RECEIVER_EXIT	"Waiting for the WAL receiver to exit."
 WAL_RECEIVER_WAIT_START	"Waiting for startup process to send initial data for streaming replication."
 WAL_SUMMARY_READY	"Waiting for a new WAL summary to be generated."
+XACT_COMPLETE	"Waiting for a transaction to complete."
 XACT_GROUP_UPDATE	"Waiting for the group leader to update transaction status at transaction end."
 
 ABI_compatibility:
diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h
index ef0b733ebe8..fa4a43790dd 100644
--- a/src/include/storage/procarray.h
+++ b/src/include/storage/procarray.h
@@ -100,4 +100,9 @@ extern void ProcArraySetReplicationSlotXmin(TransactionId xmin,
 extern void ProcArrayGetReplicationSlotXmin(TransactionId *xmin,
 											TransactionId *catalog_xmin);
 
+/* Per-XID waiting support for hot standby servers */
+extern bool StandbyXidWait(TransactionId xid);
+extern void WakeXidWaiters(TransactionId xid);
+extern void WakeAllXidWaiters(void);
+
 #endif							/* PROCARRAY_H */
-- 
2.49.0

