Hello, Andrey.

> I've looked into v5.
Thanks!

The patch is updated according to your remarks.

Best regards,
Michail.
From 1301a262dea7f541c11092851e82f10932150ee3 Mon Sep 17 00:00:00 2001
From: Michail Nikolaev <michail.nikol...@gmail.com>
Date: Tue, 19 Jul 2022 23:50:53 +0300
Subject: [PATCH v6] Currently KnownAssignedXidsGetAndSetXmin requires an
 iterative loop through KnownAssignedXids array, including xids marked as
 invalid. Performance impact is especially noticeable in the presence of long
 (few seconds) transactions on primary, big number (few thousands) of
 max_connections and high read workload on standby. Most of the CPU is spent
 looping through KnownAssignedXids while almost all xids are invalid anyway.
 KnownAssignedXidsCompress removes invalid xids from time to time, but
 performance is still affected.
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

To increase performance we lazily maintain an offset in the KnownAssignedXidsNextOffset array to skip xids known
to be invalid. KnownAssignedXidsNextOffset does not always point to the "next" valid xid; it is just some
offset that is safe to skip (a range known to contain only invalid xids).
---
 src/backend/storage/ipc/procarray.c | 57 ++++++++++++++++++++++++-----
 1 file changed, 48 insertions(+), 9 deletions(-)

diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index dadaa958a8..f613ae2f09 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -271,6 +271,7 @@ static TransactionId cachedXidIsNotInProgress = InvalidTransactionId;
  */
 static TransactionId *KnownAssignedXids;
 static bool *KnownAssignedXidsValid;
+static int32 *KnownAssignedXidsNextOffset;
 static TransactionId latestObservedXid = InvalidTransactionId;
 
 /*
@@ -450,6 +451,12 @@ CreateSharedProcArray(void)
 			ShmemInitStruct("KnownAssignedXidsValid",
 							mul_size(sizeof(bool), TOTAL_MAX_CACHED_SUBXIDS),
 							&found);
+		KnownAssignedXidsNextOffset = (int32 *)
+				ShmemInitStruct("KnownAssignedXidsNextOffset",
+								mul_size(sizeof(int32), TOTAL_MAX_CACHED_SUBXIDS),
+								&found);
+		for (int i = 0; i < TOTAL_MAX_CACHED_SUBXIDS; i++)
+			KnownAssignedXidsNextOffset[i] = 1;
 	}
 }
 
@@ -4539,7 +4546,15 @@ ExpireOldKnownAssignedTransactionIds(TransactionId xid)
  * XID entry itself.  This preserves the property that the XID entries are
  * sorted, so we can do binary searches easily.  Periodically we compress
  * out the unused entries; that's much cheaper than having to compress the
- * array immediately on every deletion.
+ * array immediately on every deletion. Also, we lazily maintain an offset
+ * in KnownAssignedXidsNextOffset[] array to skip known to be invalid xids.
+ * It helps to skip the gaps; it can significantly increase performance in
+ * the case of long transactions on the primary. KnownAssignedXidsNextOffset
+ * is updated while taking a snapshot. KnownAssignedXidsNextOffset does not
+ * contain the exact offset to the next valid xid, only a value that tends
+ * toward that offset. KnownAssignedXidsNextOffset[] values are read and
+ * updated without additional locking because four-byte reads and writes are
+ * assumed to be atomic.
  *
  * The actually valid items in KnownAssignedXids[] and KnownAssignedXidsValid[]
  * are those with indexes tail <= i < head; items outside this subscript range
@@ -4577,7 +4592,7 @@ ExpireOldKnownAssignedTransactionIds(TransactionId xid)
  *		must happen)
  *	* Compressing the array is O(S) and requires exclusive lock
  *	* Removing an XID is O(logS) and requires exclusive lock
- *	* Taking a snapshot is O(S) and requires shared lock
+ *	* Taking a snapshot is O(S), amortized O(N) on subsequent calls; requires shared lock
  *	* Checking for an XID is O(logS) and requires shared lock
  *
  * In comparison, using a hash table for KnownAssignedXids would mean that
@@ -4637,12 +4652,13 @@ KnownAssignedXidsCompress(bool force)
 	 * re-aligning data to 0th element.
 	 */
 	compress_index = 0;
-	for (i = tail; i < head; i++)
+	for (i = tail; i < head; i += KnownAssignedXidsNextOffset[i])
 	{
 		if (KnownAssignedXidsValid[i])
 		{
 			KnownAssignedXids[compress_index] = KnownAssignedXids[i];
 			KnownAssignedXidsValid[compress_index] = true;
+			KnownAssignedXidsNextOffset[compress_index] = 1;
 			compress_index++;
 		}
 	}
@@ -4745,6 +4761,7 @@ KnownAssignedXidsAdd(TransactionId from_xid, TransactionId to_xid,
 	{
 		KnownAssignedXids[head] = next_xid;
 		KnownAssignedXidsValid[head] = true;
+		KnownAssignedXidsNextOffset[head] = 1;
 		TransactionIdAdvance(next_xid);
 		head++;
 	}
@@ -4960,7 +4977,7 @@ KnownAssignedXidsRemovePreceding(TransactionId removeXid)
 	tail = pArray->tailKnownAssignedXids;
 	head = pArray->headKnownAssignedXids;
 
-	for (i = tail; i < head; i++)
+	for (i = tail; i < head; i += KnownAssignedXidsNextOffset[i])
 	{
 		if (KnownAssignedXidsValid[i])
 		{
@@ -4983,7 +5000,7 @@ KnownAssignedXidsRemovePreceding(TransactionId removeXid)
 	/*
 	 * Advance the tail pointer if we've marked the tail item invalid.
 	 */
-	for (i = tail; i < head; i++)
+	for (i = tail; i < head; i += KnownAssignedXidsNextOffset[i])
 	{
 		if (KnownAssignedXidsValid[i])
 			break;
@@ -5033,7 +5050,9 @@ KnownAssignedXidsGetAndSetXmin(TransactionId *xarray, TransactionId *xmin,
 	int			count = 0;
 	int			head,
 				tail;
-	int			i;
+	int			i,
+				prev,
+				prevOffset;
 
 	/*
 	 * Fetch head just once, since it may change while we loop. We can stop
@@ -5047,9 +5066,11 @@ KnownAssignedXidsGetAndSetXmin(TransactionId *xarray, TransactionId *xmin,
 	SpinLockAcquire(&procArray->known_assigned_xids_lck);
 	tail = procArray->tailKnownAssignedXids;
 	head = procArray->headKnownAssignedXids;
+	prev = tail;
+	prevOffset = KnownAssignedXidsNextOffset[prev];
 	SpinLockRelease(&procArray->known_assigned_xids_lck);
 
-	for (i = tail; i < head; i++)
+	for (i = tail; i < head; i += KnownAssignedXidsNextOffset[i])
 	{
 		/* Skip any gaps in the array */
 		if (KnownAssignedXidsValid[i])
@@ -5074,6 +5095,24 @@ KnownAssignedXidsGetAndSetXmin(TransactionId *xarray, TransactionId *xmin,
 
 			/* Add knownXid into output array */
 			xarray[count++] = knownXid;
+
+			if (prev != i)
+			{
+				int32 n = i - prev;
+				/*
+				 * Do not touch the cache if the value is unchanged. This way
+				 * we can avoid an additional cache miss.
+				 */
+				if (n != prevOffset)
+					KnownAssignedXidsNextOffset[prev] = n;
+				/*
+				 * Remember this xid as the previous valid one. Also, store
+				 * prevOffset from the value just fetched to avoid an
+				 * additional atomic read.
+				 */
+				prev = i;
+				prevOffset = KnownAssignedXidsNextOffset[i];
+			}
 		}
 	}
 
@@ -5099,7 +5138,7 @@ KnownAssignedXidsGetOldestXmin(void)
 	head = procArray->headKnownAssignedXids;
 	SpinLockRelease(&procArray->known_assigned_xids_lck);
 
-	for (i = tail; i < head; i++)
+	for (i = tail; i < head; i += KnownAssignedXidsNextOffset[i])
 	{
 		/* Skip any gaps in the array */
 		if (KnownAssignedXidsValid[i])
@@ -5134,7 +5173,7 @@ KnownAssignedXidsDisplay(int trace_level)
 
 	initStringInfo(&buf);
 
-	for (i = tail; i < head; i++)
+	for (i = tail; i < head; i += KnownAssignedXidsNextOffset[i])
 	{
 		if (KnownAssignedXidsValid[i])
 		{
-- 
2.25.1

Reply via email to