On Fri, 2010-04-16 at 11:29 +0300, Heikki Linnakangas wrote: > Simon Riggs wrote: > > On Tue, 2010-04-13 at 21:09 +0300, Heikki Linnakangas wrote: > >> A quick fix would be to check if there are any entries in the hash table > >> before scanning it. That would eliminate the overhead when there are no > >> in-progress transactions in the master. But as soon as there's even one, > >> the overhead comes back. > > > > Any fix should be fairly quick because of the way it's modularised - with > > something like this in mind. > > > > I'll try a circular buffer implementation, with fastpath. > > I started experimenting with a sorted array based implementation on > Tuesday but got carried away with other stuff. I have now got back to that > and cleaned it up. > > How does the attached patch look? It's probably similar to what you > had in mind.
It looks like a second version of what I'm working on and about to publish. I'll take that as a compliment! My patch is attached here also, for discussion. The two patches look the same in their main parts, though I have quite a few extra tweaks in there, which you can read about in the comments. One tweak I don't have is the use of the presence array that allows a sensible bsearch, so I'll alter my patch to use that idea but keep the rest of my code. -- Simon Riggs www.2ndQuadrant.com
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index 3cc9bdd..d432c9d 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -1200,6 +1200,9 @@ StandbyTransactionIdIsPrepared(TransactionId xid) Assert(TransactionIdIsValid(xid)); + if (max_prepared_xacts <= 0) + return false; /* nothing to do */ + /* Read and validate file */ buf = ReadTwoPhaseFile(xid, false); if (buf == NULL) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 3adbee2..9245727 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -6453,6 +6453,12 @@ CheckRecoveryConsistency(void) } } +bool +XLogConsistentState(void) +{ + return reachedMinRecoveryPoint; +} + /* * Is the system still in recovery? * diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c index 88169b4..050c547 100644 --- a/src/backend/storage/ipc/procarray.c +++ b/src/backend/storage/ipc/procarray.c @@ -68,6 +68,10 @@ typedef struct ProcArrayStruct * xids */ int maxKnownAssignedXids; /* allocated size of known assigned * xids */ + int tailKnownAssignedXids; /* current tail of known assigned + * xids */ + int headKnownAssignedXids; /* current head of known assigned + * xids */ /* * Highest subxid that overflowed KnownAssignedXids array. 
Similar to @@ -87,7 +91,7 @@ static ProcArrayStruct *procArray; /* * Bookkeeping for tracking emulated transactions in recovery */ -static HTAB *KnownAssignedXidsHash; +static TransactionId *KnownAssignedXids; static TransactionId latestObservedXid = InvalidTransactionId; /* @@ -142,9 +146,12 @@ static int KnownAssignedXidsGet(TransactionId *xarray, TransactionId xmax); static int KnownAssignedXidsGetAndSetXmin(TransactionId *xarray, TransactionId *xmin, TransactionId xmax); static bool KnownAssignedXidsExist(TransactionId xid); -static void KnownAssignedXidsAdd(TransactionId *xids, int nxids); -static void KnownAssignedXidsRemove(TransactionId xid); -static void KnownAssignedXidsRemoveMany(TransactionId xid, bool keepPreparedXacts); +static void KnownAssignedXidsAdd(TransactionId from_xid, TransactionId to_xid, bool have_lock); +static void KnownAssignedXidsRemove(TransactionId xid, int *index); +static void KnownAssignedXidsRemoveMany(TransactionId xid); +static void KnownAssignedXidsRemoveTree(TransactionId xid, int nsubxids, + TransactionId *subxids); +static void KnownAssignedXidsCompress(bool full); static void KnownAssignedXidsDisplay(int trace_level); /* @@ -204,10 +211,13 @@ CreateSharedProcArray(void) procArray->numKnownAssignedXids = 0; procArray->maxKnownAssignedXids = TOTAL_MAX_CACHED_SUBXIDS; procArray->lastOverflowedXid = InvalidTransactionId; + procArray->tailKnownAssignedXids = 0; + procArray->headKnownAssignedXids = 0; } if (XLogRequestRecoveryConnections) { +#ifdef KNOWNASSIGNED_USE_HASH /* Create or attach to the KnownAssignedXids hash table */ HASHCTL info; @@ -223,6 +233,12 @@ CreateSharedProcArray(void) HASH_ELEM | HASH_FUNCTION); if (!KnownAssignedXidsHash) elog(FATAL, "could not initialize known assigned xids hash table"); +#else + KnownAssignedXids = (TransactionId *) + ShmemInitStruct("KnownAssignedXids", + mul_size(sizeof(TransactionId), TOTAL_MAX_CACHED_SUBXIDS), + &found); +#endif } } @@ -417,6 +433,8 @@ void 
ProcArrayInitRecoveryInfo(TransactionId oldestActiveXid) { snapshotOldestActiveXid = oldestActiveXid; + +// KnownAssignedXidsTest(); } /* @@ -544,10 +562,10 @@ ProcArrayApplyRecoveryInfo(RunningTransactions running) if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid)) continue; - KnownAssignedXidsAdd(&xid, 1); + KnownAssignedXidsAdd(xid, xid, true); } - KnownAssignedXidsDisplay(trace_recovery(DEBUG3)); + KnownAssignedXidsDisplay(LOG); /* * Update lastOverflowedXid if the snapshot had overflown. We don't know @@ -617,8 +635,7 @@ ProcArrayApplyXidAssignment(TransactionId topxid, /* * Remove from known-assigned-xacts. */ - for (i = 0; i < nsubxids; i++) - KnownAssignedXidsRemove(subxids[i]); + KnownAssignedXidsRemoveTree(InvalidTransactionId, nsubxids, subxids); /* * Advance lastOverflowedXid when required. @@ -2230,45 +2247,35 @@ RecordKnownAssignedTransactionIds(TransactionId xid) */ if (TransactionIdFollows(xid, latestObservedXid)) { - TransactionId next_expected_xid = latestObservedXid; - - TransactionIdAdvance(next_expected_xid); + TransactionId next_expected_xid; /* - * Locking requirement is currently higher than for xid assignment in - * normal running. However, we only get called here for new high xids - * - so on a multi-processor where it is common that xids arrive out - * of order the average number of locks per assignment will actually - * reduce. So not too worried about this locking. - * - * XXX It does seem possible that we could add a whole range of - * numbers atomically to KnownAssignedXids, if we use a sorted list - * for KnownAssignedXids. But that design also increases the length of - * time we hold lock when we process commits/aborts, so on balance - * don't worry about this. + * Extend clog and subtrans like we do in GetNewTransactionId() + * during normal operation using individual extend steps. + * Typical case requires almost no activity. 
*/ - LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); - + next_expected_xid = latestObservedXid; + TransactionIdAdvance(next_expected_xid); while (TransactionIdPrecedesOrEquals(next_expected_xid, xid)) { - if (TransactionIdPrecedes(next_expected_xid, xid)) - elog(trace_recovery(DEBUG4), - "recording unobserved xid %u (latestObservedXid %u)", - next_expected_xid, latestObservedXid); - KnownAssignedXidsAdd(&next_expected_xid, 1); - - /* - * Extend clog and subtrans like we do in GetNewTransactionId() - * during normal operation - */ ExtendCLOG(next_expected_xid); ExtendSUBTRANS(next_expected_xid); TransactionIdAdvance(next_expected_xid); } - LWLockRelease(ProcArrayLock); + /* + * Add the new xids onto the KnownAssignedXids array. Note that + * we don't need ProcArrayLock to do this, mimicing the locking + * behaviour when assigning an xid in normal running. + */ + next_expected_xid = latestObservedXid; + TransactionIdAdvance(next_expected_xid); + KnownAssignedXidsAdd(next_expected_xid, xid, false); + /* + * Now we can advance latestObservedXid + */ latestObservedXid = xid; } @@ -2285,7 +2292,6 @@ void ExpireTreeKnownAssignedTransactionIds(TransactionId xid, int nsubxids, TransactionId *subxids) { - int i; TransactionId max_xid; if (standbyState == STANDBY_DISABLED) @@ -2298,10 +2304,8 @@ ExpireTreeKnownAssignedTransactionIds(TransactionId xid, int nsubxids, */ LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); - if (TransactionIdIsValid(xid)) - KnownAssignedXidsRemove(xid); - for (i = 0; i < nsubxids; i++) - KnownAssignedXidsRemove(subxids[i]); + KnownAssignedXidsRemoveTree(xid, nsubxids, subxids); + /* Like in ProcArrayRemove, advance latestCompletedXid */ if (TransactionIdFollowsOrEquals(max_xid, @@ -2315,7 +2319,7 @@ void ExpireAllKnownAssignedTransactionIds(void) { LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); - KnownAssignedXidsRemoveMany(InvalidTransactionId, false); + KnownAssignedXidsRemoveMany(InvalidTransactionId); LWLockRelease(ProcArrayLock); } @@ -2323,7 +2327,7 @@ 
void ExpireOldKnownAssignedTransactionIds(TransactionId xid) { LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); - KnownAssignedXidsRemoveMany(xid, true); + KnownAssignedXidsRemoveMany(xid); LWLockRelease(ProcArrayLock); } @@ -2349,7 +2353,7 @@ ExpireOldKnownAssignedTransactionIds(TransactionId xid) * having the standby process changes quickly so that it can provide * high availability. So we choose to implement as a hash table. */ - +#ifdef KNOWNASSIGNED_USE_HASH /* * Add xids into KnownAssignedXids. * @@ -2563,3 +2567,772 @@ KnownAssignedXidsDisplay(int trace_level) pfree(buf.data); } +#else +/* + * Circular buffer implementation ========================================= + * + * We use a circular virtual array to store xids. Xids are always added in + * sequence on the right-hand side (head) of the array allowing us to + * maintain the list in sorted sequence from tail to head. When we add xids + * we move the head of the array atomically, so we don't need ProcArrayLock. + * + * Maintaing a contiguous array is costly, so we allow gaps to form in the + * sequence so we can avoid compressing the contents after each removal. + * + * Removing xids from the array requires ExclusiveLock for the usual + * transactional reasons. That allows us to left trim the list while only + * holding ProcArrayLock in Shared mode by atomically updating the tail. + * That can be useful since older xids are naturally removed from the tail + * and in a steady state we seldom need to explicitly compress the array. + * With carefully chosen compression we can also say that xids never move + * right and that compression always maintains the sorted order. + * + * If we have a maximum of M slots, with N xids currently spread across + * S elements then we have N <= S <= M always. 
+ * + * * Adding 1 xid is O(1) and is lock free + * * Taking snapshots is O(S) and is similar to normal running + * * Pruning is mostly O(1) + * * Locating an xid is essentially the same as Removing + * * Removing 1 xid is best case O(1) though can be O(S) in worst case. + * Worst case behaviour depends upon how empty we let the array become + * so compressing when S >> N and/or S is large reduces worst case. + * May need a tunable parameter to control this behaviour. + * + * In comparison, using a hash table for KnownAssignedXids would mean + * * Adding would be slightly slower and would require locks + * * Removing would be slightly slower than best case but more consistent + * * Taking snapshots would be O(M) (i.e. >> O(S)) + * * Pruning is O(M) + * + * When removing a large tree of X values, a naive approach would give O(N^2) + * behaviour. To avoid those costs we retain the index of the last searched + * location, so we can begin the next scan from the last location. That is not + * possible using a hash table. + * + * Explicit compression becomes a more likely requirement if the current + * contents of the array grow in size. + * Compression requires ProcArrayLock in Exclusive mode. 
+ * + * 0 <= head < max + * 0 <= tail < max + * + * procArray->tailKnownAssignedXids points to current tail of virtual array, + * which should be a valid xid if array contains at least one valid xid + * + * procArray->headKnownAssignedXids points to next insertion point of + * virtual array, so is always invalid + * + * if head == tail && N = 0 then array is empty, otherwise fully spread out + */ + +#define KnownAssignedXidsIncrementIndex(index) \ +{ \ + index++; \ + if (index >= procArray->maxKnownAssignedXids) \ + index = 0; \ +} + +#define KnownAssignedXidsGetNumElements(nelements, head, tail) \ +{ \ + if (head == tail) \ + { \ + if (procArray->numKnownAssignedXids == 0) \ + nelements = 0; \ + else \ + nelements = procArray->maxKnownAssignedXids; \ + } \ + else if (head > tail) \ + nelements = head - tail + 1; \ + else \ + nelements = procArray->maxKnownAssignedXids - (tail - head) + 1; \ +} + +/* + * Compress KnownAssignedXids by removing any gaps in virtual array. + * + * Must be called while holding ProcArrayLock in Exclusive mode. + */ +static void +KnownAssignedXidsCompress(bool full) +{ + int head = procArray->headKnownAssignedXids; + int tail = procArray->tailKnownAssignedXids; + int current_index; + int compress_index; + int nelements; + + if (procArray->numKnownAssignedXids == 0) + { + Assert(head = tail); + return; + } + + KnownAssignedXidsGetNumElements(nelements, head, tail); + + if (!full) + { + /* + * If we can choose how much to compress, use a heuristic to + * avoid compressing too often or not often enough. + * + * Heuristic is if we have a large enough current spread and + * less than 50% of the elements are currently in use, then + * compress. This should ensure we compress fairly infrequently. + * We could compress less often though the virtual array would + * spread out more and snapshots would become more expensive. 
+ */ + if (nelements < 4 * MaxConnections || + nelements < 2 * procArray->numKnownAssignedXids) + return; + } + + elog(LOG, "Compressing KnownAssignedXids... (num=%u tail=%u head=%u nelements=%u) %s", + procArray->numKnownAssignedXids, + procArray->tailKnownAssignedXids, + procArray->headKnownAssignedXids, + nelements, + (full ? "full" : "")); + + /* + * Left trim the virtual array up to the leftmost valid xid. + */ + current_index = tail; + do + { + /* Skip any gaps in the array */ + if (TransactionIdIsValid(KnownAssignedXids[current_index])) + break; + + KnownAssignedXidsIncrementIndex(current_index); + + } while (current_index != head); + + if (current_index == head) + procArray->headKnownAssignedXids = procArray->tailKnownAssignedXids; + else + { + tail = current_index; + + /* + * We compress the virtual array by reading the array from tail + * to head, filling gaps as we proceed. Rely on implicit cache line + * buffering rather than fiddling with memcpy(). This method allows + * us to optimise search by realising that xids never move right + * in the virtual array when being compressed. + */ + compress_index = tail; + do + { + if (TransactionIdIsValid(KnownAssignedXids[current_index])) + { + if (current_index != compress_index) + KnownAssignedXids[compress_index] = KnownAssignedXids[current_index]; + + KnownAssignedXidsIncrementIndex(compress_index); + KnownAssignedXidsIncrementIndex(current_index); + } + else + KnownAssignedXidsIncrementIndex(current_index); + + } while (current_index != head); + + procArray->headKnownAssignedXids = compress_index; + } +} + +/* + * Add xids into KnownAssignedXids. 
+ * + * Must be called while holding ProcArrayLock in Exclusive mode + */ +static void +KnownAssignedXidsAdd(TransactionId from_xid, TransactionId to_xid, bool have_lock) +{ + TransactionId next_xid; + int head = procArray->headKnownAssignedXids; + int tail = procArray->tailKnownAssignedXids; + int nxids = 1; + int i; + + Assert(head >= 0 && head < procArray->maxKnownAssignedXids); + Assert(tail >= 0 && tail < procArray->maxKnownAssignedXids); + Assert(TransactionIdPrecedesOrEquals(from_xid, to_xid)); + + /* + * Calculate how many slots we need so we can test for overflow. + */ + if (to_xid >= from_xid) + nxids = to_xid - from_xid + 1; + else + { + next_xid = from_xid; + while (TransactionIdPrecedes(next_xid, to_xid)) + { + nxids++; + TransactionIdAdvance(next_xid); + } + } + if (procArray->numKnownAssignedXids + nxids > procArray->maxKnownAssignedXids) + elog(ERROR, "too many KnownAssignedXids"); + + next_xid = from_xid; + for (i = 0; i < nxids; i++) + { + /* + * If we've run out of space in the array then we + * need to perform an emergency compression. Can + * only test for this once we've advanced head. + */ + if (head == tail && procArray->numKnownAssignedXids > 0) + { + if (!have_lock) + LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + KnownAssignedXidsCompress(true); + head = procArray->headKnownAssignedXids; + tail = procArray->tailKnownAssignedXids; + if (head++ >= procArray->maxKnownAssignedXids) + head = 0; + if (!have_lock) + LWLockRelease(ProcArrayLock); + } + + /* insert the xid */ + KnownAssignedXids[head] = next_xid; + procArray->numKnownAssignedXids++; + + /* increment next_xid */ + TransactionIdAdvance(next_xid); + + /* move head to next insertion point */ + KnownAssignedXidsIncrementIndex(head); + } + + /* + * Set the new head of the ProcArray last, so it is atomic + */ + procArray->headKnownAssignedXids = head; +} + +/* + * KnownAssignedXidsSearch + * + * Searches KnownAssignedXids for a specific xid and optionally removes it. 
+ * + * Must be called while holding ProcArrayLock in shared or exclusive mode. + * Exclusive lock must be held for remove = true + * + * Tail can be moved as far forward as first valid xid in shared mode. + * While holding shared lock we cannot remove xids, so no other caller + * would move tail to a different place than we do. We don't try to trim + * the element we are removing. + * + * Search algorithm is specifically designed to meet the needs of most + * frequent search patterns. Xids tend to be removed from lower end of + * virtual array, so we begin with a linear search from tail until we find + * the first valid xid. If xid matches, or array is empty we exit quickly. + * Next, we attempt to predict the array offset that should contain + * the xid, if it is present, knowing that the array is laid out one xid + * per array element. If this works we access the xid directly and + * we're done. If not, we search between left and right index. + */ +#define RESULT_NOT_FOUND -1 +static bool +KnownAssignedXidsSearch(TransactionId xid, bool remove, int *index) +{ + TransactionId knownXid; + int head, i, tail, newtail; + int result_index = RESULT_NOT_FOUND; /* set >= 0 if found */ + + if (procArray->numKnownAssignedXids == 0) + return false; + + /* + * Fetch the tail just once, since it may change while we loop, + * when remove is false. + */ + tail = procArray->tailKnownAssignedXids; + + /* + * Avoid O(N^2) behaviour for removal of a tree of xids by restarting + * search from last location for each xid in tree. + */ + if (*index > RESULT_NOT_FOUND) + i = *index; + else + i = tail; + + newtail = tail; + + /* + * Fetch the head just once, since it may change while we loop. + * We need only go as far as head at the start of the loop, since + * we are certain that an xid cannot enter and then leave the + * array without us knowing, so if we skip a few higher xids they + * will look like they were running anyway. 
+ */ + head = procArray->headKnownAssignedXids; + + Assert(head >= 0 && head < procArray->maxKnownAssignedXids); + Assert(tail >= 0 && tail < procArray->maxKnownAssignedXids); + + /* + * Locate the first valid xid in the array + */ + do + { + knownXid = KnownAssignedXids[i]; + + /* Skip any gaps in the array */ + if (TransactionIdIsValid(knownXid)) + break; + + newtail = i; + + KnownAssignedXidsIncrementIndex(i); + + } while (i != head); + + /* + * If the array is non-empty, search for the xid + */ + if (i != head) + { + TransactionId left_xid = knownXid; + int left_index = i; + + /* + * Check to see whether xid matches, or desired xid + * is somewhere to the right. + */ + if (left_xid == xid) + result_index = left_index; + else if (TransactionIdFollowsOrEquals(xid, left_xid)) + { + int xid_gap; + int nelements; + + /* + * xid is somewhere to right. We know that xids + * are laid out one by one across the array, so + * we can predict where the xid should be iff + * the array has not been compressed. + */ + + KnownAssignedXidsGetNumElements(nelements, head, left_index); + + /* + * We already saw first element, so if there is more than one + * element we can continue searching to the right. + */ + if (nelements > 1) + { + TransactionId current_xid; + int right_index; + + /* + * Calculate the difference between the xid in the current + * array element and the xid we are searching for. This will + * give us the offset to the element that may contain xid. + */ + if (xid > left_xid) + xid_gap = (int) xid - left_xid; + else + xid_gap = (int) (INT_MAX - left_xid) + xid - FirstNormalTransactionId; + + /* + * If the xid_gap is too wide then we know the array has been + * compressed, so no point in predicting index. Just use + * head as our right_index in that case. + */ + if (xid_gap <= nelements) + { + /* + * Access the xid directly, if possible. 
+ */ + right_index = left_index + xid_gap; + if (right_index >= procArray->maxKnownAssignedXids) + right_index -= procArray->maxKnownAssignedXids; + current_xid = KnownAssignedXids[right_index]; + + if (current_xid == xid) + result_index = right_index; + else + { + if (TransactionIdIsValid(current_xid) && + TransactionIdPrecedes(current_xid, xid)) + elog(ERROR, "KnownAssignedXids (num=%u tail=%u head=%u) left [%u]=%u right [%u]=%u search %u", + procArray->numKnownAssignedXids, + procArray->tailKnownAssignedXids, + procArray->headKnownAssignedXids, + left_index, left_xid, + right_index, current_xid, + xid); + } + } + else + right_index = head; + + /* + * If we've not yet found our xid, then either the xid + * has been removed already or it lies somewhere + * between left_index and right_index in the virtual array. + * Latter state can only happen as a result of a compress + * action, so search space should be small enough to + * make linear search acceptable. + * + * We could use a binary search to locate the xid though + * because of the gaps we cannot use bsearch(). It is also + * a quirky algorithm and too complex for practicality. + * + * We use two properties to improve the algorithm: xids never + * move right when compression occurs and the array is sorted + * even after compression. We search left (i.e. backwards) + * from right_index until we hit left_index, or we find + * a valid xid that precedes our xid. + */ + if (result_index == RESULT_NOT_FOUND) + { + i = right_index; + while (i != left_index) + { + i--; + if (i < 0) + i = procArray->maxKnownAssignedXids - 1; + + current_xid = KnownAssignedXids[i]; + + if (current_xid == xid) + { + result_index = i; + break; + } + else if (TransactionIdPrecedes(current_xid, xid)) + break; + } + } + } + } + } + + /* + * If we can simply prune the list, then do so. + */ + if (*index == RESULT_NOT_FOUND && newtail != tail) + { + /* + * Move the tail of the buffer to the leftmost valid xid. 
+ * This action happens atomically, so no problem with locking. + * It is possible that someone else has already done this, but + * it's not possible that they moved it to a different place than + * we will do because xids can only be removed while holding + * ExclusiveLock, which would conflict with us. + */ + procArray->tailKnownAssignedXids = newtail; + } + + /* + * Remove xid, if required and return result + */ + if (result_index == RESULT_NOT_FOUND) + return false; + else + { + *index = result_index; + if (remove) + KnownAssignedXids[result_index] = InvalidTransactionId; + return true; + } +} + +static bool +KnownAssignedXidsExist(TransactionId xid) +{ + int index; + + Assert(TransactionIdIsValid(xid)); + + return KnownAssignedXidsSearch(xid, false, &index); +} + +/* + * Remove one xid from anywhere in KnownAssignedXids. + * + * Must be called while holding ProcArrayLock in Exclusive mode + */ +static void +KnownAssignedXidsRemove(TransactionId xid, int *index) +{ + Assert(TransactionIdIsValid(xid)); + + elog(trace_recovery(DEBUG4), "remove KnownAssignedXid %u", xid); + + if (KnownAssignedXidsSearch(xid, true, index)) + procArray->numKnownAssignedXids--; + else + { + int i; + bool found = false; + + if (XLogConsistentState()) + { + for (i = 0; i < procArray->maxKnownAssignedXids; i++) + { + if (KnownAssignedXids[i] == xid) + { + KnownAssignedXids[i] = InvalidTransactionId; + elog(LOG, "found error in search for xid=%u", xid); + found = true; + break; + } + } + + if (!found) + { + elog(LOG, "failed to remove xid=%u", xid); + KnownAssignedXidsDisplay(LOG); + } + } + } + + Assert(procArray->numKnownAssignedXids >= 0); + + /* + * We can fail to find an xid if the xid came from a subtransaction that + * aborts, though the xid hadn't yet been reported and no WAL records have + * been written using the subxid. In that case the abort record will + * contain that subxid and we haven't seen it before. 
+ * + * If we fail to find it for other reasons it might be a problem, but it + * isn't much use to log that it happened, since we can't divine much from + * just an isolated xid value. + */ +} + +/* + * KnownAssignedXidsRemoveTree + * + * Must be called while holding ProcArrayLock in exclusive mode. + */ +static void +KnownAssignedXidsRemoveTree(TransactionId xid, int nsubxids, + TransactionId *subxids) +{ + int i; + int index = RESULT_NOT_FOUND; + + if (TransactionIdIsValid(xid)) + KnownAssignedXidsRemove(xid, &index); + + for (i = 0; i < nsubxids; i++) + KnownAssignedXidsRemove(subxids[i], &index); + + KnownAssignedXidsCompress(false); +} + +/* + * KnownAssignedXidsGet - Get an array of xids by scanning KnownAssignedXids. + * We filter out anything higher than xmax. + * + * Must be called while holding ProcArrayLock (in shared mode) + */ +static int +KnownAssignedXidsGet(TransactionId *xarray, TransactionId xmax) +{ + TransactionId xtmp = InvalidTransactionId; + + return KnownAssignedXidsGetAndSetXmin(xarray, &xtmp, xmax); +} + +/* + * KnownAssignedXidsGetAndSetXmin - as KnownAssignedXidsGet, plus we reduce *xmin + * to the lowest xid value seen if not already lower. + * + * Must be called while holding ProcArrayLock, in shared mode or higher. + */ +static int +KnownAssignedXidsGetAndSetXmin(TransactionId *xarray, TransactionId *xmin, + TransactionId xmax) +{ + TransactionId knownXid; + int count = 0; + bool can_move_tail = true; + int head, i, tail, newtail; + + Assert(TransactionIdIsValid(xmax)); + + if (procArray->numKnownAssignedXids == 0) + return 0; + + /* + * Fetch the tail just once, since it may change while we loop. + */ + tail = procArray->tailKnownAssignedXids; + i = tail; + newtail = tail; + + /* + * Fetch the head just once, since it may change while we loop. 
+ * We need only go as far as head at the start of the loop, since + * we are certain that an xid cannot enter and then leave the + * array without us knowing, so if we skip a few higher xids they + * will look like they were running anyway. + */ + head = procArray->headKnownAssignedXids; + + Assert(head >= 0 && head < procArray->maxKnownAssignedXids); + Assert(tail >= 0 && tail < procArray->maxKnownAssignedXids); + + do + { + knownXid = KnownAssignedXids[i]; + + /* Skip any gaps in the array */ + if (TransactionIdIsValid(knownXid)) + { + /* Update xmin if required */ + if (count == 0 && + TransactionIdPrecedes(knownXid, *xmin)) + *xmin = knownXid; + + /* + * Filter out anything higher than xmax, relying on + * sorted property of array. + */ + if (TransactionIdPrecedes(xmax, knownXid)) + break; + + /* Add knownXid onto output array */ + *xarray = knownXid; + xarray++; + count++; + can_move_tail = false; + } + else if (can_move_tail) + newtail = i; + + KnownAssignedXidsIncrementIndex(i); + + } while (i != head); + + /* + * If we can simply prune the list, then do so. + */ + if (newtail != tail) + { + /* + * Move the tail of the buffer to the leftmost valid xid. + * This action happens atomically, so no problem with locking. + * It is possible that someone else has already done this, but + * it's not possible that they moved it to a different place than + * we will do because xids can only be removed while holding + * ExclusiveLock, which would conflict with us. + */ + procArray->tailKnownAssignedXids = newtail; + } + + return count; +} + +/* + * Prune KnownAssignedXids up to, but *not* including xid. If xid is invalid + * then clear the whole table. + * + * Must be called while holding ProcArrayLock in Exclusive mode. 
+ */ +static void +KnownAssignedXidsRemoveMany(TransactionId removeXid) +{ + TransactionId knownXid; + int count = 0; + int head, i; + + if (procArray->numKnownAssignedXids == 0) + return; + + if (!TransactionIdIsValid(removeXid)) + { + elog(trace_recovery(DEBUG4), "removing all KnownAssignedXids"); + procArray->numKnownAssignedXids = 0; + procArray->headKnownAssignedXids = procArray->tailKnownAssignedXids; + KnownAssignedXids[procArray->tailKnownAssignedXids] = InvalidTransactionId; + return; + } + + Assert(TransactionIdIsValid(removeXid)); + elog(trace_recovery(DEBUG4), "prune KnownAssignedXids to %u", removeXid); + + i = procArray->tailKnownAssignedXids; + head = procArray->headKnownAssignedXids; + + Assert(head >= 0 && head < procArray->maxKnownAssignedXids); + + do + { + knownXid = KnownAssignedXids[i]; + + if (TransactionIdFollowsOrEquals(knownXid, removeXid)) + break; + + if (TransactionIdIsValid(knownXid) && + !StandbyTransactionIdIsPrepared(knownXid)) + { + KnownAssignedXids[i] = InvalidTransactionId; + count++; + } + + KnownAssignedXidsIncrementIndex(i); + + } while (i != head); + + procArray->numKnownAssignedXids -= count; + Assert(procArray->numKnownAssignedXids >= 0); + + KnownAssignedXidsCompress(false); +} + +/* + * Display KnownAssignedXids to provide debug trail + * + * Must be called while holding ProcArrayLock (in shared mode) + */ +static void +KnownAssignedXidsDisplay(int trace_level) +{ + TransactionId knownXid; + StringInfoData buf; + int head, i; + int nxids = 0; + + i = procArray->tailKnownAssignedXids; + head = procArray->headKnownAssignedXids; + + initStringInfo(&buf); + + if (procArray->numKnownAssignedXids > 0) + { + do + { + knownXid = KnownAssignedXids[i]; + + if (TransactionIdIsValid(knownXid)) + { + nxids++; + appendStringInfo(&buf, "[%u]=%u ", i, knownXid); + } + + KnownAssignedXidsIncrementIndex(i); + + } while (i != head); + } + + elog(trace_level, "%d KnownAssignedXids (num=%u tail=%u head=%u) %s", + nxids, + 
procArray->numKnownAssignedXids, + procArray->tailKnownAssignedXids, + procArray->headKnownAssignedXids, + buf.data); + + pfree(buf.data); +} +#endif diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 0295a61..df7b009 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -278,6 +278,7 @@ extern void xlog_desc(StringInfo buf, uint8 xl_info, char *rec); extern void issue_xlog_fsync(int fd, uint32 log, uint32 seg); +extern bool XLogConsistentState(void); extern bool RecoveryInProgress(void); extern bool XLogInsertAllowed(void); extern TimestampTz GetLatestXLogTime(void);
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers