From c16cbe69ba13dac434b274245fa7bc7079484676 Mon Sep 17 00:00:00 2001
From: Michail Nikolaev <michail.nikolaev@gmail.com>
Date: Sun, 21 Nov 2021 21:37:29 +0300
Subject: [PATCH v4] Use linked list to improve KnownAssignedXids performance

Currently, scanning KnownAssignedXids requires iterating over every
array slot, including slots whose xids are no longer valid. To speed up
these loops we introduce a linked list of offsets, each leading toward
the next valid xid in KnownAssignedXids.
---
 src/backend/storage/ipc/procarray.c | 56 ++++++++++++++++++++++++-----
 1 file changed, 47 insertions(+), 9 deletions(-)

diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index 9d3efb7d80..43cee80ab5 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -267,6 +267,7 @@ static PGPROC *allProcs;
  */
 static TransactionId *KnownAssignedXids;
 static bool *KnownAssignedXidsValid;
+static int32 *KnownAssignedXidsNext;
 static TransactionId latestObservedXid = InvalidTransactionId;
 
 /*
@@ -446,6 +447,12 @@ CreateSharedProcArray(void)
 			ShmemInitStruct("KnownAssignedXidsValid",
 							mul_size(sizeof(bool), TOTAL_MAX_CACHED_SUBXIDS),
 							&found);
+		KnownAssignedXidsNext = (int32 *)
+				ShmemInitStruct("KnownAssignedXidsNext",
+								mul_size(sizeof(int32), TOTAL_MAX_CACHED_SUBXIDS),
+								&found);
+		for (int i = 0; i < TOTAL_MAX_CACHED_SUBXIDS; i++)
+			KnownAssignedXidsNext[i] = 1;
 	}
 }
 
@@ -4535,7 +4542,13 @@ ExpireOldKnownAssignedTransactionIds(TransactionId xid)
  * XID entry itself.  This preserves the property that the XID entries are
  * sorted, so we can do binary searches easily.  Periodically we compress
  * out the unused entries; that's much cheaper than having to compress the
- * array immediately on every deletion.
+ * array immediately on every deletion.  Also, we lazily maintain offsets
+ * in the KnownAssignedXidsNext[] array that let scans skip entries known
+ * to be invalid.  Skipping these gaps can significantly improve
+ * performance when there are long-running transactions on the primary.
+ * KnownAssignedXidsNext[] is updated while taking a snapshot.  In general,
+ * an entry holds not the exact offset to the next valid xid, but a value
+ * that converges toward that offset.
  *
  * The actually valid items in KnownAssignedXids[] and KnownAssignedXidsValid[]
  * are those with indexes tail <= i < head; items outside this subscript range
@@ -4573,7 +4586,7 @@ ExpireOldKnownAssignedTransactionIds(TransactionId xid)
  *		must happen)
  *	* Compressing the array is O(S) and requires exclusive lock
  *	* Removing an XID is O(logS) and requires exclusive lock
- *	* Taking a snapshot is O(S) and requires shared lock
+ *	* Taking a snapshot is O(S), O(N) on later calls; requires shared lock
  *	* Checking for an XID is O(logS) and requires shared lock
  *
  * In comparison, using a hash table for KnownAssignedXids would mean that
@@ -4633,12 +4646,13 @@ KnownAssignedXidsCompress(bool force)
 	 * re-aligning data to 0th element.
 	 */
 	compress_index = 0;
-	for (i = tail; i < head; i++)
+	for (i = tail; i < head; i += KnownAssignedXidsNext[i])
 	{
 		if (KnownAssignedXidsValid[i])
 		{
 			KnownAssignedXids[compress_index] = KnownAssignedXids[i];
 			KnownAssignedXidsValid[compress_index] = true;
+			KnownAssignedXidsNext[compress_index] = 1;
 			compress_index++;
 		}
 	}
@@ -4741,6 +4755,7 @@ KnownAssignedXidsAdd(TransactionId from_xid, TransactionId to_xid,
 	{
 		KnownAssignedXids[head] = next_xid;
 		KnownAssignedXidsValid[head] = true;
+		KnownAssignedXidsNext[head] = 1;
 		TransactionIdAdvance(next_xid);
 		head++;
 	}
@@ -4956,7 +4971,7 @@ KnownAssignedXidsRemovePreceding(TransactionId removeXid)
 	tail = pArray->tailKnownAssignedXids;
 	head = pArray->headKnownAssignedXids;
 
-	for (i = tail; i < head; i++)
+	for (i = tail; i < head; i += KnownAssignedXidsNext[i])
 	{
 		if (KnownAssignedXidsValid[i])
 		{
@@ -4979,7 +4994,7 @@ KnownAssignedXidsRemovePreceding(TransactionId removeXid)
 	/*
 	 * Advance the tail pointer if we've marked the tail item invalid.
 	 */
-	for (i = tail; i < head; i++)
+	for (i = tail; i < head; i += KnownAssignedXidsNext[i])
 	{
 		if (KnownAssignedXidsValid[i])
 			break;
@@ -5029,7 +5044,9 @@ KnownAssignedXidsGetAndSetXmin(TransactionId *xarray, TransactionId *xmin,
 	int			count = 0;
 	int			head,
 				tail;
-	int			i;
+	int			i,
+				prev,
+				prevOffset;
 
 	/*
 	 * Fetch head just once, since it may change while we loop. We can stop
@@ -5043,9 +5060,12 @@ KnownAssignedXidsGetAndSetXmin(TransactionId *xarray, TransactionId *xmin,
 	SpinLockAcquire(&procArray->known_assigned_xids_lck);
 	tail = procArray->tailKnownAssignedXids;
 	head = procArray->headKnownAssignedXids;
+	prev = tail;
+	prevOffset = KnownAssignedXidsNext[prev];
 	SpinLockRelease(&procArray->known_assigned_xids_lck);
 
-	for (i = tail; i < head; i++)
+
+	for (i = tail; i < head; i += KnownAssignedXidsNext[i])
 	{
 		/* Skip any gaps in the array */
 		if (KnownAssignedXidsValid[i])
@@ -5070,6 +5090,24 @@ KnownAssignedXidsGetAndSetXmin(TransactionId *xarray, TransactionId *xmin,
 
 			/* Add knownXid into output array */
 			xarray[count++] = knownXid;
+
+			if (prev != i)
+			{
+				int32 n = i - prev;
+				/*
+				 * Do not touch the cache if value is unchanged. This way we
+				 * can avoid additional cache miss.
+				 */
+				if (n != prevOffset)
+					KnownAssignedXidsNext[prev] = n;
+				/*
+				 * Remember this xid as previous valid. Also, manually store
+				 * prevOffset from current fetched value to avoid additional
+				 * atomic read.
+				 */
+				prev = i;
+				prevOffset = KnownAssignedXidsNext[i];
+			}
 		}
 	}
 
@@ -5095,7 +5133,7 @@ KnownAssignedXidsGetOldestXmin(void)
 	head = procArray->headKnownAssignedXids;
 	SpinLockRelease(&procArray->known_assigned_xids_lck);
 
-	for (i = tail; i < head; i++)
+	for (i = tail; i < head; i += KnownAssignedXidsNext[i])
 	{
 		/* Skip any gaps in the array */
 		if (KnownAssignedXidsValid[i])
@@ -5130,7 +5168,7 @@ KnownAssignedXidsDisplay(int trace_level)
 
 	initStringInfo(&buf);
 
-	for (i = tail; i < head; i++)
+	for (i = tail; i < head; i += KnownAssignedXidsNext[i])
 	{
 		if (KnownAssignedXidsValid[i])
 		{
-- 
2.32.0 (Apple Git-132)

