On Fri, 2010-04-16 at 11:29 +0300, Heikki Linnakangas wrote:
> Simon Riggs wrote:
> > On Tue, 2010-04-13 at 21:09 +0300, Heikki Linnakangas wrote:
> >> A quick fix would be to check if there's any entries in the hash table
> >> before scanning it. That would eliminate the overhead when there's no
> >> in-progress transactions in the master. But as soon as there's even one,
> >> the overhead comes back.
> > 
> > Any fix should be fairly quick because of the way its modularised - with
> > something like this in mind.
> > 
> > I'll try a circular buffer implementation, with fastpath.
> 
> I started experimenting with a sorted array based implementation on
> Tuesday but got carried away with other stuff. I now got back to that
> and cleaned it up.
> 
> How does the attached patch look like? It's probably similar to what you
> had in mind.

It looks like a second version of what I'm working on and about to
publish. I'll take that as a compliment!

My patch is attached here also, for discussion.

The two patches look same in their main parts, though I have quite a few
extra tweaks in there, which you can read about in comments. One tweak I
don't have is the use of the presence array that allows a sensible
bsearch, so I'll to alter my patch to use that idea but keep the rest of
my code.

-- 
 Simon Riggs           www.2ndQuadrant.com
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 3cc9bdd..d432c9d 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -1200,6 +1200,9 @@ StandbyTransactionIdIsPrepared(TransactionId xid)
 
 	Assert(TransactionIdIsValid(xid));
 
+	if (max_prepared_xacts <= 0)
+		return false;					/* nothing to do */
+
 	/* Read and validate file */
 	buf = ReadTwoPhaseFile(xid, false);
 	if (buf == NULL)
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 3adbee2..9245727 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -6453,6 +6453,12 @@ CheckRecoveryConsistency(void)
 	}
 }
 
+bool
+XLogConsistentState(void)
+{
+	return reachedMinRecoveryPoint;
+}
+
 /*
  * Is the system still in recovery?
  *
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index 88169b4..050c547 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -68,6 +68,10 @@ typedef struct ProcArrayStruct
 										 * xids */
 	int			maxKnownAssignedXids;	/* allocated size of known assigned
 										 * xids */
+	int			tailKnownAssignedXids;	/* current tail of known assigned
+										 * xids */
+	int			headKnownAssignedXids;	/* current head of known assigned
+										 * xids */
 
 	/*
 	 * Highest subxid that overflowed KnownAssignedXids array. Similar to
@@ -87,7 +91,7 @@ static ProcArrayStruct *procArray;
 /*
  * Bookkeeping for tracking emulated transactions in recovery
  */
-static HTAB *KnownAssignedXidsHash;
+static TransactionId *KnownAssignedXids;
 static TransactionId latestObservedXid = InvalidTransactionId;
 
 /*
@@ -142,9 +146,12 @@ static int	KnownAssignedXidsGet(TransactionId *xarray, TransactionId xmax);
 static int KnownAssignedXidsGetAndSetXmin(TransactionId *xarray, TransactionId *xmin,
 							   TransactionId xmax);
 static bool KnownAssignedXidsExist(TransactionId xid);
-static void KnownAssignedXidsAdd(TransactionId *xids, int nxids);
-static void KnownAssignedXidsRemove(TransactionId xid);
-static void KnownAssignedXidsRemoveMany(TransactionId xid, bool keepPreparedXacts);
+static void KnownAssignedXidsAdd(TransactionId from_xid, TransactionId to_xid, bool have_lock);
+static void KnownAssignedXidsRemove(TransactionId xid, int *index);
+static void KnownAssignedXidsRemoveMany(TransactionId xid);
+static void KnownAssignedXidsRemoveTree(TransactionId xid, int nsubxids,
+									  TransactionId *subxids);
+static void KnownAssignedXidsCompress(bool full);
 static void KnownAssignedXidsDisplay(int trace_level);
 
 /*
@@ -204,10 +211,13 @@ CreateSharedProcArray(void)
 		procArray->numKnownAssignedXids = 0;
 		procArray->maxKnownAssignedXids = TOTAL_MAX_CACHED_SUBXIDS;
 		procArray->lastOverflowedXid = InvalidTransactionId;
+		procArray->tailKnownAssignedXids = 0;
+		procArray->headKnownAssignedXids = 0;
 	}
 
 	if (XLogRequestRecoveryConnections)
 	{
+#ifdef KNOWNASSIGNED_USE_HASH
 		/* Create or attach to the KnownAssignedXids hash table */
 		HASHCTL		info;
 
@@ -223,6 +233,12 @@ CreateSharedProcArray(void)
 											  HASH_ELEM | HASH_FUNCTION);
 		if (!KnownAssignedXidsHash)
 			elog(FATAL, "could not initialize known assigned xids hash table");
+#else
+		KnownAssignedXids = (TransactionId *)
+				ShmemInitStruct("KnownAssignedXids",
+						mul_size(sizeof(TransactionId), TOTAL_MAX_CACHED_SUBXIDS),
+						&found);
+#endif
 	}
 }
 
@@ -417,6 +433,8 @@ void
 ProcArrayInitRecoveryInfo(TransactionId oldestActiveXid)
 {
 	snapshotOldestActiveXid = oldestActiveXid;
+
+//	KnownAssignedXidsTest();
 }
 
 /*
@@ -544,10 +562,10 @@ ProcArrayApplyRecoveryInfo(RunningTransactions running)
 		if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
 			continue;
 
-		KnownAssignedXidsAdd(&xid, 1);
+		KnownAssignedXidsAdd(xid, xid, true);
 	}
 
-	KnownAssignedXidsDisplay(trace_recovery(DEBUG3));
+	KnownAssignedXidsDisplay(LOG);
 
 	/*
 	 * Update lastOverflowedXid if the snapshot had overflown. We don't know
@@ -617,8 +635,7 @@ ProcArrayApplyXidAssignment(TransactionId topxid,
 	/*
 	 * Remove from known-assigned-xacts.
 	 */
-	for (i = 0; i < nsubxids; i++)
-		KnownAssignedXidsRemove(subxids[i]);
+	KnownAssignedXidsRemoveTree(InvalidTransactionId, nsubxids, subxids);
 
 	/*
 	 * Advance lastOverflowedXid when required.
@@ -2230,45 +2247,35 @@ RecordKnownAssignedTransactionIds(TransactionId xid)
 	 */
 	if (TransactionIdFollows(xid, latestObservedXid))
 	{
-		TransactionId next_expected_xid = latestObservedXid;
-
-		TransactionIdAdvance(next_expected_xid);
+		TransactionId next_expected_xid;
 
 		/*
-		 * Locking requirement is currently higher than for xid assignment in
-		 * normal running. However, we only get called here for new high xids
-		 * - so on a multi-processor where it is common that xids arrive out
-		 * of order the average number of locks per assignment will actually
-		 * reduce. So not too worried about this locking.
-		 *
-		 * XXX It does seem possible that we could add a whole range of
-		 * numbers atomically to KnownAssignedXids, if we use a sorted list
-		 * for KnownAssignedXids. But that design also increases the length of
-		 * time we hold lock when we process commits/aborts, so on balance
-		 * don't worry about this.
+		 * Extend clog and subtrans like we do in GetNewTransactionId()
+		 * during normal operation using individual extend steps.
+		 * Typical case requires almost no activity.
 		 */
-		LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
-
+		next_expected_xid = latestObservedXid;
+		TransactionIdAdvance(next_expected_xid);
 		while (TransactionIdPrecedesOrEquals(next_expected_xid, xid))
 		{
-			if (TransactionIdPrecedes(next_expected_xid, xid))
-				elog(trace_recovery(DEBUG4),
-					 "recording unobserved xid %u (latestObservedXid %u)",
-						next_expected_xid, latestObservedXid);
-			KnownAssignedXidsAdd(&next_expected_xid, 1);
-
-			/*
-			 * Extend clog and subtrans like we do in GetNewTransactionId()
-			 * during normal operation
-			 */
 			ExtendCLOG(next_expected_xid);
 			ExtendSUBTRANS(next_expected_xid);
 
 			TransactionIdAdvance(next_expected_xid);
 		}
 
-		LWLockRelease(ProcArrayLock);
+		/*
+		 * Add the new xids onto the KnownAssignedXids array. Note that
+		 * we don't need ProcArrayLock to do this, mimicing the locking
+		 * behaviour when assigning an xid in normal running.
+		 */
+		next_expected_xid = latestObservedXid;
+		TransactionIdAdvance(next_expected_xid);
+		KnownAssignedXidsAdd(next_expected_xid, xid, false);
 
+		/*
+		 * Now we can advance latestObservedXid
+		 */
 		latestObservedXid = xid;
 	}
 
@@ -2285,7 +2292,6 @@ void
 ExpireTreeKnownAssignedTransactionIds(TransactionId xid, int nsubxids,
 									  TransactionId *subxids)
 {
-	int			i;
 	TransactionId max_xid;
 
 	if (standbyState == STANDBY_DISABLED)
@@ -2298,10 +2304,8 @@ ExpireTreeKnownAssignedTransactionIds(TransactionId xid, int nsubxids,
 	 */
 	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
 
-	if (TransactionIdIsValid(xid))
-		KnownAssignedXidsRemove(xid);
-	for (i = 0; i < nsubxids; i++)
-		KnownAssignedXidsRemove(subxids[i]);
+	KnownAssignedXidsRemoveTree(xid, nsubxids, subxids);
+
 
 	/* Like in ProcArrayRemove, advance latestCompletedXid */
 	if (TransactionIdFollowsOrEquals(max_xid,
@@ -2315,7 +2319,7 @@ void
 ExpireAllKnownAssignedTransactionIds(void)
 {
 	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
-	KnownAssignedXidsRemoveMany(InvalidTransactionId, false);
+	KnownAssignedXidsRemoveMany(InvalidTransactionId);
 	LWLockRelease(ProcArrayLock);
 }
 
@@ -2323,7 +2327,7 @@ void
 ExpireOldKnownAssignedTransactionIds(TransactionId xid)
 {
 	LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
-	KnownAssignedXidsRemoveMany(xid, true);
+	KnownAssignedXidsRemoveMany(xid);
 	LWLockRelease(ProcArrayLock);
 }
 
@@ -2349,7 +2353,7 @@ ExpireOldKnownAssignedTransactionIds(TransactionId xid)
  * having the standby process changes quickly so that it can provide
  * high availability. So we choose to implement as a hash table.
  */
-
+#ifdef KNOWNASSIGNED_USE_HASH
 /*
  * Add xids into KnownAssignedXids.
  *
@@ -2563,3 +2567,772 @@ KnownAssignedXidsDisplay(int trace_level)
 
 	pfree(buf.data);
 }
+#else
+/*
+ * Circular buffer implementation =========================================
+ *
+ * We use a circular virtual array to store xids. Xids are always added in
+ * sequence on the right-hand side (head) of the array allowing us to
+ * maintain the list in sorted sequence from tail to head. When we add xids
+ * we move the head of the array atomically, so we don't need ProcArrayLock.
+ *
+ * Maintaing a contiguous array is costly, so we allow gaps to form in the
+ * sequence so we can avoid compressing the contents after each removal.
+ *
+ * Removing xids from the array requires ExclusiveLock for the usual
+ * transactional reasons. That allows us to left trim the list while only
+ * holding ProcArrayLock in Shared mode by atomically updating the tail.
+ * That can be useful since older xids are naturally removed from the tail
+ * and in a steady state we seldom need to explicitly compress the array.
+ * With carefully chosen compression we can also say that xids never move
+ * right and that compression always maintains the sorted order.
+ *
+ * If we have a maximum of M slots, with N xids currently spread across
+ * S elements then we have N <= S <= M always.
+ *
+ *  * Adding 1 xid is O(1) and is lock free
+ *  * Taking snapshots is O(S) and is similar to normal running
+ *  * Pruning is mostly O(1)
+ *  * Locating an xid is essentially same as Removing
+ *  * Removing 1 xid is best case O(1) though can be O(S) in worst case.
+ *    Worst case behaviour depends upon how empty we let the array become
+ *    so compressing when S >> N and/or S is large reduces worst case.
+ *    May need a tunable parameter to control this behaviour.
+ *
+ * In comparison, using a hash table for KnownAssignedXids would mean
+ *  * Adding would be slightly slower and would require locks
+ *  * Removing would be slightly slower than best case but more consistent
+ *  * Taking snapshots would be O(M) (i.e. >> O(S))
+ *  * Pruning is O(M)
+ *
+ * Removing a large tree of X values a naive approach would give O(N^2)
+ * behaviour. To avoid those costs we retain index of last searched location,
+ * so we can begin next scan from last location. That is not possible using
+ * hash table.
+ *
+ * Explicit compression becomes a more likely requirement if the current
+ * contents of the array grows in size.
+ * Compression requires ProcArrayLock in Exclusive mode.
+ *
+ * 0 <= head < max
+ * 0 <= tail < max
+ *
+ *  procArray->tailKnownAssignedXids points to current tail of virtual array,
+ *  	which should be a valid xid if array contains at least one valid xid
+ *
+ *  procArray->headKnownAssignedXids points to next insertion point of
+ *		virtual array, so is always invalid
+ *
+ *  if head == tail && N = 0 then array is empty, otherwise fully spread out
+ */
+
+#define KnownAssignedXidsIncrementIndex(index) \
+{ \
+	index++; \
+	if (index >= procArray->maxKnownAssignedXids) \
+		index = 0; \
+}
+
+#define KnownAssignedXidsGetNumElements(nelements, head, tail) \
+{ \
+	if (head == tail) \
+	{ \
+		if (procArray->numKnownAssignedXids == 0) \
+			nelements = 0; \
+		else \
+			nelements = procArray->maxKnownAssignedXids; \
+	} \
+	else if (head > tail) \
+		nelements = head - tail + 1; \
+	else \
+		nelements = procArray->maxKnownAssignedXids - (tail - head) +  1; \
+}
+
+/*
+ * Compress KnownAssignedXids by removing any gaps in virtual array.
+ *
+ * Must be called while holding ProcArrayLock in Exclusive mode.
+ */
+static void
+KnownAssignedXidsCompress(bool full)
+{
+	int	head = procArray->headKnownAssignedXids;
+	int	tail = procArray->tailKnownAssignedXids;
+	int current_index;
+	int	compress_index;
+	int		nelements;
+
+	if (procArray->numKnownAssignedXids == 0)
+	{
+		Assert(head = tail);
+		return;
+	}
+
+	KnownAssignedXidsGetNumElements(nelements, head, tail);
+
+	if (!full)
+	{
+		/*
+		 * If we can choose how much to compress, use a heuristic to
+		 * avoid compressing too often or not often enough.
+		 *
+		 * Heuristic is if we have a large enough current spread and
+		 * less than 50% of the elements are currently in use, then
+		 * compress. This should ensure we compress fairly infrequently.
+		 * We could compress less often though the virtual array would
+		 * spread out more and snapshots would become more expensive.
+		 */
+		if (nelements < 4 * MaxConnections ||
+			nelements < 2 * procArray->numKnownAssignedXids)
+			return;
+	}
+
+	elog(LOG, "Compressing KnownAssignedXids... (num=%u tail=%u head=%u nelements=%u) %s",
+		procArray->numKnownAssignedXids,
+		procArray->tailKnownAssignedXids,
+		procArray->headKnownAssignedXids,
+		nelements,
+		(full ? "full" : ""));
+
+	/*
+	 * Left trim the virtual array up to the leftmost valid xid.
+	 */
+	current_index = tail;
+	do
+	{
+		/* Skip any gaps in the array */
+		if (TransactionIdIsValid(KnownAssignedXids[current_index]))
+			break;
+
+		KnownAssignedXidsIncrementIndex(current_index);
+
+	} while (current_index != head);
+
+	if (current_index == head)
+		procArray->headKnownAssignedXids = procArray->tailKnownAssignedXids;
+	else
+	{
+		tail = current_index;
+
+		/*
+		 * We compress the virtual array by reading the array from tail
+		 * to head, filling gaps as we proceed. Rely on implicit cache line
+		 * buffering rather than fiddling with memcpy(). This method allows
+		 * us to optimise search by realising that xids never move right
+		 * in the virtual array when being compressed.
+		 */
+		compress_index = tail;
+		do
+		{
+			if (TransactionIdIsValid(KnownAssignedXids[current_index]))
+			{
+				if (current_index != compress_index)
+					KnownAssignedXids[compress_index] = KnownAssignedXids[current_index];
+
+				KnownAssignedXidsIncrementIndex(compress_index);
+				KnownAssignedXidsIncrementIndex(current_index);
+			}
+			else
+				KnownAssignedXidsIncrementIndex(current_index);
+
+		} while (current_index != head);
+
+		procArray->headKnownAssignedXids = compress_index;
+	}
+}
+
+/*
+ * Add xids into KnownAssignedXids.
+ *
+ * Must be called while holding ProcArrayLock in Exclusive mode
+ */
+static void
+KnownAssignedXidsAdd(TransactionId from_xid, TransactionId to_xid, bool have_lock)
+{
+	TransactionId	next_xid;
+	int			head = procArray->headKnownAssignedXids;
+	int			tail = procArray->tailKnownAssignedXids;
+	int			nxids = 1;
+	int			i;
+
+	Assert(head >= 0 && head < procArray->maxKnownAssignedXids);
+	Assert(tail >= 0 && tail < procArray->maxKnownAssignedXids);
+	Assert(TransactionIdPrecedesOrEquals(from_xid, to_xid));
+
+	/*
+	 * Calculate how many slots we need so we can test for overflow.
+	 */
+	if (to_xid >= from_xid)
+		nxids = to_xid - from_xid + 1;
+	else
+	{
+		next_xid = from_xid;
+		while (TransactionIdPrecedes(next_xid, to_xid))
+		{
+			nxids++;
+			TransactionIdAdvance(next_xid);
+		}
+	}
+	if (procArray->numKnownAssignedXids + nxids > procArray->maxKnownAssignedXids)
+		elog(ERROR, "too many KnownAssignedXids");
+
+	next_xid = from_xid;
+	for (i = 0; i < nxids; i++)
+	{
+		/*
+		 * If we've run out of space in the array then we
+		 * need to perform an emergency compression. Can
+		 * only test for this once we've advanced head.
+		 */
+		if (head == tail && procArray->numKnownAssignedXids > 0)
+		{
+			if (!have_lock)
+				LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+			KnownAssignedXidsCompress(true);
+			head = procArray->headKnownAssignedXids;
+			tail = procArray->tailKnownAssignedXids;
+			if (head++ >= procArray->maxKnownAssignedXids)
+				head = 0;
+			if (!have_lock)
+				LWLockRelease(ProcArrayLock);
+		}
+
+		/* insert the xid */
+		KnownAssignedXids[head] = next_xid;
+		procArray->numKnownAssignedXids++;
+
+		/* increment next_xid */
+		TransactionIdAdvance(next_xid);
+
+		/* move head to next insertion point */
+		KnownAssignedXidsIncrementIndex(head);
+	}
+
+	/*
+	 * Set the new head of the ProcArray last, so it is atomic
+	 */
+	procArray->headKnownAssignedXids = head;
+}
+
+/*
+ * KnownAssignedXidsSearch
+ *
+ * Searches KnownAssignedXids for a specific xid and optionally removes it.
+ *
+ * Must be called while holding ProcArrayLock in shared or exclusive mode.
+ * Exclusive lock must be held for remove = true
+ *
+ * Tail can be moved as far forward as first valid xid in shared mode.
+ * While holding shared lock we cannot remove xids, so no other caller
+ * would move tail to a different place than we do. We don't try to trim
+ * the element we are removing.
+ *
+ * Search algorithm is specifically designed to meet the needs of most
+ * frequent search patterns. Xids tend to be removed from lower end of
+ * virtual array, so we begin with a linear search from tail until we find
+ * the first valid xid. If xid matches, or array is empty we exit quickly.
+ * Next, we attempt to predict the array offset that should contain
+ * the xid, if it is present, knowing that the array is laid out one xid
+ * per array element. If this works we access the xid directly and
+ * we're done. If not, we search between left and right index.
+ */
+#define RESULT_NOT_FOUND	-1
+static bool
+KnownAssignedXidsSearch(TransactionId xid, bool remove, int *index)
+{
+	TransactionId knownXid;
+	int			head, i, tail, newtail;
+	int			result_index = RESULT_NOT_FOUND; /* set >= 0 if found */
+
+	if (procArray->numKnownAssignedXids == 0)
+		return false;
+
+	/*
+	 * Fetch the tail just once, since it may change while we loop,
+	 * when remove is false.
+	 */
+	tail = procArray->tailKnownAssignedXids;
+
+	/*
+	 * Avoid O(N^2) behaviour for removal of a tree of xids by restarting
+	 * search from last location for each xid in tree.
+	 */
+	if (*index > RESULT_NOT_FOUND)
+		i = *index;
+	else
+		i = tail;
+	
+	newtail = tail;
+
+	/*
+	 * Fetch the head just once, since it may change while we loop.
+	 * We need only go as far as head at the start of the loop, since
+	 * we are certain that an xid cannot enter and then leave the
+	 * array without us knowing, so if we skip a few higher xids they
+	 * will look like they were running anyway.
+	 */
+	head = procArray->headKnownAssignedXids;
+
+	Assert(head >= 0 && head < procArray->maxKnownAssignedXids);
+	Assert(tail >= 0 && tail < procArray->maxKnownAssignedXids);
+
+	/*
+	 * Locate the first valid xid in the array
+	 */
+	do
+	{
+		knownXid = KnownAssignedXids[i];
+
+		/* Skip any gaps in the array */
+		if (TransactionIdIsValid(knownXid))
+			break;
+
+		newtail = i;
+
+		KnownAssignedXidsIncrementIndex(i);
+
+	} while (i != head);
+
+	/*
+	 * If the array is non-empty, search for the xid
+	 */
+	if (i != head)
+	{
+		TransactionId	left_xid = knownXid;
+		int				left_index = i;
+
+		/*
+		 * Check to see whether xid matches, or desired xid
+		 * is somewhere to the right.
+		 */
+		if (left_xid == xid)
+			result_index = left_index;
+		else if (TransactionIdFollowsOrEquals(xid, left_xid))
+		{
+			int		xid_gap;
+			int		nelements;
+
+			/*
+			 * xid is somewhere to right. We know that xids
+			 * are laid out one by one across the array, so
+			 * we can predict where the xid should be iff
+			 * the array has not been compressed.
+			 */
+
+			KnownAssignedXidsGetNumElements(nelements, head, left_index);
+
+			/*
+			 * We already saw first element, so if there is more than one
+			 * element we can continue searching to the right.
+			 */
+			if (nelements > 1)
+			{
+				TransactionId	current_xid;
+				int 			right_index;
+
+				/*
+				 * Calculate the difference between the xid in the current
+				 * array element and the xid we are searching for. This will
+				 * give us the offset to the element that may contain xid.
+				 */
+				if (xid > left_xid)
+					xid_gap = (int) xid - left_xid;
+				else
+					xid_gap = (int) (INT_MAX - left_xid) + xid - FirstNormalTransactionId;
+
+				/*
+				 * If the xid_gap is too wide then we know the array has been
+				 * compressed, so no point in predicting index. Just use
+				 * head as our right_index in that case.
+				 */
+				if (xid_gap <= nelements)
+				{
+					/*
+					 * Access the xid directly, if possible.
+					 */
+					right_index = left_index + xid_gap;
+					if (right_index >= procArray->maxKnownAssignedXids)
+						right_index -= procArray->maxKnownAssignedXids;
+					current_xid = KnownAssignedXids[right_index];
+
+					if (current_xid == xid)
+						result_index = right_index;
+					else
+					{
+						if (TransactionIdIsValid(current_xid) &&
+							TransactionIdPrecedes(current_xid, xid))
+							elog(ERROR, "KnownAssignedXids (num=%u tail=%u head=%u) left [%u]=%u right [%u]=%u search %u",
+								procArray->numKnownAssignedXids,
+								procArray->tailKnownAssignedXids,
+								procArray->headKnownAssignedXids,
+								left_index, left_xid,
+								right_index, current_xid,
+								xid);
+					}
+				}
+				else
+					right_index = head;
+
+				/*
+				 * If we've not yet found our xid, then either the xid
+				 * has been removed already or it lies somewhere
+				 * between left_index and right_index in the virtual array.
+				 * Latter state can only happen as a result of a compress
+				 * action, so search space should be small enough to
+				 * make linear search acceptable.
+				 *
+				 * We could use a binary search to locate the xid though
+				 * because of the gaps we cannot use bsearch(). It is also
+				 * a quirky algorithm and too complex for practicality.
+				 *
+				 * We use two properties to improve the algorithm: xids never
+				 * move right when compression occurs and the array is sorted
+				 * even after compression. We search left (i.e. backwards)
+				 * from right_index until we hit left_index, or we find
+				 * a valid xid that precedes our xid.
+				 */
+				if (result_index == RESULT_NOT_FOUND)
+				{
+					i = right_index;
+					while (i != left_index)
+					{
+						i--;
+						if (i < 0)
+							i = procArray->maxKnownAssignedXids - 1;
+
+						current_xid = KnownAssignedXids[i];
+
+						if (current_xid == xid)
+						{
+							result_index = i;
+							break;
+						}
+						else if (TransactionIdPrecedes(current_xid, xid))
+							break;
+					}
+				}
+			}
+		}
+	}
+
+	/*
+	 * If we can simply prune the list, then do so.
+	 */
+	if (*index == RESULT_NOT_FOUND && newtail != tail)
+	{
+		/*
+		 * Move the tail of the buffer to the leftmost valid xid.
+		 * This action happens atomically, so no problem with locking.
+		 * It is possible that someone else has already done this, but
+		 * it's not possible that they moved it to a different place than
+		 * we will do because xids can only be removed while holding
+		 * ExclusiveLock, which would conflict with us.
+		 */
+		procArray->tailKnownAssignedXids = newtail;
+	}
+
+	/*
+	 * Remove xid, if required and return result
+	 */
+	if (result_index == RESULT_NOT_FOUND)
+		return false;
+	else
+	{
+		*index = result_index;
+		if (remove)
+			KnownAssignedXids[result_index] = InvalidTransactionId;
+		return true;
+	}
+}
+
+static bool
+KnownAssignedXidsExist(TransactionId xid)
+{
+	int		index;
+
+	Assert(TransactionIdIsValid(xid));
+
+	return KnownAssignedXidsSearch(xid, false, &index);
+}
+
+/*
+ * Remove one xid from anywhere in KnownAssignedXids.
+ *
+ * Must be called while holding ProcArrayLock in Exclusive mode
+ */
+static void
+KnownAssignedXidsRemove(TransactionId xid, int *index)
+{
+	Assert(TransactionIdIsValid(xid));
+
+	elog(trace_recovery(DEBUG4), "remove KnownAssignedXid %u", xid);
+
+	if (KnownAssignedXidsSearch(xid, true, index))
+		procArray->numKnownAssignedXids--;
+	else
+	{
+		int i;
+		bool	found = false;
+
+		if (XLogConsistentState())
+		{
+			for (i = 0; i < procArray->maxKnownAssignedXids; i++)
+			{
+				if (KnownAssignedXids[i] == xid)
+				{
+					KnownAssignedXids[i] = InvalidTransactionId;
+					elog(LOG, "found error in search for xid=%u", xid);
+					found = true;
+					break;
+				}
+			}
+
+			if (!found)
+			{
+				elog(LOG, "failed to remove xid=%u", xid);
+				KnownAssignedXidsDisplay(LOG);
+			}
+		}
+	}
+
+	Assert(procArray->numKnownAssignedXids >= 0);
+
+	/*
+	 * We can fail to find an xid if the xid came from a subtransaction that
+	 * aborts, though the xid hadn't yet been reported and no WAL records have
+	 * been written using the subxid. In that case the abort record will
+	 * contain that subxid and we haven't seen it before.
+	 *
+	 * If we fail to find it for other reasons it might be a problem, but it
+	 * isn't much use to log that it happened, since we can't divine much from
+	 * just an isolated xid value.
+	 */
+}
+
+/*
+ * KnownAssignedXidsRemoveTree
+ *
+ * Must be called while holding ProcArrayLock in exclusive mode.
+ */
+static void
+KnownAssignedXidsRemoveTree(TransactionId xid, int nsubxids,
+									  TransactionId *subxids)
+{
+	int 	i;
+	int		index = RESULT_NOT_FOUND;
+
+	if (TransactionIdIsValid(xid))
+		KnownAssignedXidsRemove(xid, &index);
+
+	for (i = 0; i < nsubxids; i++)
+		KnownAssignedXidsRemove(subxids[i], &index);
+
+	KnownAssignedXidsCompress(false);
+}
+
+/*
+ * KnownAssignedXidsGet - Get an array of xids by scanning KnownAssignedXids.
+ * We filter out anything higher than xmax.
+ *
+ * Must be called while holding ProcArrayLock (in shared mode)
+ */
+static int
+KnownAssignedXidsGet(TransactionId *xarray, TransactionId xmax)
+{
+	TransactionId xtmp = InvalidTransactionId;
+
+	return KnownAssignedXidsGetAndSetXmin(xarray, &xtmp, xmax);
+}
+
+/*
+ * KnownAssignedXidsGetAndSetXmin - as KnownAssignedXidsGet, plus we reduce *xmin
+ * to the lowest xid value seen if not already lower.
+ *
+ * Must be called while holding ProcArrayLock, in shared mode or higher.
+ */
+static int
+KnownAssignedXidsGetAndSetXmin(TransactionId *xarray, TransactionId *xmin,
+							   TransactionId xmax)
+{
+	TransactionId knownXid;
+	int			count = 0;
+	bool		can_move_tail = true;
+	int			head, i, tail, newtail;
+
+	Assert(TransactionIdIsValid(xmax));
+
+	if (procArray->numKnownAssignedXids == 0)
+		return 0;
+
+	/*
+	 * Fetch the tail just once, since it may change while we loop.
+	 */
+	tail = procArray->tailKnownAssignedXids;
+	i = tail;
+	newtail = tail;
+
+	/*
+	 * Fetch the head just once, since it may change while we loop.
+	 * We need only go as far as head at the start of the loop, since
+	 * we are certain that an xid cannot enter and then leave the
+	 * array without us knowing, so if we skip a few higher xids they
+	 * will look like they were running anyway.
+	 */
+	head = procArray->headKnownAssignedXids;
+
+	Assert(head >= 0 && head < procArray->maxKnownAssignedXids);
+	Assert(tail >= 0 && tail < procArray->maxKnownAssignedXids);
+
+	do
+	{
+		knownXid = KnownAssignedXids[i];
+
+		/* Skip any gaps in the array */
+		if (TransactionIdIsValid(knownXid))
+		{
+				/* Update xmin if required */
+				if (count == 0 &&
+					TransactionIdPrecedes(knownXid, *xmin))
+					*xmin = knownXid;
+
+				/*
+				 * Filter out anything higher than xmax, relying on
+				 * sorted property of array.
+				 */
+				if (TransactionIdPrecedes(xmax, knownXid))
+					break;
+
+				/* Add knownXid onto output array */
+				*xarray = knownXid;
+				xarray++;
+				count++;
+				can_move_tail = false;
+		}
+		else if (can_move_tail)
+			newtail = i;
+
+		KnownAssignedXidsIncrementIndex(i);
+
+	} while (i != head);
+
+	/*
+	 * If we can simply prune the list, then do so.
+	 */
+	if (newtail != tail)
+	{
+		/*
+		 * Move the tail of the buffer to the leftmost valid xid.
+		 * This action happens atomically, so no problem with locking.
+		 * It is possible that someone else has already done this, but
+		 * it's not possible that they moved it to a different place than
+		 * we will do because xids can only be removed while holding
+		 * ExclusiveLock, which would conflict with us.
+		 */
+		procArray->tailKnownAssignedXids = newtail;
+	}
+
+	return count;
+}
+
+/*
+ * Prune KnownAssignedXids up to, but *not* including xid. If xid is invalid
+ * then clear the whole table.
+ *
+ * Must be called while holding ProcArrayLock in Exclusive mode.
+ */
+static void
+KnownAssignedXidsRemoveMany(TransactionId removeXid)
+{
+	TransactionId	knownXid;
+	int			count = 0;
+	int			head, i;
+
+	if (procArray->numKnownAssignedXids == 0)
+		return;
+
+	if (!TransactionIdIsValid(removeXid))
+	{
+		elog(trace_recovery(DEBUG4), "removing all KnownAssignedXids");
+		procArray->numKnownAssignedXids = 0;
+		procArray->headKnownAssignedXids = procArray->tailKnownAssignedXids;
+		KnownAssignedXids[procArray->tailKnownAssignedXids] = InvalidTransactionId;
+		return;
+	}
+
+	Assert(TransactionIdIsValid(removeXid));
+	elog(trace_recovery(DEBUG4), "prune KnownAssignedXids to %u", removeXid);
+
+	i = procArray->tailKnownAssignedXids;
+	head = procArray->headKnownAssignedXids;
+
+	Assert(head >= 0 && head < procArray->maxKnownAssignedXids);
+
+	do
+	{
+		knownXid = KnownAssignedXids[i];
+
+		if (TransactionIdFollowsOrEquals(knownXid, removeXid))
+			break;
+
+		if (TransactionIdIsValid(knownXid) &&
+			!StandbyTransactionIdIsPrepared(knownXid))
+		{
+			KnownAssignedXids[i] = InvalidTransactionId;
+			count++;
+		}
+
+		KnownAssignedXidsIncrementIndex(i);
+
+	} while (i != head);
+
+	procArray->numKnownAssignedXids -= count;
+	Assert(procArray->numKnownAssignedXids >= 0);
+
+	KnownAssignedXidsCompress(false);
+}
+
+/*
+ * Display KnownAssignedXids to provide debug trail
+ *
+ * Must be called while holding ProcArrayLock (in shared mode)
+ */
+static void
+KnownAssignedXidsDisplay(int trace_level)
+{
+	TransactionId	knownXid;
+	StringInfoData	buf;
+	int			head, i;
+	int			nxids = 0;
+
+	i = procArray->tailKnownAssignedXids;
+	head = procArray->headKnownAssignedXids;
+
+	initStringInfo(&buf);
+
+	if (procArray->numKnownAssignedXids > 0)
+	{
+		do
+		{
+			knownXid = KnownAssignedXids[i];
+
+			if (TransactionIdIsValid(knownXid))
+			{
+				nxids++;
+				appendStringInfo(&buf, "[%u]=%u ", i, knownXid);
+			}
+
+			KnownAssignedXidsIncrementIndex(i);
+
+		} while (i != head);
+	}
+
+	elog(trace_level, "%d KnownAssignedXids (num=%u tail=%u head=%u) %s",
+		nxids,
+		procArray->numKnownAssignedXids,
+		procArray->tailKnownAssignedXids,
+		procArray->headKnownAssignedXids,
+		buf.data);
+
+	pfree(buf.data);
+}
+#endif
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 0295a61..df7b009 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -278,6 +278,7 @@ extern void xlog_desc(StringInfo buf, uint8 xl_info, char *rec);
 
 extern void issue_xlog_fsync(int fd, uint32 log, uint32 seg);
 
+extern bool XLogConsistentState(void);
 extern bool RecoveryInProgress(void);
 extern bool XLogInsertAllowed(void);
 extern TimestampTz GetLatestXLogTime(void);
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to