Tom pointed out two errors that could affect the Hot Standby (HS) startup code, which is fairly subtle.
The attached patch is small and makes these changes:
 * adds a new section comment, as posted to hackers earlier
 * adds comments to other functions
 * makes minor corrections to a few existing comments
 * fixes the two bugs observed by Tom, so that snapshotOldestActiveXid no longer exists, xids are tracked as soon as (standbyState >= STANDBY_INITIALIZED), and ProcArrayApplyRecoveryInfo() is substantially re-written with comments at every stage
 * fixes an old bug that set xmax incorrectly in the initial snapshot, though it would normally have been corrected soon afterwards as recovery continued
 * fixes a simple bug introduced in the recent sorted knownassignedxids patch that would give incorrect snapshots
 * minor refactoring of xact_redo_abort() and ExpireTreeKnownAssignedTransactionIds()
 * adds a new debug message for when we are in STANDBY_SNAPSHOT_PENDING and fail to move to STANDBY_SNAPSHOT_READY

I have avoided adding the new WAL record that I mentioned earlier would be required, deciding that it wasn't any easier to understand that way. I will apply tomorrow in the absence of review objections.

-- Simon Riggs www.2ndQuadrant.com
*** a/src/backend/access/transam/xact.c --- b/src/backend/access/transam/xact.c *************** *** 4378,4384 **** xact_redo_commit(xl_xact_commit *xlrec, TransactionId xid, XLogRecPtr lsn) LWLockRelease(XidGenLock); } ! if (!InHotStandby) { /* * Mark the transaction committed in pg_clog. --- 4378,4384 ---- LWLockRelease(XidGenLock); } ! if (standbyState == STANDBY_DISABLED) { /* * Mark the transaction committed in pg_clog. *************** *** 4412,4423 **** xact_redo_commit(xl_xact_commit *xlrec, TransactionId xid, XLogRecPtr lsn) /* * We must mark clog before we update the ProcArray. */ ! ExpireTreeKnownAssignedTransactionIds(xid, xlrec->nsubxacts, sub_xids); /* * Send any cache invalidations attached to the commit. We must * maintain the same order of invalidation then release locks as ! * occurs in . */ ProcessCommittedInvalidationMessages(inval_msgs, xlrec->nmsgs, XactCompletionRelcacheInitFileInval(xlrec), --- 4412,4423 ---- /* * We must mark clog before we update the ProcArray. */ ! ExpireTreeKnownAssignedTransactionIds(xid, xlrec->nsubxacts, sub_xids, max_xid); /* * Send any cache invalidations attached to the commit. We must * maintain the same order of invalidation then release locks as ! * occurs in CommitTransaction(). */ ProcessCommittedInvalidationMessages(inval_msgs, xlrec->nmsgs, XactCompletionRelcacheInitFileInval(xlrec), *************** *** 4499,4505 **** xact_redo_abort(xl_xact_abort *xlrec, TransactionId xid) LWLockRelease(XidGenLock); } ! if (InHotStandby) { /* * If a transaction completion record arrives that has as-yet --- 4499,4510 ---- LWLockRelease(XidGenLock); } ! if (standbyState == STANDBY_DISABLED) ! { ! /* Mark the transaction aborted in pg_clog, no need for async stuff */ ! TransactionIdAbortTree(xid, xlrec->nsubxacts, sub_xids); ! } ! else { /* * If a transaction completion record arrives that has as-yet *************** *** 4511,4527 **** xact_redo_abort(xl_xact_abort *xlrec, TransactionId xid) * already. Leave it in. 
*/ RecordKnownAssignedTransactionIds(max_xid); - } ! /* Mark the transaction aborted in pg_clog, no need for async stuff */ ! TransactionIdAbortTree(xid, xlrec->nsubxacts, sub_xids); - if (InHotStandby) - { /* ! * We must mark clog before we update the ProcArray. */ ! ExpireTreeKnownAssignedTransactionIds(xid, xlrec->nsubxacts, sub_xids); /* * There are no flat files that need updating, nor invalidation --- 4516,4529 ---- * already. Leave it in. */ RecordKnownAssignedTransactionIds(max_xid); ! /* Mark the transaction aborted in pg_clog, no need for async stuff */ ! TransactionIdAbortTree(xid, xlrec->nsubxacts, sub_xids); /* ! * We must update the ProcArray after we have marked clog. */ ! ExpireTreeKnownAssignedTransactionIds(xid, xlrec->nsubxacts, sub_xids, max_xid); /* * There are no flat files that need updating, nor invalidation *************** *** 4596,4602 **** xact_redo(XLogRecPtr lsn, XLogRecord *record) { xl_xact_assignment *xlrec = (xl_xact_assignment *) XLogRecGetData(record); ! if (InHotStandby) ProcArrayApplyXidAssignment(xlrec->xtop, xlrec->nsubxacts, xlrec->xsub); } --- 4598,4604 ---- { xl_xact_assignment *xlrec = (xl_xact_assignment *) XLogRecGetData(record); ! 
if (standbyState >= STANDBY_INITIALIZED) ProcArrayApplyXidAssignment(xlrec->xtop, xlrec->nsubxacts, xlrec->xsub); } *** a/src/backend/access/transam/xlog.c --- b/src/backend/access/transam/xlog.c *************** *** 5995,6000 **** StartupXLOG(void) --- 5995,6001 ---- if (wasShutdown) { RunningTransactionsData running; + TransactionId latestCompletedXid; /* * Construct a RunningTransactions snapshot representing a shut *************** *** 6006,6011 **** StartupXLOG(void) --- 6007,6015 ---- running.subxid_overflow = false; running.nextXid = checkPoint.nextXid; running.oldestRunningXid = oldestActiveXID; + latestCompletedXid = checkPoint.nextXid; + TransactionIdRetreat(latestCompletedXid); + running.latestCompletedXid = latestCompletedXid; running.xids = xids; ProcArrayApplyRecoveryInfo(&running); *************** *** 6154,6161 **** StartupXLOG(void) xlogctl->recoveryLastXTime = recoveryLastXTime; SpinLockRelease(&xlogctl->info_lck); ! /* In Hot Standby mode, keep track of XIDs we've seen */ ! if (InHotStandby && TransactionIdIsValid(record->xl_xid)) RecordKnownAssignedTransactionIds(record->xl_xid); RmgrTable[record->xl_rmid].rm_redo(EndRecPtr, record); --- 6158,6166 ---- xlogctl->recoveryLastXTime = recoveryLastXTime; SpinLockRelease(&xlogctl->info_lck); ! /* If we are attempting to enter Hot Standby mode, process XIDs we see */ ! if (standbyState >= STANDBY_INITIALIZED && ! 
TransactionIdIsValid(record->xl_xid)) RecordKnownAssignedTransactionIds(record->xl_xid); RmgrTable[record->xl_rmid].rm_redo(EndRecPtr, record); *************** *** 7803,7808 **** xlog_redo(XLogRecPtr lsn, XLogRecord *record) --- 7808,7814 ---- TransactionId *xids; int nxids; TransactionId oldestActiveXID; + TransactionId latestCompletedXid; RunningTransactionsData running; oldestActiveXID = PrescanPreparedTransactions(&xids, &nxids); *************** *** 7817,7822 **** xlog_redo(XLogRecPtr lsn, XLogRecord *record) --- 7823,7831 ---- running.subxid_overflow = false; running.nextXid = checkPoint.nextXid; running.oldestRunningXid = oldestActiveXID; + latestCompletedXid = checkPoint.nextXid; + TransactionIdRetreat(latestCompletedXid); + running.latestCompletedXid = latestCompletedXid; running.xids = xids; ProcArrayApplyRecoveryInfo(&running); *** a/src/backend/storage/ipc/procarray.c --- b/src/backend/storage/ipc/procarray.c *************** *** 105,116 **** static TransactionId latestObservedXid = InvalidTransactionId; */ static TransactionId standbySnapshotPendingXmin; - /* - * Oldest transaction still running according to the running-xacts snapshot - * we initialized standby mode from. - */ - static TransactionId snapshotOldestActiveXid; - #ifdef XIDCACHE_DEBUG /* counters for XidCache measurement */ --- 105,110 ---- *************** *** 158,164 **** static void KnownAssignedXidsRemove(TransactionId xid); static void KnownAssignedXidsRemoveTree(TransactionId xid, int nsubxids, TransactionId *subxids); static void KnownAssignedXidsRemovePreceding(TransactionId xid); ! static int KnownAssignedXidsGet(TransactionId *xarray, TransactionId xmax); static int KnownAssignedXidsGetAndSetXmin(TransactionId *xarray, TransactionId *xmin, TransactionId xmax); --- 152,158 ---- static void KnownAssignedXidsRemoveTree(TransactionId xid, int nsubxids, TransactionId *subxids); static void KnownAssignedXidsRemovePreceding(TransactionId xid); ! 
static int KnownAssignedXidsGet(TransactionId *xarray, TransactionId xmax); static int KnownAssignedXidsGetAndSetXmin(TransactionId *xarray, TransactionId *xmin, TransactionId xmax); *************** *** 439,448 **** ProcArrayClearTransaction(PGPROC *proc) proc->subxids.overflowed = false; } void ProcArrayInitRecoveryInfo(TransactionId oldestActiveXid) { ! snapshotOldestActiveXid = oldestActiveXid; } /* --- 433,449 ---- proc->subxids.overflowed = false; } + /* + * ProcArrayInitRecoveryInfo + * + * When trying to assemble our snapshot we only care about xids after this value. + * See comments for LogStandbySnapshot(). + */ void ProcArrayInitRecoveryInfo(TransactionId oldestActiveXid) { ! latestObservedXid = oldestActiveXid; ! TransactionIdRetreat(latestObservedXid); } /* *************** *** 458,473 **** ProcArrayInitRecoveryInfo(TransactionId oldestActiveXid) * with FATAL errors fail to write abort records, which could cause eventual * overflow. * ! * Only used during recovery. Notice the signature is very similar to a ! * _redo function and its difficult to decide exactly where this code should ! * reside. */ void ProcArrayApplyRecoveryInfo(RunningTransactions running) { - int xid_index; /* main loop */ TransactionId *xids; ! int nxids; Assert(standbyState >= STANDBY_INITIALIZED); --- 459,473 ---- * with FATAL errors fail to write abort records, which could cause eventual * overflow. * ! * See comments for LogStandbySnapshot(). */ void ProcArrayApplyRecoveryInfo(RunningTransactions running) { TransactionId *xids; ! int nxids; ! TransactionId nextXid; ! int i; Assert(standbyState >= STANDBY_INITIALIZED); *************** *** 505,545 **** ProcArrayApplyRecoveryInfo(RunningTransactions running) elog(trace_recovery(DEBUG2), "recovery snapshots are now enabled"); } return; } /* * OK, we need to initialise from the RunningXactData record */ - latestObservedXid = running->nextXid; - TransactionIdRetreat(latestObservedXid); /* ! 
* If the snapshot overflowed, then we still initialise with what we know, ! * but the recovery snapshot isn't fully valid yet because we know there ! * are some subxids missing (ergo we don't know which ones) */ ! if (!running->subxid_overflow) ! { ! standbyState = STANDBY_SNAPSHOT_READY; ! standbySnapshotPendingXmin = InvalidTransactionId; ! } ! else ! { ! standbyState = STANDBY_SNAPSHOT_PENDING; ! standbySnapshotPendingXmin = latestObservedXid; ! ereport(LOG, ! (errmsg("consistent state delayed because recovery snapshot incomplete"))); ! } ! nxids = running->xcnt; ! xids = running->xids; ! ! KnownAssignedXidsDisplay(trace_recovery(DEBUG3)); /* ! * Scan through the incoming array of RunningXacts and collect xids. We ! * don't use SubtransSetParent because it doesn't matter yet. If we aren't * overflowed then all xids will fit in snapshot and so we don't need * subtrans. If we later overflow, an xid assignment record will add xids * to subtrans. If RunningXacts is overflowed then we don't have enough --- 505,544 ---- elog(trace_recovery(DEBUG2), "recovery snapshots are now enabled"); } + else + elog(trace_recovery(DEBUG2), + "recovery snapshot waiting for %u oldest active xid on standby is %u", + standbySnapshotPendingXmin, + running->oldestRunningXid); return; } + Assert(standbyState == STANDBY_INITIALIZED); + /* * OK, we need to initialise from the RunningXactData record */ /* ! * Remove all xids except xids later than the snapshot. We don't know ! * exactly which ones that is until precisely now, so that is why we ! * allow xids to be added only to remove most of them again here. */ ! ExpireOldKnownAssignedTransactionIds(running->nextXid); ! StandbyReleaseOldLocks(running->nextXid); ! /* ! * Nobody else is running yet, but take locks anyhow ! */ ! LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); /* ! * Combine the running xact data with already known xids, if any exist. ! * KnownAssignedXids is sorted so we cannot just add new xids, we have ! 
* to combine them first, sort them and then re-add to KnownAssignedXids. ! * ! * Some of the new xids are top-level xids and some are subtransactions. We ! * don't call SubtransSetParent because it doesn't matter yet. If we aren't * overflowed then all xids will fit in snapshot and so we don't need * subtrans. If we later overflow, an xid assignment record will add xids * to subtrans. If RunningXacts is overflowed then we don't have enough *************** *** 547,605 **** ProcArrayApplyRecoveryInfo(RunningTransactions running) */ /* ! * Nobody else is running yet, but take locks anyhow */ ! LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); ! /* Reset latestCompletedXid */ ! ShmemVariableCache->latestCompletedXid = running->nextXid; ! TransactionIdRetreat(ShmemVariableCache->latestCompletedXid); /* ! * Add our new xids into the array */ ! for (xid_index = 0; xid_index < running->xcnt; xid_index++) { ! TransactionId xid = running->xids[xid_index]; /* ! * The running-xacts snapshot can contain xids that did finish between ! * when the snapshot was taken and when it was written to WAL. Such ! * transactions are not running anymore, so ignore them. */ if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid)) continue; ! KnownAssignedXidsAdd(xid, xid, true); } ! KnownAssignedXidsDisplay(trace_recovery(DEBUG3)); /* ! * Update lastOverflowedXid if the snapshot has any missing subxids. * We don't know the specific subxids that are missing, so conservatively * assume the last one is latestObservedXid. If no missing subxids, * try to clear lastOverflowedXid. */ if (running->subxid_overflow) { if (TransactionIdFollows(latestObservedXid, procArray->lastOverflowedXid)) procArray->lastOverflowedXid = latestObservedXid; } ! else if (TransactionIdFollows(running->oldestRunningXid, procArray->lastOverflowedXid)) ! procArray->lastOverflowedXid = InvalidTransactionId; /* nextXid must be beyond any observed xid */ ! 
if (TransactionIdFollows(running->nextXid, ShmemVariableCache->nextXid)) ! ShmemVariableCache->nextXid = running->nextXid; LWLockRelease(ProcArrayLock); elog(trace_recovery(DEBUG2), "running transaction data initialized"); if (standbyState == STANDBY_SNAPSHOT_READY) elog(trace_recovery(DEBUG2), "recovery snapshots are now enabled"); } /* --- 546,693 ---- */ /* ! * Allocate a temporary array so we can combine xids. The total ! * of both arrays should never normally exceed TOTAL_MAX_CACHED_SUBXIDS. */ ! xids = palloc(sizeof(TransactionId) * TOTAL_MAX_CACHED_SUBXIDS); ! ! /* ! * Get the remaining KnownAssignedXids. In most cases there won't ! * be any at all since this exists only to catch a theoretical ! * race condition. ! */ ! nxids = KnownAssignedXidsGet(xids, InvalidTransactionId); ! if (nxids > 0) ! KnownAssignedXidsDisplay(trace_recovery(DEBUG3)); ! /* ! * Now we have a copy of any KnownAssignedXids we can zero the ! * array before we re-insertion of combined snapshot. ! */ ! KnownAssignedXidsRemovePreceding(InvalidTransactionId); /* ! * Add to the temp array any xids which have not already completed, ! * taking care not to overflow in extreme cases. */ ! for (i = 0; i < running->xcnt; i++) { ! TransactionId xid = running->xids[i]; /* ! * The running-xacts snapshot can contain xids that were running at ! * the time of the snapshot, yet complete before the snapshot was ! * written to WAL. They're running now, so ignore them. */ if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid)) continue; ! xids[nxids++] = xid; ! ! /* ! * Test for overflow only after we have filtered out already complete ! * transactions. ! */ ! if (nxids > TOTAL_MAX_CACHED_SUBXIDS) ! elog(ERROR, "too many xids to add into KnownAssignedXids"); } ! if (nxids > 0) ! { ! /* ! * Sort the array so that we can add them safely into KnownAssignedXids. ! */ ! qsort(xids, nxids, sizeof(TransactionId), xidComparator); ! ! /* ! * Re-initialise latestObservedXid to the highest xid we've seen. ! 
*/ ! latestObservedXid = xids[nxids - 1]; ! ! /* ! * Add the sorted snapshot into KnownAssignedXids ! */ ! for (i = 0; i < nxids; i++) ! { ! TransactionId xid = xids[i]; ! ! KnownAssignedXidsAdd(xid, xid, true); ! } ! ! KnownAssignedXidsDisplay(trace_recovery(DEBUG3)); ! } ! ! pfree(xids); /* ! * Now we've got the running xids we need to set the global values ! * thare used to track snapshots as they evolve further ! * ! * * latestCompletedXid which will be the xmax for snapshots ! * * lastOverflowedXid which shows whether snapshots overflow ! * * nextXid ! * ! * If the snapshot overflowed, then we still initialise with what we know, ! * but the recovery snapshot isn't fully valid yet because we know there ! * are some subxids missing. * We don't know the specific subxids that are missing, so conservatively * assume the last one is latestObservedXid. If no missing subxids, * try to clear lastOverflowedXid. + * + * If the snapshot didn't overflow it's still possible that an overflow + * occurred in the gap between taking snapshot and logging record, so + * we also need to check if lastOverflowedXid is already ahead of us. */ if (running->subxid_overflow) { + standbyState = STANDBY_SNAPSHOT_PENDING; + + standbySnapshotPendingXmin = latestObservedXid; if (TransactionIdFollows(latestObservedXid, procArray->lastOverflowedXid)) procArray->lastOverflowedXid = latestObservedXid; } ! else if (TransactionIdFollows(procArray->lastOverflowedXid, ! latestObservedXid)) ! { ! standbyState = STANDBY_SNAPSHOT_PENDING; ! ! standbySnapshotPendingXmin = procArray->lastOverflowedXid; ! } ! else ! { ! standbyState = STANDBY_SNAPSHOT_READY; ! ! standbySnapshotPendingXmin = InvalidTransactionId; ! if (TransactionIdFollows(running->oldestRunningXid, procArray->lastOverflowedXid)) ! procArray->lastOverflowedXid = InvalidTransactionId; ! } ! ! /* ! * If a transaction completed in the gap between taking and logging the ! 
* snapshot then latestCompletedXid may already be higher than the value ! * from the snapshot, so check before we use the incoming value. ! */ ! if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid, ! running->latestCompletedXid)) ! ShmemVariableCache->latestCompletedXid = running->latestCompletedXid; /* nextXid must be beyond any observed xid */ ! nextXid = latestObservedXid; ! TransactionIdAdvance(nextXid); ! if (TransactionIdFollows(nextXid, ShmemVariableCache->nextXid)) ! ShmemVariableCache->nextXid = nextXid; LWLockRelease(ProcArrayLock); elog(trace_recovery(DEBUG2), "running transaction data initialized"); + KnownAssignedXidsDisplay(trace_recovery(DEBUG3)); if (standbyState == STANDBY_SNAPSHOT_READY) elog(trace_recovery(DEBUG2), "recovery snapshots are now enabled"); + else + ereport(LOG, + (errmsg("consistent state delayed because recovery snapshot incomplete"))); } /* *************** *** 613,620 **** ProcArrayApplyXidAssignment(TransactionId topxid, TransactionId max_xid; int i; ! if (standbyState < STANDBY_SNAPSHOT_PENDING) ! return; max_xid = TransactionIdLatest(topxid, nsubxids, subxids); --- 701,708 ---- TransactionId max_xid; int i; ! Assert(standbyState >= STANDBY_INITIALIZED); ! max_xid = TransactionIdLatest(topxid, nsubxids, subxids); *************** *** 1410,1415 **** GetRunningTransactionData(void) --- 1498,1504 ---- CurrentRunningXacts->subxid_overflow = suboverflowed; CurrentRunningXacts->nextXid = ShmemVariableCache->nextXid; CurrentRunningXacts->oldestRunningXid = oldestRunningXid; + CurrentRunningXacts->latestCompletedXid = latestCompletedXid; LWLockRelease(XidGenLock); LWLockRelease(ProcArrayLock); *************** *** 2219,2253 **** DisplayXidCache(void) * * RecordKnownAssignedTransactionIds() should be run for *every* WAL record * type apart from XLOG_XACT_RUNNING_XACTS (since that initialises the first ! * snapshot so that RecordKnownAssignedTransactionIds() can be called). * ! * Must only be called in Startup process. 
*/ void RecordKnownAssignedTransactionIds(TransactionId xid) { ! /* ! * Skip processing if the current snapshot is not initialized. ! */ ! if (standbyState < STANDBY_SNAPSHOT_PENDING) ! return; ! ! /* ! * We can see WAL records before the running-xacts snapshot that contain ! * XIDs that are not in the running-xacts snapshot, but that we know to ! * have finished before the running-xacts snapshot was taken. Don't waste ! * precious shared memory by keeping them in the hash table. ! * ! * We can also see WAL records before the running-xacts snapshot that ! * contain XIDs that are not in the running-xacts snapshot for a different ! * reason: the transaction started *after* the running-xacts snapshot was ! * taken, but before it was written to WAL. We must be careful to not ! * ignore such XIDs. Because such a transaction started after the ! * running-xacts snapshot was taken, it must have an XID larger than the ! * oldest XID according to the running-xacts snapshot. ! */ ! if (TransactionIdPrecedes(xid, snapshotOldestActiveXid)) ! return; elog(trace_recovery(DEBUG4), "record known xact %u latestObservedXid %u", xid, latestObservedXid); --- 2308,2323 ---- * * RecordKnownAssignedTransactionIds() should be run for *every* WAL record * type apart from XLOG_XACT_RUNNING_XACTS (since that initialises the first ! * snapshot so that RecordKnownAssignedTransactionIds() can be called). Must ! * be called for each record after we have executed StartupCLog() et al, ! * since we must ExtendCLOG() etc.. * ! * Called during recovery in analogy with and in place of GetNewTransactionId() */ void RecordKnownAssignedTransactionIds(TransactionId xid) { ! Assert(standbyState >= STANDBY_INITIALIZED); elog(trace_recovery(DEBUG4), "record known xact %u latestObservedXid %u", xid, latestObservedXid); *************** *** 2287,2317 **** RecordKnownAssignedTransactionIds(TransactionId xid) * Now we can advance latestObservedXid */ latestObservedXid = xid; - } ! 
/* nextXid must be beyond any observed xid */ ! if (TransactionIdFollowsOrEquals(latestObservedXid, ! ShmemVariableCache->nextXid)) ! { ! ShmemVariableCache->nextXid = latestObservedXid; ! TransactionIdAdvance(ShmemVariableCache->nextXid); } } /* * ExpireTreeKnownAssignedTransactionIds * Remove the given XIDs from KnownAssignedXids. */ void ExpireTreeKnownAssignedTransactionIds(TransactionId xid, int nsubxids, ! TransactionId *subxids) { ! TransactionId max_xid; ! ! if (standbyState == STANDBY_DISABLED) ! return; /* nothing to do */ ! ! max_xid = TransactionIdLatest(xid, nsubxids, subxids); /* * Uses same locking as transaction commit --- 2357,2381 ---- * Now we can advance latestObservedXid */ latestObservedXid = xid; ! /* ShmemVariableCache->nextXid must be beyond any observed xid */ ! next_expected_xid = latestObservedXid; ! TransactionIdAdvance(next_expected_xid); ! ShmemVariableCache->nextXid = next_expected_xid; } } /* * ExpireTreeKnownAssignedTransactionIds * Remove the given XIDs from KnownAssignedXids. + * + * Called during recovery in analogy with and in place of ProcArrayEndTransaction() */ void ExpireTreeKnownAssignedTransactionIds(TransactionId xid, int nsubxids, ! TransactionId *subxids, TransactionId max_xid) { ! Assert(standbyState >= STANDBY_INITIALIZED); /* * Uses same locking as transaction commit *************** *** 2882,2889 **** KnownAssignedXidsGetAndSetXmin(TransactionId *xarray, TransactionId *xmin, int head, tail; int i; - Assert(TransactionIdIsValid(xmax)); - /* * Fetch head just once, since it may change while we loop. * We can stop once we reach the initially seen head, since --- 2946,2951 ---- *************** *** 2894,2901 **** KnownAssignedXidsGetAndSetXmin(TransactionId *xarray, TransactionId *xmin, * Must take spinlock to ensure we see up-to-date array contents. */ SpinLockAcquire(&pArray->known_assigned_xids_lck); ! head = pArray->tailKnownAssignedXids; ! 
tail = pArray->headKnownAssignedXids; SpinLockRelease(&pArray->known_assigned_xids_lck); for (i = tail; i < head; i++) --- 2956,2963 ---- * Must take spinlock to ensure we see up-to-date array contents. */ SpinLockAcquire(&pArray->known_assigned_xids_lck); ! tail = pArray->tailKnownAssignedXids; ! head = pArray->headKnownAssignedXids; SpinLockRelease(&pArray->known_assigned_xids_lck); for (i = tail; i < head; i++) *************** *** 2917,2923 **** KnownAssignedXidsGetAndSetXmin(TransactionId *xarray, TransactionId *xmin, * Filter out anything >= xmax, again relying on sorted property * of array. */ ! if (TransactionIdPrecedesOrEquals(xmax, knownXid)) break; /* Add knownXid into output array */ --- 2979,2986 ---- * Filter out anything >= xmax, again relying on sorted property * of array. */ ! if (TransactionIdIsValid(xmax) && ! TransactionIdFollowsOrEquals(knownXid, xmax)) break; /* Add knownXid into output array */ *** a/src/backend/storage/ipc/standby.c --- b/src/backend/storage/ipc/standby.c *************** *** 776,781 **** standby_desc(StringInfo buf, uint8 xl_info, char *rec) --- 776,826 ---- /* * Log details of the current snapshot to WAL. This allows the snapshot state * to be reconstructed on the standby. + * + * We can move directly to STANDBY_SNAPSHOT_READY at startup if we + * start from a shutdown checkpoint because we know nothing was running + * at that time and our recovery snapshot is known empty. In the more + * typical case of an online checkpoint we need to jump through a few + * hoops to get a correct recovery snapshot and this requires two or + * sometimes a three stage process. + * + * The initial snapshot must contain all running xids and all current + * AccessExclusiveLocks at a point in time on the standby. Assembling + * that information while the server is running requires many and + * various LWLocks, so we choose to derive that information piece by + * piece and then re-assemble that info on the standby. 
When that + * information is fully assembled we move to STANDBY_SNAPSHOT_READY. + * + * Since locking on the primary when we derive the information is not + * strict, we note that there is a time window between the derivation and + * writing to WAL of the derived information. That allows race conditions + * that we must resolve, since xids and locks may enter or leave the + * snapshot during that window. This creates the issue that an xid or + * lock may start *after* the snapshot has been derived yet *before* the + * snapshot is logged in the running xacts WAL record. We resolve this by + * starting to accumulate changes at a point immediately before we derive + * the snapshot on the primary and ignore duplicates when we later apply + * the snapshot from the running xacts record. This is implemented during + * CreateCheckpoint() where we use the logical checkpoint location as + * our starting point and then write the running xacts record immediately + * before writing the main checkpoint WAL record. Since we always start + * up from a checkpoint and we are immediately at our starting point, so + * we unconditionally move to STANDBY_INITIALIZED. After this point we + * must do 4 things: + * * move shared nextXid forwards as we see new xids + * * extend the clog and subtrans with each new xid + * * keep track of uncommitted known assigned xids + * * keep track of uncommitted AccessExclusiveLocks + * + * When we see a commit/abort we must remove known assigned xids and locks + * from the completing transaction. Attempted removals that cannot locate + * an entry are expected and must not cause an error when we are in state + * STANDBY_INITIALIZED. This is implemented in StandbyReleaseLocks() and + * KnownAssignedXidsRemove(). + * + * Later, when we apply the running xact data we must be careful to ignore + * transactions already committed, since those commits raced ahead when + * making WAL entries. 
*/ void LogStandbySnapshot(TransactionId *oldestActiveXid, TransactionId *nextXid) *************** *** 788,793 **** LogStandbySnapshot(TransactionId *oldestActiveXid, TransactionId *nextXid) --- 833,844 ---- /* * Get details of any AccessExclusiveLocks being held at the moment. + * + * XXX GetRunningTransactionLocks() currently holds a lock on all partitions + * though it is possible to further optimise the locking. By reference + * counting locks and storing the value on the ProcArray entry for each backend + * we can easily tell if any locks need recording without trying to acquire + * the partition locks and scanning the lock table. */ locks = GetRunningTransactionLocks(&nlocks); if (nlocks > 0) *************** *** 798,803 **** LogStandbySnapshot(TransactionId *oldestActiveXid, TransactionId *nextXid) --- 849,859 ---- * record we write, because standby will open up when it sees this. */ running = GetRunningTransactionData(); + /* + * The gap between GetRunningTransactionData() and LogCurrentRunningXacts() + * is what most of the fuss is about here, so artifically extending this + * interval is a great way to test the little used parts of the code. + */ LogCurrentRunningXacts(running); *oldestActiveXid = running->oldestRunningXid; *** a/src/include/storage/procarray.h --- b/src/include/storage/procarray.h *************** *** 35,41 **** extern void ProcArrayApplyXidAssignment(TransactionId topxid, extern void RecordKnownAssignedTransactionIds(TransactionId xid); extern void ExpireTreeKnownAssignedTransactionIds(TransactionId xid, ! int nsubxids, TransactionId *subxids); extern void ExpireAllKnownAssignedTransactionIds(void); extern void ExpireOldKnownAssignedTransactionIds(TransactionId xid); --- 35,42 ---- extern void RecordKnownAssignedTransactionIds(TransactionId xid); extern void ExpireTreeKnownAssignedTransactionIds(TransactionId xid, ! int nsubxids, TransactionId *subxids, ! 
TransactionId max_xid); extern void ExpireAllKnownAssignedTransactionIds(void); extern void ExpireOldKnownAssignedTransactionIds(TransactionId xid); *** a/src/include/storage/standby.h --- b/src/include/storage/standby.h *************** *** 68,73 **** typedef struct xl_running_xacts --- 68,74 ---- bool subxid_overflow; /* snapshot overflowed, subxids missing */ TransactionId nextXid; /* copy of ShmemVariableCache->nextXid */ TransactionId oldestRunningXid; /* *not* oldestXmin */ + TransactionId latestCompletedXid; /* so we can set xmax */ TransactionId xids[1]; /* VARIABLE LENGTH ARRAY */ } xl_running_xacts; *************** *** 97,102 **** typedef struct RunningTransactionsData --- 98,104 ---- bool subxid_overflow; /* snapshot overflowed, subxids missing */ TransactionId nextXid; /* copy of ShmemVariableCache->nextXid */ TransactionId oldestRunningXid; /* *not* oldestXmin */ + TransactionId latestCompletedXid; /* so we can set xmax */ TransactionId *xids; /* array of (sub)xids still running */ } RunningTransactionsData;
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers