Tom pointed out two errors that could affect the Hot Standby (HS) startup
code, which is fairly subtle.
The attached patch is small and makes these changes:
* adds new section comment as posted to hackers earlier
* adds comments to other functions
* makes minor corrections to a few existing comments
* fixes the two bugs observed by Tom so that snapshotOldestActiveXid no
longer exists, xids are tracked as soon as (standbyState >=
STANDBY_INITIALIZED) and ProcArrayApplyRecoveryInfo() is substantially
re-written with comments at every stage
* fixes an old bug that set xmax incorrectly in the initial snapshot, though
it would normally have been corrected soon afterwards as recovery continued
* fixes a simple bug introduced in the recent sorted KnownAssignedXids patch
that would give incorrect snapshots
* minor refactoring of xact_redo_abort() and
ExpireTreeKnownAssignedTransactionIds()
* new debug message for when in STANDBY_SNAPSHOT_PENDING and we fail to
move to STANDBY_SNAPSHOT_READY
I have avoided adding the new WAL record that I earlier said was required,
having decided that it wasn't any easier to understand that way.
I will apply tomorrow in the absence of review objections.
--
Simon Riggs www.2ndQuadrant.com
*** a/src/backend/access/transam/xact.c
--- b/src/backend/access/transam/xact.c
***************
*** 4378,4384 **** xact_redo_commit(xl_xact_commit *xlrec, TransactionId xid, XLogRecPtr lsn)
LWLockRelease(XidGenLock);
}
! if (!InHotStandby)
{
/*
* Mark the transaction committed in pg_clog.
--- 4378,4384 ----
LWLockRelease(XidGenLock);
}
! if (standbyState == STANDBY_DISABLED)
{
/*
* Mark the transaction committed in pg_clog.
***************
*** 4412,4423 **** xact_redo_commit(xl_xact_commit *xlrec, TransactionId xid, XLogRecPtr lsn)
/*
* We must mark clog before we update the ProcArray.
*/
! ExpireTreeKnownAssignedTransactionIds(xid, xlrec->nsubxacts, sub_xids);
/*
* Send any cache invalidations attached to the commit. We must
* maintain the same order of invalidation then release locks as
! * occurs in .
*/
ProcessCommittedInvalidationMessages(inval_msgs, xlrec->nmsgs,
XactCompletionRelcacheInitFileInval(xlrec),
--- 4412,4423 ----
/*
* We must mark clog before we update the ProcArray.
*/
! ExpireTreeKnownAssignedTransactionIds(xid, xlrec->nsubxacts, sub_xids, max_xid);
/*
* Send any cache invalidations attached to the commit. We must
* maintain the same order of invalidation then release locks as
! * occurs in CommitTransaction().
*/
ProcessCommittedInvalidationMessages(inval_msgs, xlrec->nmsgs,
XactCompletionRelcacheInitFileInval(xlrec),
***************
*** 4499,4505 **** xact_redo_abort(xl_xact_abort *xlrec, TransactionId xid)
LWLockRelease(XidGenLock);
}
! if (InHotStandby)
{
/*
* If a transaction completion record arrives that has as-yet
--- 4499,4510 ----
LWLockRelease(XidGenLock);
}
! if (standbyState == STANDBY_DISABLED)
! {
! /* Mark the transaction aborted in pg_clog, no need for async stuff */
! TransactionIdAbortTree(xid, xlrec->nsubxacts, sub_xids);
! }
! else
{
/*
* If a transaction completion record arrives that has as-yet
***************
*** 4511,4527 **** xact_redo_abort(xl_xact_abort *xlrec, TransactionId xid)
* already. Leave it in.
*/
RecordKnownAssignedTransactionIds(max_xid);
- }
! /* Mark the transaction aborted in pg_clog, no need for async stuff */
! TransactionIdAbortTree(xid, xlrec->nsubxacts, sub_xids);
- if (InHotStandby)
- {
/*
! * We must mark clog before we update the ProcArray.
*/
! ExpireTreeKnownAssignedTransactionIds(xid, xlrec->nsubxacts, sub_xids);
/*
* There are no flat files that need updating, nor invalidation
--- 4516,4529 ----
* already. Leave it in.
*/
RecordKnownAssignedTransactionIds(max_xid);
! /* Mark the transaction aborted in pg_clog, no need for async stuff */
! TransactionIdAbortTree(xid, xlrec->nsubxacts, sub_xids);
/*
! * We must update the ProcArray after we have marked clog.
*/
! ExpireTreeKnownAssignedTransactionIds(xid, xlrec->nsubxacts, sub_xids, max_xid);
/*
* There are no flat files that need updating, nor invalidation
***************
*** 4596,4602 **** xact_redo(XLogRecPtr lsn, XLogRecord *record)
{
xl_xact_assignment *xlrec = (xl_xact_assignment *) XLogRecGetData(record);
! if (InHotStandby)
ProcArrayApplyXidAssignment(xlrec->xtop,
xlrec->nsubxacts, xlrec->xsub);
}
--- 4598,4604 ----
{
xl_xact_assignment *xlrec = (xl_xact_assignment *) XLogRecGetData(record);
! if (standbyState >= STANDBY_INITIALIZED)
ProcArrayApplyXidAssignment(xlrec->xtop,
xlrec->nsubxacts, xlrec->xsub);
}
*** a/src/backend/access/transam/xlog.c
--- b/src/backend/access/transam/xlog.c
***************
*** 5995,6000 **** StartupXLOG(void)
--- 5995,6001 ----
if (wasShutdown)
{
RunningTransactionsData running;
+ TransactionId latestCompletedXid;
/*
* Construct a RunningTransactions snapshot representing a shut
***************
*** 6006,6011 **** StartupXLOG(void)
--- 6007,6015 ----
running.subxid_overflow = false;
running.nextXid = checkPoint.nextXid;
running.oldestRunningXid = oldestActiveXID;
+ latestCompletedXid = checkPoint.nextXid;
+ TransactionIdRetreat(latestCompletedXid);
+ running.latestCompletedXid = latestCompletedXid;
running.xids = xids;
ProcArrayApplyRecoveryInfo(&running);
***************
*** 6154,6161 **** StartupXLOG(void)
xlogctl->recoveryLastXTime = recoveryLastXTime;
SpinLockRelease(&xlogctl->info_lck);
! /* In Hot Standby mode, keep track of XIDs we've seen */
! if (InHotStandby && TransactionIdIsValid(record->xl_xid))
RecordKnownAssignedTransactionIds(record->xl_xid);
RmgrTable[record->xl_rmid].rm_redo(EndRecPtr, record);
--- 6158,6166 ----
xlogctl->recoveryLastXTime = recoveryLastXTime;
SpinLockRelease(&xlogctl->info_lck);
! /* If we are attempting to enter Hot Standby mode, process XIDs we see */
! if (standbyState >= STANDBY_INITIALIZED &&
! TransactionIdIsValid(record->xl_xid))
RecordKnownAssignedTransactionIds(record->xl_xid);
RmgrTable[record->xl_rmid].rm_redo(EndRecPtr, record);
***************
*** 7803,7808 **** xlog_redo(XLogRecPtr lsn, XLogRecord *record)
--- 7808,7814 ----
TransactionId *xids;
int nxids;
TransactionId oldestActiveXID;
+ TransactionId latestCompletedXid;
RunningTransactionsData running;
oldestActiveXID = PrescanPreparedTransactions(&xids, &nxids);
***************
*** 7817,7822 **** xlog_redo(XLogRecPtr lsn, XLogRecord *record)
--- 7823,7831 ----
running.subxid_overflow = false;
running.nextXid = checkPoint.nextXid;
running.oldestRunningXid = oldestActiveXID;
+ latestCompletedXid = checkPoint.nextXid;
+ TransactionIdRetreat(latestCompletedXid);
+ running.latestCompletedXid = latestCompletedXid;
running.xids = xids;
ProcArrayApplyRecoveryInfo(&running);
*** a/src/backend/storage/ipc/procarray.c
--- b/src/backend/storage/ipc/procarray.c
***************
*** 105,116 **** static TransactionId latestObservedXid = InvalidTransactionId;
*/
static TransactionId standbySnapshotPendingXmin;
- /*
- * Oldest transaction still running according to the running-xacts snapshot
- * we initialized standby mode from.
- */
- static TransactionId snapshotOldestActiveXid;
-
#ifdef XIDCACHE_DEBUG
/* counters for XidCache measurement */
--- 105,110 ----
***************
*** 158,164 **** static void KnownAssignedXidsRemove(TransactionId xid);
static void KnownAssignedXidsRemoveTree(TransactionId xid, int nsubxids,
TransactionId *subxids);
static void KnownAssignedXidsRemovePreceding(TransactionId xid);
! static int KnownAssignedXidsGet(TransactionId *xarray, TransactionId xmax);
static int KnownAssignedXidsGetAndSetXmin(TransactionId *xarray,
TransactionId *xmin,
TransactionId xmax);
--- 152,158 ----
static void KnownAssignedXidsRemoveTree(TransactionId xid, int nsubxids,
TransactionId *subxids);
static void KnownAssignedXidsRemovePreceding(TransactionId xid);
! static int KnownAssignedXidsGet(TransactionId *xarray, TransactionId xmax);
static int KnownAssignedXidsGetAndSetXmin(TransactionId *xarray,
TransactionId *xmin,
TransactionId xmax);
***************
*** 439,448 **** ProcArrayClearTransaction(PGPROC *proc)
proc->subxids.overflowed = false;
}
void
ProcArrayInitRecoveryInfo(TransactionId oldestActiveXid)
{
! snapshotOldestActiveXid = oldestActiveXid;
}
/*
--- 433,449 ----
proc->subxids.overflowed = false;
}
+ /*
+ * ProcArrayInitRecoveryInfo
+ *
+ * When trying to assemble our snapshot we only care about xids after this value.
+ * See comments for LogStandbySnapshot().
+ */
void
ProcArrayInitRecoveryInfo(TransactionId oldestActiveXid)
{
! latestObservedXid = oldestActiveXid;
! TransactionIdRetreat(latestObservedXid);
}
/*
***************
*** 458,473 **** ProcArrayInitRecoveryInfo(TransactionId oldestActiveXid)
* with FATAL errors fail to write abort records, which could cause eventual
* overflow.
*
! * Only used during recovery. Notice the signature is very similar to a
! * _redo function and its difficult to decide exactly where this code should
! * reside.
*/
void
ProcArrayApplyRecoveryInfo(RunningTransactions running)
{
- int xid_index; /* main loop */
TransactionId *xids;
! int nxids;
Assert(standbyState >= STANDBY_INITIALIZED);
--- 459,473 ----
* with FATAL errors fail to write abort records, which could cause eventual
* overflow.
*
! * See comments for LogStandbySnapshot().
*/
void
ProcArrayApplyRecoveryInfo(RunningTransactions running)
{
TransactionId *xids;
! int nxids;
! TransactionId nextXid;
! int i;
Assert(standbyState >= STANDBY_INITIALIZED);
***************
*** 505,545 **** ProcArrayApplyRecoveryInfo(RunningTransactions running)
elog(trace_recovery(DEBUG2),
"recovery snapshots are now enabled");
}
return;
}
/*
* OK, we need to initialise from the RunningXactData record
*/
- latestObservedXid = running->nextXid;
- TransactionIdRetreat(latestObservedXid);
/*
! * If the snapshot overflowed, then we still initialise with what we know,
! * but the recovery snapshot isn't fully valid yet because we know there
! * are some subxids missing (ergo we don't know which ones)
*/
! if (!running->subxid_overflow)
! {
! standbyState = STANDBY_SNAPSHOT_READY;
! standbySnapshotPendingXmin = InvalidTransactionId;
! }
! else
! {
! standbyState = STANDBY_SNAPSHOT_PENDING;
! standbySnapshotPendingXmin = latestObservedXid;
! ereport(LOG,
! (errmsg("consistent state delayed because recovery snapshot incomplete")));
! }
! nxids = running->xcnt;
! xids = running->xids;
!
! KnownAssignedXidsDisplay(trace_recovery(DEBUG3));
/*
! * Scan through the incoming array of RunningXacts and collect xids. We
! * don't use SubtransSetParent because it doesn't matter yet. If we aren't
* overflowed then all xids will fit in snapshot and so we don't need
* subtrans. If we later overflow, an xid assignment record will add xids
* to subtrans. If RunningXacts is overflowed then we don't have enough
--- 505,544 ----
elog(trace_recovery(DEBUG2),
"recovery snapshots are now enabled");
}
+ else
+ elog(trace_recovery(DEBUG2),
+ "recovery snapshot waiting for %u oldest active xid on standby is %u",
+ standbySnapshotPendingXmin,
+ running->oldestRunningXid);
return;
}
+ Assert(standbyState == STANDBY_INITIALIZED);
+
/*
* OK, we need to initialise from the RunningXactData record
*/
/*
! * Remove all xids except xids later than the snapshot. We don't know
! * exactly which ones that is until precisely now, so that is why we
! * allow xids to be added only to remove most of them again here.
*/
! ExpireOldKnownAssignedTransactionIds(running->nextXid);
! StandbyReleaseOldLocks(running->nextXid);
! /*
! * Nobody else is running yet, but take locks anyhow
! */
! LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
/*
! * Combine the running xact data with already known xids, if any exist.
! * KnownAssignedXids is sorted so we cannot just add new xids, we have
! * to combine them first, sort them and then re-add to KnownAssignedXids.
! *
! * Some of the new xids are top-level xids and some are subtransactions. We
! * don't call SubtransSetParent because it doesn't matter yet. If we aren't
* overflowed then all xids will fit in snapshot and so we don't need
* subtrans. If we later overflow, an xid assignment record will add xids
* to subtrans. If RunningXacts is overflowed then we don't have enough
***************
*** 547,605 **** ProcArrayApplyRecoveryInfo(RunningTransactions running)
*/
/*
! * Nobody else is running yet, but take locks anyhow
*/
! LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
! /* Reset latestCompletedXid */
! ShmemVariableCache->latestCompletedXid = running->nextXid;
! TransactionIdRetreat(ShmemVariableCache->latestCompletedXid);
/*
! * Add our new xids into the array
*/
! for (xid_index = 0; xid_index < running->xcnt; xid_index++)
{
! TransactionId xid = running->xids[xid_index];
/*
! * The running-xacts snapshot can contain xids that did finish between
! * when the snapshot was taken and when it was written to WAL. Such
! * transactions are not running anymore, so ignore them.
*/
if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
continue;
! KnownAssignedXidsAdd(xid, xid, true);
}
! KnownAssignedXidsDisplay(trace_recovery(DEBUG3));
/*
! * Update lastOverflowedXid if the snapshot has any missing subxids.
* We don't know the specific subxids that are missing, so conservatively
* assume the last one is latestObservedXid. If no missing subxids,
* try to clear lastOverflowedXid.
*/
if (running->subxid_overflow)
{
if (TransactionIdFollows(latestObservedXid,
procArray->lastOverflowedXid))
procArray->lastOverflowedXid = latestObservedXid;
}
! else if (TransactionIdFollows(running->oldestRunningXid,
procArray->lastOverflowedXid))
! procArray->lastOverflowedXid = InvalidTransactionId;
/* nextXid must be beyond any observed xid */
! if (TransactionIdFollows(running->nextXid, ShmemVariableCache->nextXid))
! ShmemVariableCache->nextXid = running->nextXid;
LWLockRelease(ProcArrayLock);
elog(trace_recovery(DEBUG2), "running transaction data initialized");
if (standbyState == STANDBY_SNAPSHOT_READY)
elog(trace_recovery(DEBUG2), "recovery snapshots are now enabled");
}
/*
--- 546,693 ----
*/
/*
! * Allocate a temporary array so we can combine xids. The total
! * of both arrays should never normally exceed TOTAL_MAX_CACHED_SUBXIDS.
*/
! xids = palloc(sizeof(TransactionId) * TOTAL_MAX_CACHED_SUBXIDS);
!
! /*
! * Get the remaining KnownAssignedXids. In most cases there won't
! * be any at all since this exists only to catch a theoretical
! * race condition.
! */
! nxids = KnownAssignedXidsGet(xids, InvalidTransactionId);
! if (nxids > 0)
! KnownAssignedXidsDisplay(trace_recovery(DEBUG3));
! /*
! * Now we have a copy of any KnownAssignedXids we can zero the
! * array before re-insertion of the combined snapshot.
! */
! KnownAssignedXidsRemovePreceding(InvalidTransactionId);
/*
! * Add to the temp array any xids which have not already completed,
! * taking care not to overflow in extreme cases.
*/
! for (i = 0; i < running->xcnt; i++)
{
! TransactionId xid = running->xids[i];
/*
! * The running-xacts snapshot can contain xids that were running at
! * the time of the snapshot, yet complete before the snapshot was
! * written to WAL. They're not running anymore, so ignore them.
*/
if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
continue;
! xids[nxids++] = xid;
!
! /*
! * Test for overflow only after we have filtered out already complete
! * transactions.
! */
! if (nxids > TOTAL_MAX_CACHED_SUBXIDS)
! elog(ERROR, "too many xids to add into KnownAssignedXids");
}
! if (nxids > 0)
! {
! /*
! * Sort the array so that we can add them safely into KnownAssignedXids.
! */
! qsort(xids, nxids, sizeof(TransactionId), xidComparator);
!
! /*
! * Re-initialise latestObservedXid to the highest xid we've seen.
! */
! latestObservedXid = xids[nxids - 1];
!
! /*
! * Add the sorted snapshot into KnownAssignedXids
! */
! for (i = 0; i < nxids; i++)
! {
! TransactionId xid = xids[i];
!
! KnownAssignedXidsAdd(xid, xid, true);
! }
!
! KnownAssignedXidsDisplay(trace_recovery(DEBUG3));
! }
!
! pfree(xids);
/*
! * Now we've got the running xids we need to set the global values
! * that are used to track snapshots as they evolve further
! *
! * * latestCompletedXid which will be the xmax for snapshots
! * * lastOverflowedXid which shows whether snapshots overflow
! * * nextXid
! *
! * If the snapshot overflowed, then we still initialise with what we know,
! * but the recovery snapshot isn't fully valid yet because we know there
! * are some subxids missing.
* We don't know the specific subxids that are missing, so conservatively
* assume the last one is latestObservedXid. If no missing subxids,
* try to clear lastOverflowedXid.
+ *
+ * If the snapshot didn't overflow it's still possible that an overflow
+ * occurred in the gap between taking snapshot and logging record, so
+ * we also need to check if lastOverflowedXid is already ahead of us.
*/
if (running->subxid_overflow)
{
+ standbyState = STANDBY_SNAPSHOT_PENDING;
+
+ standbySnapshotPendingXmin = latestObservedXid;
if (TransactionIdFollows(latestObservedXid,
procArray->lastOverflowedXid))
procArray->lastOverflowedXid = latestObservedXid;
}
! else if (TransactionIdFollows(procArray->lastOverflowedXid,
! latestObservedXid))
! {
! standbyState = STANDBY_SNAPSHOT_PENDING;
!
! standbySnapshotPendingXmin = procArray->lastOverflowedXid;
! }
! else
! {
! standbyState = STANDBY_SNAPSHOT_READY;
!
! standbySnapshotPendingXmin = InvalidTransactionId;
! if (TransactionIdFollows(running->oldestRunningXid,
procArray->lastOverflowedXid))
! procArray->lastOverflowedXid = InvalidTransactionId;
! }
!
! /*
! * If a transaction completed in the gap between taking and logging the
! * snapshot then latestCompletedXid may already be higher than the value
! * from the snapshot, so check before we use the incoming value.
! */
! if (TransactionIdPrecedes(ShmemVariableCache->latestCompletedXid,
! running->latestCompletedXid))
! ShmemVariableCache->latestCompletedXid = running->latestCompletedXid;
/* nextXid must be beyond any observed xid */
! nextXid = latestObservedXid;
! TransactionIdAdvance(nextXid);
! if (TransactionIdFollows(nextXid, ShmemVariableCache->nextXid))
! ShmemVariableCache->nextXid = nextXid;
LWLockRelease(ProcArrayLock);
elog(trace_recovery(DEBUG2), "running transaction data initialized");
+ KnownAssignedXidsDisplay(trace_recovery(DEBUG3));
if (standbyState == STANDBY_SNAPSHOT_READY)
elog(trace_recovery(DEBUG2), "recovery snapshots are now enabled");
+ else
+ ereport(LOG,
+ (errmsg("consistent state delayed because recovery snapshot incomplete")));
}
/*
***************
*** 613,620 **** ProcArrayApplyXidAssignment(TransactionId topxid,
TransactionId max_xid;
int i;
! if (standbyState < STANDBY_SNAPSHOT_PENDING)
! return;
max_xid = TransactionIdLatest(topxid, nsubxids, subxids);
--- 701,708 ----
TransactionId max_xid;
int i;
! Assert(standbyState >= STANDBY_INITIALIZED);
!
max_xid = TransactionIdLatest(topxid, nsubxids, subxids);
***************
*** 1410,1415 **** GetRunningTransactionData(void)
--- 1498,1504 ----
CurrentRunningXacts->subxid_overflow = suboverflowed;
CurrentRunningXacts->nextXid = ShmemVariableCache->nextXid;
CurrentRunningXacts->oldestRunningXid = oldestRunningXid;
+ CurrentRunningXacts->latestCompletedXid = latestCompletedXid;
LWLockRelease(XidGenLock);
LWLockRelease(ProcArrayLock);
***************
*** 2219,2253 **** DisplayXidCache(void)
*
* RecordKnownAssignedTransactionIds() should be run for *every* WAL record
* type apart from XLOG_XACT_RUNNING_XACTS (since that initialises the first
! * snapshot so that RecordKnownAssignedTransactionIds() can be called).
*
! * Must only be called in Startup process.
*/
void
RecordKnownAssignedTransactionIds(TransactionId xid)
{
! /*
! * Skip processing if the current snapshot is not initialized.
! */
! if (standbyState < STANDBY_SNAPSHOT_PENDING)
! return;
!
! /*
! * We can see WAL records before the running-xacts snapshot that contain
! * XIDs that are not in the running-xacts snapshot, but that we know to
! * have finished before the running-xacts snapshot was taken. Don't waste
! * precious shared memory by keeping them in the hash table.
! *
! * We can also see WAL records before the running-xacts snapshot that
! * contain XIDs that are not in the running-xacts snapshot for a different
! * reason: the transaction started *after* the running-xacts snapshot was
! * taken, but before it was written to WAL. We must be careful to not
! * ignore such XIDs. Because such a transaction started after the
! * running-xacts snapshot was taken, it must have an XID larger than the
! * oldest XID according to the running-xacts snapshot.
! */
! if (TransactionIdPrecedes(xid, snapshotOldestActiveXid))
! return;
elog(trace_recovery(DEBUG4), "record known xact %u latestObservedXid %u",
xid, latestObservedXid);
--- 2308,2323 ----
*
* RecordKnownAssignedTransactionIds() should be run for *every* WAL record
* type apart from XLOG_XACT_RUNNING_XACTS (since that initialises the first
! * snapshot so that RecordKnownAssignedTransactionIds() can be called). Must
! * be called for each record after we have executed StartupCLog() et al,
! * since we must ExtendCLOG() etc..
*
! * Called during recovery in analogy with and in place of GetNewTransactionId()
*/
void
RecordKnownAssignedTransactionIds(TransactionId xid)
{
! Assert(standbyState >= STANDBY_INITIALIZED);
elog(trace_recovery(DEBUG4), "record known xact %u latestObservedXid %u",
xid, latestObservedXid);
***************
*** 2287,2317 **** RecordKnownAssignedTransactionIds(TransactionId xid)
* Now we can advance latestObservedXid
*/
latestObservedXid = xid;
- }
! /* nextXid must be beyond any observed xid */
! if (TransactionIdFollowsOrEquals(latestObservedXid,
! ShmemVariableCache->nextXid))
! {
! ShmemVariableCache->nextXid = latestObservedXid;
! TransactionIdAdvance(ShmemVariableCache->nextXid);
}
}
/*
* ExpireTreeKnownAssignedTransactionIds
* Remove the given XIDs from KnownAssignedXids.
*/
void
ExpireTreeKnownAssignedTransactionIds(TransactionId xid, int nsubxids,
! TransactionId *subxids)
{
! TransactionId max_xid;
!
! if (standbyState == STANDBY_DISABLED)
! return; /* nothing to do */
!
! max_xid = TransactionIdLatest(xid, nsubxids, subxids);
/*
* Uses same locking as transaction commit
--- 2357,2381 ----
* Now we can advance latestObservedXid
*/
latestObservedXid = xid;
! /* ShmemVariableCache->nextXid must be beyond any observed xid */
! next_expected_xid = latestObservedXid;
! TransactionIdAdvance(next_expected_xid);
! ShmemVariableCache->nextXid = next_expected_xid;
}
}
/*
* ExpireTreeKnownAssignedTransactionIds
* Remove the given XIDs from KnownAssignedXids.
+ *
+ * Called during recovery in analogy with and in place of ProcArrayEndTransaction()
*/
void
ExpireTreeKnownAssignedTransactionIds(TransactionId xid, int nsubxids,
! TransactionId *subxids, TransactionId max_xid)
{
! Assert(standbyState >= STANDBY_INITIALIZED);
/*
* Uses same locking as transaction commit
***************
*** 2882,2889 **** KnownAssignedXidsGetAndSetXmin(TransactionId *xarray, TransactionId *xmin,
int head, tail;
int i;
- Assert(TransactionIdIsValid(xmax));
-
/*
* Fetch head just once, since it may change while we loop.
* We can stop once we reach the initially seen head, since
--- 2946,2951 ----
***************
*** 2894,2901 **** KnownAssignedXidsGetAndSetXmin(TransactionId *xarray, TransactionId *xmin,
* Must take spinlock to ensure we see up-to-date array contents.
*/
SpinLockAcquire(&pArray->known_assigned_xids_lck);
! head = pArray->tailKnownAssignedXids;
! tail = pArray->headKnownAssignedXids;
SpinLockRelease(&pArray->known_assigned_xids_lck);
for (i = tail; i < head; i++)
--- 2956,2963 ----
* Must take spinlock to ensure we see up-to-date array contents.
*/
SpinLockAcquire(&pArray->known_assigned_xids_lck);
! tail = pArray->tailKnownAssignedXids;
! head = pArray->headKnownAssignedXids;
SpinLockRelease(&pArray->known_assigned_xids_lck);
for (i = tail; i < head; i++)
***************
*** 2917,2923 **** KnownAssignedXidsGetAndSetXmin(TransactionId *xarray, TransactionId *xmin,
* Filter out anything >= xmax, again relying on sorted property
* of array.
*/
! if (TransactionIdPrecedesOrEquals(xmax, knownXid))
break;
/* Add knownXid into output array */
--- 2979,2986 ----
* Filter out anything >= xmax, again relying on sorted property
* of array.
*/
! if (TransactionIdIsValid(xmax) &&
! TransactionIdFollowsOrEquals(knownXid, xmax))
break;
/* Add knownXid into output array */
*** a/src/backend/storage/ipc/standby.c
--- b/src/backend/storage/ipc/standby.c
***************
*** 776,781 **** standby_desc(StringInfo buf, uint8 xl_info, char *rec)
--- 776,826 ----
/*
* Log details of the current snapshot to WAL. This allows the snapshot state
* to be reconstructed on the standby.
+ *
+ * We can move directly to STANDBY_SNAPSHOT_READY at startup if we
+ * start from a shutdown checkpoint because we know nothing was running
+ * at that time and our recovery snapshot is known empty. In the more
+ * typical case of an online checkpoint we need to jump through a few
+ * hoops to get a correct recovery snapshot and this requires two or
+ * sometimes a three stage process.
+ *
+ * The initial snapshot must contain all running xids and all current
+ * AccessExclusiveLocks at a point in time on the standby. Assembling
+ * that information while the server is running requires many and
+ * various LWLocks, so we choose to derive that information piece by
+ * piece and then re-assemble that info on the standby. When that
+ * information is fully assembled we move to STANDBY_SNAPSHOT_READY.
+ *
+ * Since locking on the primary when we derive the information is not
+ * strict, we note that there is a time window between the derivation and
+ * writing to WAL of the derived information. That allows race conditions
+ * that we must resolve, since xids and locks may enter or leave the
+ * snapshot during that window. This creates the issue that an xid or
+ * lock may start *after* the snapshot has been derived yet *before* the
+ * snapshot is logged in the running xacts WAL record. We resolve this by
+ * starting to accumulate changes at a point immediately before we derive
+ * the snapshot on the primary and ignore duplicates when we later apply
+ * the snapshot from the running xacts record. This is implemented during
+ * CreateCheckpoint() where we use the logical checkpoint location as
+ * our starting point and then write the running xacts record immediately
+ * before writing the main checkpoint WAL record. Since we always start
+ * up from a checkpoint and are immediately at our starting point,
+ * we unconditionally move to STANDBY_INITIALIZED. After this point we
+ * must do 4 things:
+ * * move shared nextXid forwards as we see new xids
+ * * extend the clog and subtrans with each new xid
+ * * keep track of uncommitted known assigned xids
+ * * keep track of uncommitted AccessExclusiveLocks
+ *
+ * When we see a commit/abort we must remove known assigned xids and locks
+ * from the completing transaction. Attempted removals that cannot locate
+ * an entry are expected and must not cause an error when we are in state
+ * STANDBY_INITIALIZED. This is implemented in StandbyReleaseLocks() and
+ * KnownAssignedXidsRemove().
+ *
+ * Later, when we apply the running xact data we must be careful to ignore
+ * transactions already committed, since those commits raced ahead when
+ * making WAL entries.
*/
void
LogStandbySnapshot(TransactionId *oldestActiveXid, TransactionId *nextXid)
***************
*** 788,793 **** LogStandbySnapshot(TransactionId *oldestActiveXid, TransactionId *nextXid)
--- 833,844 ----
/*
* Get details of any AccessExclusiveLocks being held at the moment.
+ *
+ * XXX GetRunningTransactionLocks() currently holds a lock on all partitions
+ * though it is possible to further optimise the locking. By reference
+ * counting locks and storing the value on the ProcArray entry for each backend
+ * we can easily tell if any locks need recording without trying to acquire
+ * the partition locks and scanning the lock table.
*/
locks = GetRunningTransactionLocks(&nlocks);
if (nlocks > 0)
***************
*** 798,803 **** LogStandbySnapshot(TransactionId *oldestActiveXid, TransactionId *nextXid)
--- 849,859 ----
* record we write, because standby will open up when it sees this.
*/
running = GetRunningTransactionData();
+ /*
+ * The gap between GetRunningTransactionData() and LogCurrentRunningXacts()
+ * is what most of the fuss is about here, so artificially extending this
+ * interval is a great way to test the little used parts of the code.
+ */
LogCurrentRunningXacts(running);
*oldestActiveXid = running->oldestRunningXid;
*** a/src/include/storage/procarray.h
--- b/src/include/storage/procarray.h
***************
*** 35,41 **** extern void ProcArrayApplyXidAssignment(TransactionId topxid,
extern void RecordKnownAssignedTransactionIds(TransactionId xid);
extern void ExpireTreeKnownAssignedTransactionIds(TransactionId xid,
! int nsubxids, TransactionId *subxids);
extern void ExpireAllKnownAssignedTransactionIds(void);
extern void ExpireOldKnownAssignedTransactionIds(TransactionId xid);
--- 35,42 ----
extern void RecordKnownAssignedTransactionIds(TransactionId xid);
extern void ExpireTreeKnownAssignedTransactionIds(TransactionId xid,
! int nsubxids, TransactionId *subxids,
! TransactionId max_xid);
extern void ExpireAllKnownAssignedTransactionIds(void);
extern void ExpireOldKnownAssignedTransactionIds(TransactionId xid);
*** a/src/include/storage/standby.h
--- b/src/include/storage/standby.h
***************
*** 68,73 **** typedef struct xl_running_xacts
--- 68,74 ----
bool subxid_overflow; /* snapshot overflowed, subxids missing */
TransactionId nextXid; /* copy of ShmemVariableCache->nextXid */
TransactionId oldestRunningXid; /* *not* oldestXmin */
+ TransactionId latestCompletedXid; /* so we can set xmax */
TransactionId xids[1]; /* VARIABLE LENGTH ARRAY */
} xl_running_xacts;
***************
*** 97,102 **** typedef struct RunningTransactionsData
--- 98,104 ----
bool subxid_overflow; /* snapshot overflowed, subxids missing */
TransactionId nextXid; /* copy of ShmemVariableCache->nextXid */
TransactionId oldestRunningXid; /* *not* oldestXmin */
+ TransactionId latestCompletedXid; /* so we can set xmax */
TransactionId *xids; /* array of (sub)xids still running */
} RunningTransactionsData;
--
Sent via pgsql-hackers mailing list ([email protected])
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers