From 98b71ccfb51ddb5ae5bdd6ade29afc7cff3b5a89 Mon Sep 17 00:00:00 2001
From: alterego665 <824662526@qq.com>
Date: Mon, 28 Jul 2025 17:20:32 +0800
Subject: [PATCH v7] Replace polling with waiting in XactLockTableWait on
 standby servers

Previously, XactLockTableWait() and ConditionalXactLockTableWait() used
inefficient polling when waiting for transactions to complete on standby servers.
This caused excessive CPU usage and poor response latency during transaction waits.

This polling occurred because standby servers do not maintain the primary's
lock table, so these functions fall back to repeatedly checking
TransactionIdIsInProgress() until the target transaction completes.
This situation commonly arises during logical replication operations on hot standby.

Replace the polling mechanism with per-XID event-driven waiting using a hash table
of condition variables. When XIDs are removed from KnownAssignedXids during replay,
WakeXidWaiters() or WakeAllXidWaiters() wakes the processes waiting for those XIDs.
---
 src/backend/storage/ipc/procarray.c           | 219 +++++++++++++++++-
 src/backend/storage/lmgr/lmgr.c               |  36 ++-
 .../utils/activity/wait_event_names.txt       |   1 +
 src/include/storage/procarray.h               |   2 +
 4 files changed, 250 insertions(+), 8 deletions(-)

diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index bf987aed8d3..f2b58a3d186 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -62,6 +62,7 @@
 #include "storage/procarray.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
+#include "utils/hsearch.h"
 #include "utils/rel.h"
 #include "utils/snapmgr.h"
 
@@ -282,6 +283,11 @@ static TransactionId *KnownAssignedXids;
 static bool *KnownAssignedXidsValid;
 static TransactionId latestObservedXid = InvalidTransactionId;
 
+/*
+ * XIDs removed by KnownAssignedXidsRemovePreceding(), to be woken afterwards
+ */
+static TransactionId *KnownAssignedXidsToWakeup;
+
 /*
  * If we're in STANDBY_SNAPSHOT_PENDING state, standbySnapshotPendingXmin is
  * the highest xid that might still be running that we don't have in
@@ -306,6 +312,32 @@ static GlobalVisState GlobalVisTempRels;
  */
 static TransactionId ComputeXidHorizonsResultLastXmin;
 
+/*
+ * XID waiter hash table partition count
+ */
+#define NUM_XID_WAIT_PARTITIONS  16
+
+/*
+ * Entry in XidWaitHash: one per XID that some backend is waiting on.
+ */
+typedef struct XidWaitEntry
+{
+	TransactionId xid;				/* hash key: XID being waited for */
+	ConditionVariable cv;			/* waiters sleep here, wakers broadcast */
+	pg_atomic_uint32 waiter_count;	/* backends inside StandbyXidWait() */
+	bool		initialized;		/* set after cv and waiter_count are set up */
+} XidWaitEntry;
+
+/*
+ * Global hash table for XID waiting.
+ *
+ * This shared-memory hash table maps transaction IDs to XidWaitEntry
+ * structures, enabling per-XID waiting during hot standby recovery.
+ * It is created in ProcArrayShmemInit() as a partitioned dynahash table
+ * (NUM_XID_WAIT_PARTITIONS partitions) sized for up to 2 * MaxBackends XIDs.
+ */
+static HTAB *XidWaitHash = NULL;
+
 #ifdef XIDCACHE_DEBUG
 
 /* counters for XidCache measurement */
@@ -352,7 +384,7 @@ static bool KnownAssignedXidExists(TransactionId xid);
 static void KnownAssignedXidsRemove(TransactionId xid);
 static void KnownAssignedXidsRemoveTree(TransactionId xid, int nsubxids,
 										TransactionId *subxids);
-static void KnownAssignedXidsRemovePreceding(TransactionId removeXid);
+static int	KnownAssignedXidsRemovePreceding(TransactionId removeXid);
 static int	KnownAssignedXidsGet(TransactionId *xarray, TransactionId xmax);
 static int	KnownAssignedXidsGetAndSetXmin(TransactionId *xarray,
 										   TransactionId *xmin,
@@ -369,6 +401,9 @@ static inline FullTransactionId FullXidRelativeTo(FullTransactionId rel,
 												  TransactionId xid);
 static void GlobalVisUpdateApply(ComputeXidHorizonsResult *horizons);
 
+static void WakeXidWaiters(TransactionId xid);
+static void WakeAllXidWaiters(void);
+
 /*
  * Report shared-memory space needed by ProcArrayShmemInit
  */
@@ -403,9 +438,11 @@ ProcArrayShmemSize(void)
 	{
 		size = add_size(size,
 						mul_size(sizeof(TransactionId),
-								 TOTAL_MAX_CACHED_SUBXIDS));
+								 2 * TOTAL_MAX_CACHED_SUBXIDS));
 		size = add_size(size,
 						mul_size(sizeof(bool), TOTAL_MAX_CACHED_SUBXIDS));
+		size = add_size(size,
+						hash_estimate_size(MaxBackends * 2, sizeof(XidWaitEntry)));
 	}
 
 	return size;
@@ -458,6 +495,26 @@ ProcArrayShmemInit(void)
 			ShmemInitStruct("KnownAssignedXidsValid",
 							mul_size(sizeof(bool), TOTAL_MAX_CACHED_SUBXIDS),
 							&found);
+		KnownAssignedXidsToWakeup = (TransactionId *)
+			ShmemInitStruct("KnownAssignedXidsToWakeup",
+							mul_size(sizeof(TransactionId),
+									 TOTAL_MAX_CACHED_SUBXIDS),
+							&found);
+
+		/* Initialize XID waiter hash table for standby XID waiting */
+		{
+			HASHCTL info;
+
+			info.keysize = sizeof(TransactionId);
+			info.entrysize = sizeof(XidWaitEntry);
+			info.num_partitions = NUM_XID_WAIT_PARTITIONS;
+
+			XidWaitHash = ShmemInitHash("XID Wait Hash",
+									   MaxBackends,  	/* init_size */
+									   MaxBackends * 2,      /* max_size */
+									   &info,
+									   HASH_ELEM | HASH_BLOBS | HASH_PARTITION);
+		}
 	}
 }
 
@@ -1113,6 +1170,7 @@ ProcArrayApplyRecoveryInfo(RunningTransactions running)
 			 * throw them away before we apply the recovery snapshot.
 			 */
 			KnownAssignedXidsReset();
+			WakeAllXidWaiters();
 			standbyState = STANDBY_INITIALIZED;
 		}
 		else
@@ -1370,6 +1428,10 @@ ProcArrayApplyXidAssignment(TransactionId topxid,
 		procArray->lastOverflowedXid = max_xid;
 
 	LWLockRelease(ProcArrayLock);
+
+	/* Wake up waiters for expired subtransactions */
+	for (i = 0; i < nsubxids; i++)
+		WakeXidWaiters(subxids[i]);
 }
 
 /*
@@ -4450,6 +4512,11 @@ ExpireTreeKnownAssignedTransactionIds(TransactionId xid, int nsubxids,
 	TransamVariables->xactCompletionCount++;
 
 	LWLockRelease(ProcArrayLock);
+
+	/* Wake up per-XID waiters */
+	WakeXidWaiters(xid);
+	for (int i = 0; i < nsubxids; i++)
+		WakeXidWaiters(subxids[i]);
 }
 
 /*
@@ -4462,7 +4529,7 @@ ExpireAllKnownAssignedTransactionIds(void)
 	FullTransactionId latestXid;
 
 	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
-	KnownAssignedXidsRemovePreceding(InvalidTransactionId);
+	(void) KnownAssignedXidsRemovePreceding(InvalidTransactionId);
 
 	/* Reset latestCompletedXid to nextXid - 1 */
 	Assert(FullTransactionIdIsValid(TransamVariables->nextXid));
@@ -4483,6 +4550,9 @@ ExpireAllKnownAssignedTransactionIds(void)
 	 */
 	procArray->lastOverflowedXid = InvalidTransactionId;
 	LWLockRelease(ProcArrayLock);
+
+	/* Wake all XID waiters since all transactions are being expired */
+	WakeAllXidWaiters();
 }
 
 /*
@@ -4494,6 +4564,8 @@ void
 ExpireOldKnownAssignedTransactionIds(TransactionId xid)
 {
 	TransactionId latestXid;
+	int			i;
+	int			count;
 
 	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
 
@@ -4513,8 +4585,12 @@ ExpireOldKnownAssignedTransactionIds(TransactionId xid)
 	 */
 	if (TransactionIdPrecedes(procArray->lastOverflowedXid, xid))
 		procArray->lastOverflowedXid = InvalidTransactionId;
-	KnownAssignedXidsRemovePreceding(xid);
+	count = KnownAssignedXidsRemovePreceding(xid);
 	LWLockRelease(ProcArrayLock);
+
+	/* Wake XID waiters that have expired transactions they're waiting for */
+	for (i = 0; i < count; i++)
+		WakeXidWaiters(KnownAssignedXidsToWakeup[i]);
 }
 
 /*
@@ -4991,9 +5067,11 @@ KnownAssignedXidsRemoveTree(TransactionId xid, int nsubxids,
  * Prune KnownAssignedXids up to, but *not* including xid. If xid is invalid
  * then clear the whole table.
  *
+ * Returns the number of XIDs removed.
+ *
  * Caller must hold ProcArrayLock in exclusive mode.
  */
-static void
+static int
 KnownAssignedXidsRemovePreceding(TransactionId removeXid)
 {
 	ProcArrayStruct *pArray = procArray;
@@ -5005,9 +5083,10 @@ KnownAssignedXidsRemovePreceding(TransactionId removeXid)
 	if (!TransactionIdIsValid(removeXid))
 	{
 		elog(DEBUG4, "removing all KnownAssignedXids");
+		count = pArray->numKnownAssignedXids;
 		pArray->numKnownAssignedXids = 0;
 		pArray->headKnownAssignedXids = pArray->tailKnownAssignedXids = 0;
-		return;
+		return count;
 	}
 
 	elog(DEBUG4, "prune KnownAssignedXids to %u", removeXid);
@@ -5031,6 +5110,7 @@ KnownAssignedXidsRemovePreceding(TransactionId removeXid)
 			if (!StandbyTransactionIdIsPrepared(knownXid))
 			{
 				KnownAssignedXidsValid[i] = false;
+				KnownAssignedXidsToWakeup[count] = knownXid;
 				count++;
 			}
 		}
@@ -5060,6 +5140,8 @@ KnownAssignedXidsRemovePreceding(TransactionId removeXid)
 
 	/* Opportunistically compress the array */
 	KnownAssignedXidsCompress(KAX_PRUNE, true);
+
+	return count;
 }
 
 /*
@@ -5227,3 +5309,128 @@ KnownAssignedXidsReset(void)
 
 	LWLockRelease(ProcArrayLock);
 }
+
+/*
+ * Wait until the given XID is no longer in progress on a standby.
+ *
+ * The caller registers in XidWaitHash under the topmost XID of the target
+ * and sleeps on that entry's condition variable; WakeXidWaiters() and
+ * WakeAllXidWaiters() broadcast on it as KnownAssignedXids entries are
+ * expired.  The last waiter to leave removes the hash entry.
+ *
+ * This runs in regular backends, so recovery is detected with
+ * RecoveryInProgress(); InHotStandby depends on standbyState, which is
+ * only maintained in the startup process.  The sleep is bounded because
+ * wake-ups are keyed by the XIDs being expired, which for a subtransaction
+ * abort need not include the topmost XID we registered under.
+ *
+ * NOTE(review): XidWaitHash is created with HASH_PARTITION, yet no lock is
+ * taken around the hash operations here or in the wake functions.
+ */
+void
+StandbyXidWait(TransactionId xid)
+{
+	XidWaitEntry *entry;
+	bool		found;
+	uint32		hashcode;
+	TransactionId wait_xid;
+	const long	recheck_ms = 100;	/* upper bound on a missed wake-up */
+
+	/* Nothing to wait for outside recovery or without the hash table */
+	if (!RecoveryInProgress() || !XidWaitHash)
+		return;
+
+	/* Quick exit if transaction already complete */
+	if (!TransactionIdIsInProgress(xid))
+		return;
+
+	/* Register under the topmost transaction, creating the entry if needed */
+	wait_xid = SubTransGetTopmostTransaction(xid);
+	hashcode = get_hash_value(XidWaitHash, &wait_xid);
+	entry = hash_search_with_hash_value(XidWaitHash, &wait_xid, hashcode,
+									   HASH_ENTER, &found);
+	if (!found)
+	{
+		entry->xid = wait_xid;
+		ConditionVariableInit(&entry->cv);
+		pg_atomic_init_u32(&entry->waiter_count, 0);
+		entry->initialized = true;
+	}
+	pg_atomic_fetch_add_u32(&entry->waiter_count, 1);
+
+	PG_TRY();
+	{
+		ConditionVariablePrepareToSleep(&entry->cv);
+		while (TransactionIdIsInProgress(xid))
+		{
+			(void) ConditionVariableTimedSleep(&entry->cv, recheck_ms,
+											   WAIT_EVENT_XACT_COMPLETE);
+			CHECK_FOR_INTERRUPTS();
+		}
+	}
+	PG_FINALLY();
+	{
+		/* Leave the CV and drop our reference even if we are erroring out */
+		ConditionVariableCancelSleep();
+		if (pg_atomic_fetch_sub_u32(&entry->waiter_count, 1) == 1)
+			hash_search_with_hash_value(XidWaitHash, &wait_xid, hashcode,
+									   HASH_REMOVE, NULL);
+	}
+	PG_END_TRY();
+}
+
+/*
+ * Wake the backends sleeping in StandbyXidWait() on one XID.
+ *
+ * Looks the XID up in XidWaitHash and, when an initialized entry exists,
+ * broadcasts on its condition variable so that every waiter re-checks
+ * whether its transaction is still in progress.  Nothing happens when we
+ * are not in hot standby, the hash table was never created, or no entry
+ * is registered under this XID.
+ *
+ * The callers in this file invoke this only after releasing ProcArrayLock.
+ */
+static void
+WakeXidWaiters(TransactionId xid)
+{
+	XidWaitEntry *waiter;
+	uint32		xid_hash;
+
+	if (!(InHotStandby && XidWaitHash))
+		return;
+
+	xid_hash = get_hash_value(XidWaitHash, &xid);
+	waiter = hash_search_with_hash_value(XidWaitHash, &xid, xid_hash,
+										 HASH_FIND, NULL);
+
+	/* No entry, or one still being set up: nobody to wake */
+	if (waiter == NULL || !waiter->initialized)
+		return;
+
+	ConditionVariableBroadcast(&waiter->cv);
+}
+
+/*
+ * Wake every backend sleeping in StandbyXidWait(), whatever its XID.
+ *
+ * Called where KnownAssignedXids is discarded wholesale, i.e. from
+ * ExpireAllKnownAssignedTransactionIds() and after KnownAssignedXidsReset()
+ * in ProcArrayApplyRecoveryInfo().  Woken waiters re-check their own XID.
+ */
+static void
+WakeAllXidWaiters(void)
+{
+	HASH_SEQ_STATUS scan;
+	XidWaitEntry *waiter;
+
+	if (!(InHotStandby && XidWaitHash))
+		return;
+
+	hash_seq_init(&scan, XidWaitHash);
+	while ((waiter = (XidWaitEntry *) hash_seq_search(&scan)) != NULL)
+	{
+		if (!waiter->initialized)
+			continue;
+		ConditionVariableBroadcast(&waiter->cv);
+	}
+}
\ No newline at end of file
diff --git a/src/backend/storage/lmgr/lmgr.c b/src/backend/storage/lmgr/lmgr.c
index 3f6bf70bd3c..8b1b99f0175 100644
--- a/src/backend/storage/lmgr/lmgr.c
+++ b/src/backend/storage/lmgr/lmgr.c
@@ -652,6 +652,10 @@ XactLockTableDelete(TransactionId xid)
  * is specified, an error context callback is set up.  If 'oper' is passed as
  * None, no error context callback is set up.
  *
+ * On standby servers, uses efficient per-XID condition variable waiting
+ * instead of traditional lock acquisition.  On primary servers, uses the
+ * standard lock table approach.
+ *
  * Note that this does the right thing for subtransactions: if we wait on a
  * subtransaction, we will exit as soon as it aborts or its top parent commits.
  * It takes some extra work to ensure this, because to save on shared memory
@@ -687,6 +691,20 @@ XactLockTableWait(TransactionId xid, Relation rel, ItemPointer ctid,
 		error_context_stack = &callback;
 	}
 
+	/* Try efficient per-XID wait first on standby */
+	if (RecoveryInProgress())
+	{
+		Assert(TransactionIdIsValid(xid));
+		Assert(!TransactionIdEquals(xid, GetTopTransactionIdIfAny()));
+
+		StandbyXidWait(xid);
+
+		if (oper != XLTW_None)
+			error_context_stack = callback.previous;
+
+		return;
+	}
+
 	for (;;)
 	{
 		Assert(TransactionIdIsValid(xid));
@@ -718,7 +736,6 @@ XactLockTableWait(TransactionId xid, Relation rel, ItemPointer ctid,
 		 */
 		if (!first)
 		{
-			CHECK_FOR_INTERRUPTS();
 			pg_usleep(1000L);
 		}
 		first = false;
@@ -734,6 +751,10 @@ XactLockTableWait(TransactionId xid, Relation rel, ItemPointer ctid,
  *
  * As above, but only lock if we can get the lock without blocking.
  * Returns true if the lock was acquired.
+ *
+ * On standby servers, returns false if the transaction is still in progress
+ * (since condition variable waiting would block).  On primary servers, uses
+ * conditional lock acquisition.
  */
 bool
 ConditionalXactLockTableWait(TransactionId xid, bool logLockFailure)
@@ -741,6 +762,18 @@ ConditionalXactLockTableWait(TransactionId xid, bool logLockFailure)
 	LOCKTAG		tag;
 	bool		first = true;
 
+	/* On standby, never block: just report whether the transaction is done */
+	if (RecoveryInProgress())
+	{
+		Assert(TransactionIdIsValid(xid));
+		Assert(!TransactionIdEquals(xid, GetTopTransactionIdIfAny()));
+
+		if (!TransactionIdIsInProgress(xid))
+			return true;
+
+		return false;
+	}
+
 	for (;;)
 	{
 		Assert(TransactionIdIsValid(xid));
@@ -761,7 +794,6 @@ ConditionalXactLockTableWait(TransactionId xid, bool logLockFailure)
 		/* See XactLockTableWait about this case */
 		if (!first)
 		{
-			CHECK_FOR_INTERRUPTS();
 			pg_usleep(1000L);
 		}
 		first = false;
diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt
index 0be307d2ca0..775cc7313ad 100644
--- a/src/backend/utils/activity/wait_event_names.txt
+++ b/src/backend/utils/activity/wait_event_names.txt
@@ -160,6 +160,7 @@ WAL_BUFFER_INIT	"Waiting on WAL buffer to be initialized."
 WAL_RECEIVER_EXIT	"Waiting for the WAL receiver to exit."
 WAL_RECEIVER_WAIT_START	"Waiting for startup process to send initial data for streaming replication."
 WAL_SUMMARY_READY	"Waiting for a new WAL summary to be generated."
+XACT_COMPLETE	"Waiting for a transaction to complete."
 XACT_GROUP_UPDATE	"Waiting for the group leader to update transaction status at transaction end."
 
 ABI_compatibility:
diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h
index 2f4ae06c279..6e35f21f6e8 100644
--- a/src/include/storage/procarray.h
+++ b/src/include/storage/procarray.h
@@ -100,4 +100,6 @@ extern void ProcArraySetReplicationSlotXmin(TransactionId xmin,
 extern void ProcArrayGetReplicationSlotXmin(TransactionId *xmin,
 											TransactionId *catalog_xmin);
 
+extern void StandbyXidWait(TransactionId xid);
+
 #endif							/* PROCARRAY_H */
-- 
2.49.0

