On Thu, Jan 16, 2025 at 04:50:09PM +0900, Michael Paquier wrote:
> On Wed, Jan 15, 2025 at 05:00:51PM -0800, Noah Misch wrote:
> > I think "using the current epoch" is wrong for half of the nextFullXid
> > values having epoch > 0. For example, nextFullXid==2^32 is in epoch 1, but
> > all the allowable XIDs are in epoch 0. (I mean "allowable" in the sense of
> > AssertTransactionIdInAllowableRange().) From then until we assign another
> > 2^31 XIDs, epochs 0 and 1 are both expected in XID values. After 2^31 XID
> > assignments, every allowable XID will be in epoch 1. Hence, twophase.c
> > would need two-epoch logic like we have in widen_snapshot_xid() and
> > XLogRecGetFullXid(). Is that right? (I wrote this in a hurry, so this email
> > may have more than the standard level of errors.) Before commit 7e125b2,
> > twophase also had that logic. I didn't work out the user-visible
> > consequences of that logic's new absence here, but I bet on twophase
> > recovery breakage. Similar problem here (up to two epochs are acceptable,
> > not just one):
> >
> > + /* Discard files from past epochs */
> > + if (EpochFromFullTransactionId(fxid) < EpochFromFullTransactionId(nextXid))
>
> Oops, you're right. Your suggestion to unify all that in a single
> routine is an excellent idea. Missed the bits in xid8funcs.c.
Added.
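To make the two-epoch rule concrete, here is a minimal standalone sketch. It
models a FullTransactionId as a plain uint64_t and uses hypothetical names
(full_xid_t, widen_current_epoch(), widen_allowable()); it is not the code of
FullTransactionIdFromAllowableAt() in the patch, and it ignores the special
XIDs below FirstNormalTransactionId.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical model: a full XID is (epoch << 32) | xid. */
typedef uint64_t full_xid_t;

static inline uint32_t
fxid_epoch(full_xid_t f)
{
    return (uint32_t) (f >> 32);
}

static inline uint32_t
fxid_xid(full_xid_t f)
{
    return (uint32_t) f;
}

static inline full_xid_t
fxid_make(uint32_t epoch, uint32_t xid)
{
    return ((full_xid_t) epoch << 32) | xid;
}

/* One-epoch logic: always borrow the epoch of the next full XID. */
static inline full_xid_t
widen_current_epoch(full_xid_t next, uint32_t xid)
{
    return fxid_make(fxid_epoch(next), xid);
}

/*
 * Two-epoch logic: an allowable XID cannot be in the future, so a 32-bit
 * value numerically above the low half of "next" must belong to the
 * previous epoch.
 */
static inline full_xid_t
widen_allowable(full_xid_t next, uint32_t xid)
{
    uint32_t    epoch = fxid_epoch(next);

    if (xid > fxid_xid(next))
    {
        assert(epoch != 0);     /* no epoch exists before epoch 0 */
        epoch--;
    }
    return fxid_make(epoch, xid);
}
```

With next = 2^32 (epoch 1, xid 0) and xid = 4000000000, widen_current_epoch()
returns a value above next, that is, a future XID, while widen_allowable()
returns the epoch-0 value.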
> > I wrote the attached half-baked patch to fix those, but I tend to think it's
> > better to use FullTransactionId as many places as possible in twophase.c.
> > (We'd still need to convert XIDs that we read from xl_xact_prepare records,
> > along the lines of XLogRecGetFullXid().) How do you see it?
>
> I'm all for integrating more FullTransactionIds now that these reflect
> in the file names, and do a deeper cut.
>
> As far as I understand, the most important point of the logic is to
> detect and discard the future files first in restoreTwoPhaseData() ->
> ProcessTwoPhaseBuffer() when scanning the contents of pg_twophase at
> the beginning of recovery. Once this filtering is done, it should be
> safe to use your FullTransactionIdFromAllowableAt() when doing
> the fxid <-> xid transitions between the records and the files on disk
> flushed by a restartpoint which store an XID, and the shmem state of
> GlobalTransactionData with a fxid.
(I did not expect that a function called restoreTwoPhaseData() would run before
a function called PrescanPreparedTransactions(), but so it is.)

How is it that restoreTwoPhaseData() -> ProcessTwoPhaseBuffer() can safely
call TransactionIdDidAbort() when we've not replayed WAL to make CLOG
consistent? What can it assume about the value of TransamVariables->nextXid
at that early time?
> With the additions attached, FullTransactionIdFromAllowableAt() gets
> down from 8 to 6 calls in twophase.c. The change related to
> MarkAsPreparingGuts() seems optional, though.
Thanks. It's probably not worth doing at that level of reduction. I tried
spreading fxid further and got it down to three conversions, corresponding to
redo of each of XLOG_XACT_PREPARE, XLOG_XACT_COMMIT_PREPARED, and
XLOG_XACT_ABORT_PREPARED. I'm attaching the WIP of that. It's not as
satisfying as I expected, so FullTransactionIdFromCurrentEpoch-v0.patch may
yet be the better direction.

The ProcessTwoPhaseBuffer() code to remove "past two-phase state" seems
best-effort, independent of this change. Just because an XID is in a
potentially-acceptable epoch doesn't mean TransactionIdDidCommit() will find
clog for it. I've not studied whether that matters.
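For illustration, the future/past filtering in ProcessTwoPhaseBuffer() can be
modeled as below. This is a sketch with hypothetical names (full_xid_t,
twophase_verdict, classify_twophase_fxid()); it mirrors only the two range
checks from the attached WIP, and a TWOPHASE_CANDIDATE verdict does not
guarantee that clog still exists for the XID.

```c
#include <stdint.h>

/* Hypothetical model: a full XID is (epoch << 32) | xid. */
typedef uint64_t full_xid_t;

typedef enum
{
    TWOPHASE_FUTURE,            /* at or beyond the next full XID */
    TWOPHASE_PAST,              /* more than one epoch behind */
    TWOPHASE_CANDIDATE          /* within the two acceptable epochs */
} twophase_verdict;

static inline twophase_verdict
classify_twophase_fxid(full_xid_t fxid, full_xid_t next)
{
    /* Epochs are widened to 64 bits so that "+ 1" cannot wrap. */
    uint64_t    fxid_epoch = fxid >> 32;
    uint64_t    next_epoch = next >> 32;

    if (fxid >= next)
        return TWOPHASE_FUTURE;
    if (fxid_epoch + 1 < next_epoch)
        return TWOPHASE_PAST;
    return TWOPHASE_CANDIDATE;
}
```

The epoch-based past check is the part marked FIXME in the WIP; the sketch
keeps it as is rather than guessing at a replacement.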
Incidentally, this comment about when a function is called:

 * PrescanPreparedTransactions
 *
 * Scan the shared memory entries of TwoPhaseState and determine the range
 * of valid XIDs present. This is run during database startup, after we
 * have completed reading WAL. TransamVariables->nextXid has been set to
 * one more than the highest XID for which evidence exists in WAL.

doesn't match the timing of the actual earliest call:

    /* REDO */
    if (InRecovery)
    {
        ...
        if (ArchiveRecoveryRequested && EnableHotStandby)
        {
            ...
            if (wasShutdown)
                oldestActiveXID = PrescanPreparedTransactions(&xids, &nxids);
            else
                oldestActiveXID = checkPoint.oldestActiveXid;
            ...
        PerformWalRecovery();
        performedWalRecovery = true;
> I am trying to figure
> out how to write a regression test to trigger this error, lacking a
> bit of time today. That's going to require more trickery with
> pg_resetwal to make that cheap, or something like that.. Attached are
> some suggestions, as of a 0002 that applies on top of your 0001.
Thanks. I'd value having your regression test, but manual-ish testing could
suffice if it's too hard.
> XLogRecGetFullXid() is used nowhere. This could be removed, perhaps,
> or not?
Maybe. Looks like it was born unused in 67b9b3c (2019-07), so removing may as
well be a separate discussion.
diff --git a/contrib/amcheck/verify_heapam.c b/contrib/amcheck/verify_heapam.c
index 8a8e36d..8273123 100644
--- a/contrib/amcheck/verify_heapam.c
+++ b/contrib/amcheck/verify_heapam.c
@@ -1877,7 +1877,9 @@ check_tuple(HeapCheckContext *ctx, bool
*xmin_commit_status_ok,
/*
* Convert a TransactionId into a FullTransactionId using our cached values of
* the valid transaction ID range. It is the caller's responsibility to have
- * already updated the cached values, if necessary.
+ * already updated the cached values, if necessary. This is akin to
+ * FullTransactionIdFromAllowableAt(), but it tolerates corruption in the form
+ * of an xid before epoch 0.
*/
static FullTransactionId
FullTransactionIdFromXidAndCtx(TransactionId xid, const HeapCheckContext *ctx)
diff --git a/src/backend/access/transam/multixact.c
b/src/backend/access/transam/multixact.c
index 27ccdf9..4e52792 100644
--- a/src/backend/access/transam/multixact.c
+++ b/src/backend/access/transam/multixact.c
@@ -1847,7 +1847,7 @@ AtPrepare_MultiXact(void)
* Clean up after successful PREPARE TRANSACTION
*/
void
-PostPrepare_MultiXact(TransactionId xid)
+PostPrepare_MultiXact(FullTransactionId fxid)
{
MultiXactId myOldestMember;
@@ -1858,7 +1858,7 @@ PostPrepare_MultiXact(TransactionId xid)
myOldestMember = OldestMemberMXactId[MyProcNumber];
if (MultiXactIdIsValid(myOldestMember))
{
- ProcNumber dummyProcNumber =
TwoPhaseGetDummyProcNumber(xid, false);
+ ProcNumber dummyProcNumber =
TwoPhaseGetDummyProcNumber(fxid, false);
/*
* Even though storing MultiXactId is atomic, acquire lock to
make
@@ -1896,10 +1896,10 @@ PostPrepare_MultiXact(TransactionId xid)
* Recover the state of a prepared transaction at startup
*/
void
-multixact_twophase_recover(TransactionId xid, uint16 info,
+multixact_twophase_recover(FullTransactionId fxid, uint16 info,
void *recdata, uint32 len)
{
- ProcNumber dummyProcNumber = TwoPhaseGetDummyProcNumber(xid,
false);
+ ProcNumber dummyProcNumber = TwoPhaseGetDummyProcNumber(fxid,
false);
MultiXactId oldestMember;
/*
@@ -1917,10 +1917,10 @@ multixact_twophase_recover(TransactionId xid, uint16
info,
* Similar to AtEOXact_MultiXact but for COMMIT PREPARED
*/
void
-multixact_twophase_postcommit(TransactionId xid, uint16 info,
+multixact_twophase_postcommit(FullTransactionId fxid, uint16 info,
void *recdata, uint32
len)
{
- ProcNumber dummyProcNumber = TwoPhaseGetDummyProcNumber(xid, true);
+ ProcNumber dummyProcNumber = TwoPhaseGetDummyProcNumber(fxid,
true);
Assert(len == sizeof(MultiXactId));
@@ -1932,10 +1932,10 @@ multixact_twophase_postcommit(TransactionId xid, uint16
info,
* This is actually just the same as the COMMIT case.
*/
void
-multixact_twophase_postabort(TransactionId xid, uint16 info,
+multixact_twophase_postabort(FullTransactionId fxid, uint16 info,
void *recdata, uint32
len)
{
- multixact_twophase_postcommit(xid, info, recdata, len);
+ multixact_twophase_postcommit(fxid, info, recdata, len);
}
/*
diff --git a/src/backend/access/transam/twophase.c
b/src/backend/access/transam/twophase.c
index a3190dc..8b2d102 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -159,7 +159,7 @@ typedef struct GlobalTransactionData
*/
XLogRecPtr prepare_start_lsn; /* XLOG offset of prepare
record start */
XLogRecPtr prepare_end_lsn; /* XLOG offset of prepare
record end */
- TransactionId xid; /* The GXACT id */
+ FullTransactionId fxid; /* The GXACT full xid */
Oid owner; /* ID of user that
executed the xact */
ProcNumber locking_backend; /* backend currently working on
the xact */
@@ -197,6 +197,7 @@ static GlobalTransaction MyLockedGxact = NULL;
static bool twophaseExitRegistered = false;
+static void PrepareRedoRemoveFull(FullTransactionId fxid, bool giveWarning);
static void RecordTransactionCommitPrepared(TransactionId xid,
int nchildren,
TransactionId *children,
@@ -216,19 +217,19 @@ static void RecordTransactionAbortPrepared(TransactionId
xid,
int nstats,
xl_xact_stats_item *stats,
const char *gid);
-static void ProcessRecords(char *bufptr, TransactionId xid,
+static void ProcessRecords(char *bufptr, FullTransactionId fxid,
const TwoPhaseCallback
callbacks[]);
static void RemoveGXact(GlobalTransaction gxact);
static void XlogReadTwoPhaseData(XLogRecPtr lsn, char **buf, int *len);
-static char *ProcessTwoPhaseBuffer(FullTransactionId xid,
+static char *ProcessTwoPhaseBuffer(FullTransactionId fxid,
XLogRecPtr
prepare_start_lsn,
bool
fromdisk, bool setParent, bool setNextXid);
-static void MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid,
+static void MarkAsPreparingGuts(GlobalTransaction gxact, FullTransactionId
fxid,
const char
*gid, TimestampTz prepared_at, Oid owner,
Oid databaseid);
static void RemoveTwoPhaseFile(FullTransactionId fxid, bool giveWarning);
-static void RecreateTwoPhaseFile(TransactionId xid, void *content, int len);
+static void RecreateTwoPhaseFile(FullTransactionId fxid, void *content, int
len);
/*
* Initialization of shared memory
@@ -356,7 +357,7 @@ PostPrepare_Twophase(void)
* Reserve the GID for the given transaction.
*/
GlobalTransaction
-MarkAsPreparing(TransactionId xid, const char *gid,
+MarkAsPreparing(FullTransactionId fxid, const char *gid,
TimestampTz prepared_at, Oid owner, Oid
databaseid)
{
GlobalTransaction gxact;
@@ -407,7 +408,7 @@ MarkAsPreparing(TransactionId xid, const char *gid,
gxact = TwoPhaseState->freeGXacts;
TwoPhaseState->freeGXacts = gxact->next;
- MarkAsPreparingGuts(gxact, xid, gid, prepared_at, owner, databaseid);
+ MarkAsPreparingGuts(gxact, fxid, gid, prepared_at, owner, databaseid);
gxact->ondisk = false;
@@ -430,11 +431,13 @@ MarkAsPreparing(TransactionId xid, const char *gid,
* Note: This function should be called with appropriate locks held.
*/
static void
-MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId xid, const char
*gid,
- TimestampTz prepared_at, Oid owner, Oid
databaseid)
+MarkAsPreparingGuts(GlobalTransaction gxact, FullTransactionId fxid,
+ const char *gid, TimestampTz
prepared_at, Oid owner,
+ Oid databaseid)
{
PGPROC *proc;
int i;
+ TransactionId xid = XidFromFullTransactionId(fxid);
Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
@@ -479,7 +482,7 @@ MarkAsPreparingGuts(GlobalTransaction gxact, TransactionId
xid, const char *gid,
proc->subxidStatus.count = 0;
gxact->prepared_at = prepared_at;
- gxact->xid = xid;
+ gxact->fxid = fxid;
gxact->owner = owner;
gxact->locking_backend = MyProcNumber;
gxact->valid = false;
@@ -797,12 +800,12 @@ pg_prepared_xact(PG_FUNCTION_ARGS)
* caller had better hold it.
*/
static GlobalTransaction
-TwoPhaseGetGXact(TransactionId xid, bool lock_held)
+TwoPhaseGetGXact(FullTransactionId fxid, bool lock_held)
{
GlobalTransaction result = NULL;
int i;
- static TransactionId cached_xid = InvalidTransactionId;
+ static FullTransactionId cached_fxid = {0};
static GlobalTransaction cached_gxact = NULL;
Assert(!lock_held || LWLockHeldByMe(TwoPhaseStateLock));
@@ -811,7 +814,7 @@ TwoPhaseGetGXact(TransactionId xid, bool lock_held)
* During a recovery, COMMIT PREPARED, or ABORT PREPARED, we'll be
called
* repeatedly for the same XID. We can save work with a simple cache.
*/
- if (xid == cached_xid)
+ if (FullTransactionIdEquals(fxid, cached_fxid))
return cached_gxact;
if (!lock_held)
@@ -821,7 +824,7 @@ TwoPhaseGetGXact(TransactionId xid, bool lock_held)
{
GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
- if (gxact->xid == xid)
+ if (FullTransactionIdEquals(gxact->fxid, fxid))
{
result = gxact;
break;
@@ -832,9 +835,10 @@ TwoPhaseGetGXact(TransactionId xid, bool lock_held)
LWLockRelease(TwoPhaseStateLock);
if (result == NULL) /* should not happen */
- elog(ERROR, "failed to find GlobalTransaction for xid %u", xid);
+ elog(ERROR, "failed to find GlobalTransaction for xid %u",
+ XidFromFullTransactionId(fxid));
- cached_xid = xid;
+ cached_fxid = fxid;
cached_gxact = result;
return result;
@@ -881,7 +885,7 @@ TwoPhaseGetXidByVirtualXID(VirtualTransactionId vxid,
*have_more = true;
break;
}
- result = gxact->xid;
+ result = XidFromFullTransactionId(gxact->fxid);
}
}
@@ -892,7 +896,7 @@ TwoPhaseGetXidByVirtualXID(VirtualTransactionId vxid,
/*
* TwoPhaseGetDummyProcNumber
- * Get the dummy proc number for prepared transaction specified by
XID
+ * Get the dummy proc number for prepared transaction
*
* Dummy proc numbers are similar to proc numbers of real backends. They
* start at MaxBackends, and are unique across all currently active real
@@ -900,24 +904,24 @@ TwoPhaseGetXidByVirtualXID(VirtualTransactionId vxid,
* TwoPhaseStateLock will not be taken, so the caller had better hold it.
*/
ProcNumber
-TwoPhaseGetDummyProcNumber(TransactionId xid, bool lock_held)
+TwoPhaseGetDummyProcNumber(FullTransactionId fxid, bool lock_held)
{
- GlobalTransaction gxact = TwoPhaseGetGXact(xid, lock_held);
+ GlobalTransaction gxact = TwoPhaseGetGXact(fxid, lock_held);
return gxact->pgprocno;
}
/*
* TwoPhaseGetDummyProc
- * Get the PGPROC that represents a prepared transaction specified
by XID
+ * Get the PGPROC that represents a prepared transaction
*
* If lock_held is set to true, TwoPhaseStateLock will not be taken, so the
* caller had better hold it.
*/
PGPROC *
-TwoPhaseGetDummyProc(TransactionId xid, bool lock_held)
+TwoPhaseGetDummyProc(FullTransactionId fxid, bool lock_held)
{
- GlobalTransaction gxact = TwoPhaseGetGXact(xid, lock_held);
+ GlobalTransaction gxact = TwoPhaseGetGXact(fxid, lock_held);
return GetPGProcByNumber(gxact->pgprocno);
}
@@ -926,24 +930,6 @@ TwoPhaseGetDummyProc(TransactionId xid, bool lock_held)
/* State file support
*/
/************************************************************************/
-/*
- * Compute FullTransactionId for the given TransactionId, using the current
- * epoch.
- */
-static inline FullTransactionId
-FullTransactionIdFromCurrentEpoch(TransactionId xid)
-{
- FullTransactionId fxid;
- FullTransactionId nextFullXid;
- uint32 epoch;
-
- nextFullXid = ReadNextFullTransactionId();
- epoch = EpochFromFullTransactionId(nextFullXid);
-
- fxid = FullTransactionIdFromEpochAndXid(epoch, xid);
- return fxid;
-}
-
static inline int
TwoPhaseFilePath(char *path, FullTransactionId fxid)
{
@@ -1050,7 +1036,7 @@ void
StartPrepare(GlobalTransaction gxact)
{
PGPROC *proc = GetPGProcByNumber(gxact->pgprocno);
- TransactionId xid = gxact->xid;
+ TransactionId xid = XidFromFullTransactionId(gxact->fxid);
TwoPhaseFileHeader hdr;
TransactionId *children;
RelFileLocator *commitrels;
@@ -1283,10 +1269,10 @@ RegisterTwoPhaseRecord(TwoPhaseRmgrId rmid, uint16 info,
* contents of the file, issuing an error when finding corrupted data. If
* missing_ok is true, which indicates that missing files can be safely
* ignored, then return NULL. This state can be reached when doing recovery
- * after discarding two-phase files from other epochs.
+ * after discarding two-phase files from frozen epochs.
*/
static char *
-ReadTwoPhaseFile(TransactionId xid, bool missing_ok)
+ReadTwoPhaseFile(FullTransactionId fxid, bool missing_ok)
{
char path[MAXPGPATH];
char *buf;
@@ -1297,9 +1283,7 @@ ReadTwoPhaseFile(TransactionId xid, bool missing_ok)
pg_crc32c calc_crc,
file_crc;
int r;
- FullTransactionId fxid;
- fxid = FullTransactionIdFromCurrentEpoch(xid);
TwoPhaseFilePath(path, fxid);
fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
@@ -1465,6 +1449,7 @@ StandbyTransactionIdIsPrepared(TransactionId xid)
char *buf;
TwoPhaseFileHeader *hdr;
bool result;
+ FullTransactionId fxid;
Assert(TransactionIdIsValid(xid));
@@ -1472,7 +1457,8 @@ StandbyTransactionIdIsPrepared(TransactionId xid)
return false; /* nothing to do */
/* Read and validate file */
- buf = ReadTwoPhaseFile(xid, true);
+ fxid = FullTransactionIdFromAllowableAt(TransamVariables->nextXid, xid);
+ buf = ReadTwoPhaseFile(fxid, true);
if (buf == NULL)
return false;
@@ -1492,6 +1478,7 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
{
GlobalTransaction gxact;
PGPROC *proc;
+ FullTransactionId fxid;
TransactionId xid;
bool ondisk;
char *buf;
@@ -1513,7 +1500,8 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
*/
gxact = LockGXact(gid, GetUserId());
proc = GetPGProcByNumber(gxact->pgprocno);
- xid = gxact->xid;
+ fxid = gxact->fxid;
+ xid = XidFromFullTransactionId(fxid);
/*
* Read and validate 2PC state data. State data will typically be stored
@@ -1521,7 +1509,7 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
* to disk if for some reason they have lived for a long time.
*/
if (gxact->ondisk)
- buf = ReadTwoPhaseFile(xid, false);
+ buf = ReadTwoPhaseFile(fxid, false);
else
XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf, NULL);
@@ -1640,11 +1628,11 @@ FinishPreparedTransaction(const char *gid, bool
isCommit)
/* And now do the callbacks */
if (isCommit)
- ProcessRecords(bufptr, xid, twophase_postcommit_callbacks);
+ ProcessRecords(bufptr, fxid, twophase_postcommit_callbacks);
else
- ProcessRecords(bufptr, xid, twophase_postabort_callbacks);
+ ProcessRecords(bufptr, fxid, twophase_postabort_callbacks);
- PredicateLockTwoPhaseFinish(xid, isCommit);
+ PredicateLockTwoPhaseFinish(fxid, isCommit);
/*
* Read this value while holding the two-phase lock, as the on-disk 2PC
@@ -1664,17 +1652,9 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
/* Count the prepared xact as committed or aborted */
AtEOXact_PgStat(isCommit, false);
- /*
- * And now we can clean up any files we may have left. These should be
- * from the current epoch.
- */
+ /* And now we can clean up any files we may have left. */
if (ondisk)
- {
- FullTransactionId fxid;
-
- fxid = FullTransactionIdFromCurrentEpoch(xid);
RemoveTwoPhaseFile(fxid, true);
- }
MyLockedGxact = NULL;
@@ -1687,7 +1667,7 @@ FinishPreparedTransaction(const char *gid, bool isCommit)
* Scan 2PC state data in memory and call the indicated callbacks for each 2PC
record.
*/
static void
-ProcessRecords(char *bufptr, TransactionId xid,
+ProcessRecords(char *bufptr, FullTransactionId fxid,
const TwoPhaseCallback callbacks[])
{
for (;;)
@@ -1701,14 +1681,14 @@ ProcessRecords(char *bufptr, TransactionId xid,
bufptr += MAXALIGN(sizeof(TwoPhaseRecordOnDisk));
if (callbacks[record->rmid] != NULL)
- callbacks[record->rmid] (xid, record->info, bufptr,
record->len);
+ callbacks[record->rmid] (fxid, record->info, bufptr,
record->len);
bufptr += MAXALIGN(record->len);
}
}
/*
- * Remove the 2PC file for the specified XID.
+ * Remove the 2PC file.
*
* If giveWarning is false, do not complain about file-not-present;
* this is an expected case during WAL replay.
@@ -1737,20 +1717,17 @@ RemoveTwoPhaseFile(FullTransactionId fxid, bool
giveWarning)
* Note: content and len don't include CRC.
*/
static void
-RecreateTwoPhaseFile(TransactionId xid, void *content, int len)
+RecreateTwoPhaseFile(FullTransactionId fxid, void *content, int len)
{
char path[MAXPGPATH];
pg_crc32c statefile_crc;
int fd;
- FullTransactionId fxid;
/* Recompute CRC */
INIT_CRC32C(statefile_crc);
COMP_CRC32C(statefile_crc, content, len);
FIN_CRC32C(statefile_crc);
- /* Use current epoch */
- fxid = FullTransactionIdFromCurrentEpoch(xid);
TwoPhaseFilePath(path, fxid);
fd = OpenTransientFile(path,
@@ -1863,7 +1840,7 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon)
int len;
XlogReadTwoPhaseData(gxact->prepare_start_lsn, &buf,
&len);
- RecreateTwoPhaseFile(gxact->xid, buf, len);
+ RecreateTwoPhaseFile(gxact->fxid, buf, len);
gxact->ondisk = true;
gxact->prepare_start_lsn = InvalidXLogRecPtr;
gxact->prepare_end_lsn = InvalidXLogRecPtr;
@@ -1900,7 +1877,7 @@ CheckPointTwoPhase(XLogRecPtr redo_horizon)
* This is called once at the beginning of recovery, saving any extra
* lookups in the future. Two-phase files that are newer than the
* minimum XID horizon are discarded on the way. Two-phase files with
- * an epoch older or newer than the current checkpoint's record epoch
+ * an epoch frozen relative to the current checkpoint's record epoch
* are also discarded.
*/
void
@@ -1925,7 +1902,7 @@ restoreTwoPhaseData(void)
if (buf == NULL)
continue;
- PrepareRedoAdd(buf, InvalidXLogRecPtr,
+ PrepareRedoAdd(fxid, buf, InvalidXLogRecPtr,
InvalidXLogRecPtr,
InvalidRepOriginId);
}
}
@@ -1971,7 +1948,6 @@ PrescanPreparedTransactions(TransactionId **xids_p, int
*nxids_p)
TransactionId origNextXid = XidFromFullTransactionId(nextXid);
TransactionId result = origNextXid;
TransactionId *xids = NULL;
- uint32 epoch = EpochFromFullTransactionId(nextXid);
int nxids = 0;
int allocsize = 0;
int i;
@@ -1979,20 +1955,15 @@ PrescanPreparedTransactions(TransactionId **xids_p, int
*nxids_p)
LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
{
- TransactionId xid;
FullTransactionId fxid;
+ TransactionId xid;
char *buf;
GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
Assert(gxact->inredo);
- xid = gxact->xid;
-
- /*
- * All two-phase files with past and future epoch in
pg_twophase are
- * gone at this point, so we're OK to rely on only the current
epoch.
- */
- fxid = FullTransactionIdFromEpochAndXid(epoch, xid);
+ fxid = gxact->fxid;
+ xid = XidFromFullTransactionId(fxid);
buf = ProcessTwoPhaseBuffer(fxid,
gxact->prepare_start_lsn,
gxact->ondisk, false, true);
@@ -2055,31 +2026,16 @@ void
StandbyRecoverPreparedTransactions(void)
{
int i;
- uint32 epoch;
- FullTransactionId nextFullXid;
-
- /* get current epoch */
- nextFullXid = ReadNextFullTransactionId();
- epoch = EpochFromFullTransactionId(nextFullXid);
LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
{
- TransactionId xid;
- FullTransactionId fxid;
char *buf;
GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
Assert(gxact->inredo);
- xid = gxact->xid;
-
- /*
- * At this stage, we're OK to work with the current epoch as
all past
- * and future files have been already discarded.
- */
- fxid = FullTransactionIdFromEpochAndXid(epoch, xid);
- buf = ProcessTwoPhaseBuffer(fxid,
+ buf = ProcessTwoPhaseBuffer(gxact->fxid,
gxact->prepare_start_lsn,
gxact->ondisk, true, false);
if (buf != NULL)
@@ -2108,17 +2064,10 @@ void
RecoverPreparedTransactions(void)
{
int i;
- uint32 epoch;
- FullTransactionId nextFullXid;
-
- /* get current epoch */
- nextFullXid = ReadNextFullTransactionId();
- epoch = EpochFromFullTransactionId(nextFullXid);
LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
{
- TransactionId xid;
FullTransactionId fxid;
char *buf;
GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
@@ -2128,12 +2077,6 @@ RecoverPreparedTransactions(void)
const char *gid;
/*
- * At this stage, we're OK to work with the current epoch as
all past
- * and future files have been already discarded.
- */
- xid = gxact->xid;
-
- /*
* Reconstruct subtrans state for the transaction --- needed
because
* pg_subtrans is not preserved over a restart. Note that we
are
* linking all the subtransactions directly to the top-level
XID;
@@ -2142,18 +2085,20 @@ RecoverPreparedTransactions(void)
* SubTransSetParent has been set before, if the prepared
transaction
* generated xid assignment records.
*/
- fxid = FullTransactionIdFromEpochAndXid(epoch, xid);
- buf = ProcessTwoPhaseBuffer(fxid,
+ buf = ProcessTwoPhaseBuffer(gxact->fxid,
gxact->prepare_start_lsn,
gxact->ondisk, true, false);
if (buf == NULL)
continue;
ereport(LOG,
- (errmsg("recovering prepared transaction %u
from shared memory", xid)));
+ (errmsg("recovering prepared transaction %u of
epoch %u from shared memory",
+
XidFromFullTransactionId(gxact->fxid),
+
EpochFromFullTransactionId(gxact->fxid))));
hdr = (TwoPhaseFileHeader *) buf;
- Assert(TransactionIdEquals(hdr->xid, xid));
+ Assert(TransactionIdEquals(hdr->xid,
+
XidFromFullTransactionId(gxact->fxid)));
bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
gid = (const char *) bufptr;
bufptr += MAXALIGN(hdr->gidlen);
@@ -2169,7 +2114,7 @@ RecoverPreparedTransactions(void)
* Recreate its GXACT and dummy PGPROC. But, check whether it
was
* added in redo and already has a shmem entry for it.
*/
- MarkAsPreparingGuts(gxact, xid, gid,
+ MarkAsPreparingGuts(gxact, gxact->fxid, gid,
hdr->prepared_at,
hdr->owner,
hdr->database);
@@ -2184,7 +2129,7 @@ RecoverPreparedTransactions(void)
/*
* Recover other state (notably locks) using resource managers.
*/
- ProcessRecords(bufptr, xid, twophase_recover_callbacks);
+ ProcessRecords(bufptr, gxact->fxid, twophase_recover_callbacks);
/*
* Release locks held by the standby process after we process
each
@@ -2192,7 +2137,7 @@ RecoverPreparedTransactions(void)
* additional locks at any one time.
*/
if (InHotStandby)
- StandbyReleaseLockTree(xid, hdr->nsubxacts, subxids);
+ StandbyReleaseLockTree(hdr->xid, hdr->nsubxacts,
subxids);
/*
* We're done with recovering this transaction. Clear
MyLockedGxact,
@@ -2226,20 +2171,22 @@ ProcessTwoPhaseBuffer(FullTransactionId fxid,
bool setParent, bool setNextXid)
{
FullTransactionId nextXid = TransamVariables->nextXid;
+ TransactionId xid = XidFromFullTransactionId(fxid);
TransactionId *subxids;
char *buf;
TwoPhaseFileHeader *hdr;
int i;
- TransactionId xid = XidFromFullTransactionId(fxid);
+ Assert(InRecovery);
Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
if (!fromdisk)
Assert(prepare_start_lsn != InvalidXLogRecPtr);
/*
- * Reject full XID if too new. Note that this discards files from
future
- * epochs.
+ * Reject future XIDs and delete their files; WAL will recreate them if
+ * needed. This is a normal outcome if the base backup process copied
+ * twophase files after a long time copying other files.
*/
if (FullTransactionIdFollowsOrEquals(fxid, nextXid))
{
@@ -2255,13 +2202,23 @@ ProcessTwoPhaseBuffer(FullTransactionId fxid,
ereport(WARNING,
(errmsg("removing future two-phase
state from memory for transaction %u",
xid)));
- PrepareRedoRemove(xid, true);
+ PrepareRedoRemoveFull(fxid, true);
}
return NULL;
}
- /* Discard files from past epochs */
- if (EpochFromFullTransactionId(fxid) <
EpochFromFullTransactionId(nextXid))
+ /*
+ * Reject XIDs for which we've already trimmed clog. This should not
+ * happen, because redo begins at a moment in the XID assignments before
+ * any pg_twophase file that existed during the base backup.
+ *
+ * Since AssignTransactionId() ensures subxact XIDs follow the top XID,
an
+ * allowable top XID implies allowable subxact XIDs.
+ *
+ * FIXME reimplement not in terms of epochs
+ */
+ if (EpochFromFullTransactionId(fxid) + 1 <
+ EpochFromFullTransactionId(nextXid))
{
if (fromdisk)
{
@@ -2275,13 +2232,14 @@ ProcessTwoPhaseBuffer(FullTransactionId fxid,
ereport(WARNING,
(errmsg("removing past two-phase state
from memory for transaction %u",
xid)));
- PrepareRedoRemove(xid, true);
+ PrepareRedoRemoveFull(fxid, true);
}
return NULL;
}
- /* Already processed? */
- if (TransactionIdDidCommit(xid) || TransactionIdDidAbort(xid))
+ /* Reject already-resolved XIDs, deleting their files. FIXME can this
happen? */
+ if (TransactionIdDidCommit(XidFromFullTransactionId(fxid)) ||
+ TransactionIdDidAbort(XidFromFullTransactionId(fxid)))
{
if (fromdisk)
{
@@ -2295,7 +2253,7 @@ ProcessTwoPhaseBuffer(FullTransactionId fxid,
ereport(WARNING,
(errmsg("removing stale two-phase state
from memory for transaction %u",
xid)));
- PrepareRedoRemove(xid, true);
+ PrepareRedoRemoveFull(fxid, true);
}
return NULL;
}
@@ -2303,7 +2261,7 @@ ProcessTwoPhaseBuffer(FullTransactionId fxid,
if (fromdisk)
{
/* Read and validate file */
- buf = ReadTwoPhaseFile(xid, false);
+ buf = ReadTwoPhaseFile(fxid, false);
}
else
{
@@ -2536,8 +2494,9 @@ RecordTransactionAbortPrepared(TransactionId xid,
* data, the entry is marked as located on disk.
*/
void
-PrepareRedoAdd(char *buf, XLogRecPtr start_lsn,
- XLogRecPtr end_lsn, RepOriginId origin_id)
+PrepareRedoAdd(FullTransactionId fxid, char *buf,
+ XLogRecPtr start_lsn, XLogRecPtr end_lsn,
+ RepOriginId origin_id)
{
TwoPhaseFileHeader *hdr = (TwoPhaseFileHeader *) buf;
char *bufptr;
@@ -2545,7 +2504,11 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn,
GlobalTransaction gxact;
Assert(LWLockHeldByMeInMode(TwoPhaseStateLock, LW_EXCLUSIVE));
- Assert(RecoveryInProgress());
+ Assert(InRecovery);
+
+ if (!FullTransactionIdIsValid(fxid))
+ fxid =
FullTransactionIdFromAllowableAt(TransamVariables->nextXid,
+
hdr->xid);
bufptr = buf + MAXALIGN(sizeof(TwoPhaseFileHeader));
gid = (const char *) bufptr;
@@ -2574,10 +2537,8 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn,
if (!XLogRecPtrIsInvalid(start_lsn))
{
char path[MAXPGPATH];
- FullTransactionId fxid;
- /* Use current epoch */
- fxid = FullTransactionIdFromCurrentEpoch(hdr->xid);
+ Assert(InRecovery);
TwoPhaseFilePath(path, fxid);
if (access(path, F_OK) == 0)
@@ -2609,7 +2570,7 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn,
gxact->prepared_at = hdr->prepared_at;
gxact->prepare_start_lsn = start_lsn;
gxact->prepare_end_lsn = end_lsn;
- gxact->xid = hdr->xid;
+ gxact->fxid = fxid;
gxact->owner = hdr->owner;
gxact->locking_backend = INVALID_PROC_NUMBER;
gxact->valid = false;
@@ -2628,7 +2589,9 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn,
false /* backward */ , false
/* WAL */ );
}
- elog(DEBUG2, "added 2PC data in shared memory for transaction %u",
gxact->xid);
+ elog(DEBUG2, "added 2PC data in shared memory for transaction %u of
epoch %u",
+ XidFromFullTransactionId(gxact->fxid),
+ EpochFromFullTransactionId(gxact->fxid));
}
/*
@@ -2640,8 +2603,8 @@ PrepareRedoAdd(char *buf, XLogRecPtr start_lsn,
* Caller must hold TwoPhaseStateLock in exclusive mode, because TwoPhaseState
* is updated.
*/
-void
-PrepareRedoRemove(TransactionId xid, bool giveWarning)
+static void
+PrepareRedoRemoveFull(FullTransactionId fxid, bool giveWarning)
{
GlobalTransaction gxact = NULL;
int i;
@@ -2654,7 +2617,7 @@ PrepareRedoRemove(TransactionId xid, bool giveWarning)
{
gxact = TwoPhaseState->prepXacts[i];
- if (gxact->xid == xid)
+ if (FullTransactionIdEquals(gxact->fxid, fxid))
{
Assert(gxact->inredo);
found = true;
@@ -2671,20 +2634,25 @@ PrepareRedoRemove(TransactionId xid, bool giveWarning)
/*
* And now we can clean up any files we may have left.
*/
- elog(DEBUG2, "removing 2PC data for transaction %u", xid);
- if (gxact->ondisk)
- {
- FullTransactionId fxid;
+ elog(DEBUG2, "removing 2PC data for transaction %u of epoch %u",
+ XidFromFullTransactionId(fxid),
+ EpochFromFullTransactionId(fxid));
- /*
- * We should deal with a file at the current epoch here.
- */
- fxid = FullTransactionIdFromCurrentEpoch(xid);
+ if (gxact->ondisk)
RemoveTwoPhaseFile(fxid, giveWarning);
- }
+
RemoveGXact(gxact);
}
+void
+PrepareRedoRemove(TransactionId xid, bool giveWarning)
+{
+ FullTransactionId fxid =
+ FullTransactionIdFromAllowableAt(TransamVariables->nextXid,
xid);
+
+ PrepareRedoRemoveFull(fxid, giveWarning);
+}
+
/*
* LookupGXact
* Check if the prepared transaction with the given GID, lsn and
timestamp
@@ -2729,7 +2697,7 @@ LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn,
* between publisher and subscriber.
*/
if (gxact->ondisk)
- buf = ReadTwoPhaseFile(gxact->xid, false);
+ buf = ReadTwoPhaseFile(gxact->fxid, false);
else
{
Assert(gxact->prepare_start_lsn);
diff --git a/src/backend/access/transam/xact.c
b/src/backend/access/transam/xact.c
index d331ab9..8cd0c8b 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -2512,7 +2512,7 @@ static void
PrepareTransaction(void)
{
TransactionState s = CurrentTransactionState;
- TransactionId xid = GetCurrentTransactionId();
+ FullTransactionId fxid = GetCurrentFullTransactionId();
GlobalTransaction gxact;
TimestampTz prepared_at;
@@ -2641,7 +2641,7 @@ PrepareTransaction(void)
* Reserve the GID for this transaction. This could fail if the requested
* GID is invalid or already in use.
*/
- gxact = MarkAsPreparing(xid, prepareGID, prepared_at,
+ gxact = MarkAsPreparing(fxid, prepareGID, prepared_at,
GetUserId(),
MyDatabaseId);
prepareGID = NULL;
@@ -2691,7 +2691,7 @@ PrepareTransaction(void)
* ProcArrayClearTransaction(). Otherwise, a GetLockConflicts() would
* conclude "xact already committed or aborted" for our locks.
*/
- PostPrepare_Locks(xid);
+ PostPrepare_Locks(fxid);
/*
* Let others know about no transaction in progress by me. This has to be
@@ -2733,9 +2733,9 @@ PrepareTransaction(void)
PostPrepare_smgr();
- PostPrepare_MultiXact(xid);
+ PostPrepare_MultiXact(fxid);
- PostPrepare_PredicateLocks(xid);
+ PostPrepare_PredicateLocks(fxid);
ResourceOwnerRelease(TopTransactionResourceOwner,
RESOURCE_RELEASE_LOCKS,
@@ -6408,7 +6408,8 @@ xact_redo(XLogReaderState *record)
* gxact entry.
*/
LWLockAcquire(TwoPhaseStateLock, LW_EXCLUSIVE);
- PrepareRedoAdd(XLogRecGetData(record),
+ PrepareRedoAdd(InvalidFullTransactionId,
+ XLogRecGetData(record),
record->ReadRecPtr,
record->EndRecPtr,
XLogRecGetOrigin(record));
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 3596af0..91b6a91 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -2166,28 +2166,14 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
FullTransactionId
XLogRecGetFullXid(XLogReaderState *record)
{
- TransactionId xid,
- next_xid;
- uint32 epoch;
-
/*
* This function is only safe during replay, because it depends on the
* replay state. See AdvanceNextFullTransactionIdPastXid() for more.
*/
Assert(AmStartupProcess() || !IsUnderPostmaster);
- xid = XLogRecGetXid(record);
- next_xid = XidFromFullTransactionId(TransamVariables->nextXid);
- epoch = EpochFromFullTransactionId(TransamVariables->nextXid);
-
- /*
- * If xid is numerically greater than next_xid, it has to be from the last
- * epoch.
- */
- if (unlikely(xid > next_xid))
- --epoch;
-
- return FullTransactionIdFromEpochAndXid(epoch, xid);
+ return FullTransactionIdFromAllowableAt(TransamVariables->nextXid,
+ XLogRecGetXid(record));
}
#endif
diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c
index 3e2f98b..e9bac94 100644
--- a/src/backend/storage/lmgr/lock.c
+++ b/src/backend/storage/lmgr/lock.c
@@ -3476,9 +3476,9 @@ AtPrepare_Locks(void)
* but that probably costs more cycles.
*/
void
-PostPrepare_Locks(TransactionId xid)
+PostPrepare_Locks(FullTransactionId fxid)
{
- PGPROC *newproc = TwoPhaseGetDummyProc(xid, false);
+ PGPROC *newproc = TwoPhaseGetDummyProc(fxid, false);
HASH_SEQ_STATUS status;
LOCALLOCK *locallock;
LOCK *lock;
@@ -4261,11 +4261,11 @@ DumpAllLocks(void)
* and PANIC anyway.
*/
void
-lock_twophase_recover(TransactionId xid, uint16 info,
+lock_twophase_recover(FullTransactionId fxid, uint16 info,
void *recdata, uint32 len)
{
TwoPhaseLockRecord *rec = (TwoPhaseLockRecord *) recdata;
- PGPROC *proc = TwoPhaseGetDummyProc(xid, false);
+ PGPROC *proc = TwoPhaseGetDummyProc(fxid, false);
LOCKTAG *locktag;
LOCKMODE lockmode;
LOCKMETHODID lockmethodid;
@@ -4442,7 +4442,7 @@ lock_twophase_recover(TransactionId xid, uint16 info,
* starting up into hot standby mode.
*/
void
-lock_twophase_standby_recover(TransactionId xid, uint16 info,
+lock_twophase_standby_recover(FullTransactionId fxid, uint16 info,
void *recdata, uint32 len)
{
TwoPhaseLockRecord *rec = (TwoPhaseLockRecord *) recdata;
@@ -4461,7 +4461,7 @@ lock_twophase_standby_recover(TransactionId xid, uint16 info,
if (lockmode == AccessExclusiveLock &&
locktag->locktag_type == LOCKTAG_RELATION)
{
- StandbyAcquireAccessExclusiveLock(xid,
+ StandbyAcquireAccessExclusiveLock(XidFromFullTransactionId(fxid),
locktag->locktag_field1 /* dboid */ ,
locktag->locktag_field2 /* reloid */ );
}
@@ -4474,11 +4474,11 @@ lock_twophase_standby_recover(TransactionId xid, uint16 info,
* Find and release the lock indicated by the 2PC record.
*/
void
-lock_twophase_postcommit(TransactionId xid, uint16 info,
+lock_twophase_postcommit(FullTransactionId fxid, uint16 info,
void *recdata, uint32 len)
{
TwoPhaseLockRecord *rec = (TwoPhaseLockRecord *) recdata;
- PGPROC *proc = TwoPhaseGetDummyProc(xid, true);
+ PGPROC *proc = TwoPhaseGetDummyProc(fxid, true);
LOCKTAG *locktag;
LOCKMETHODID lockmethodid;
LockMethod lockMethodTable;
@@ -4500,10 +4500,10 @@ lock_twophase_postcommit(TransactionId xid, uint16 info,
* This is actually just the same as the COMMIT case.
*/
void
-lock_twophase_postabort(TransactionId xid, uint16 info,
+lock_twophase_postabort(FullTransactionId fxid, uint16 info,
void *recdata, uint32 len)
{
- lock_twophase_postcommit(xid, info, recdata, len);
+ lock_twophase_postcommit(fxid, info, recdata, len);
}
/*
diff --git a/src/backend/storage/lmgr/predicate.c b/src/backend/storage/lmgr/predicate.c
index 5b21a05..928647d 100644
--- a/src/backend/storage/lmgr/predicate.c
+++ b/src/backend/storage/lmgr/predicate.c
@@ -191,7 +191,7 @@
* AtPrepare_PredicateLocks(void);
* PostPrepare_PredicateLocks(TransactionId xid);
* PredicateLockTwoPhaseFinish(TransactionId xid, bool isCommit);
- * predicatelock_twophase_recover(TransactionId xid, uint16 info,
+ * predicatelock_twophase_recover(FullTransactionId fxid, uint16 info,
* void *recdata, uint32 len);
*/
@@ -4846,7 +4846,7 @@ AtPrepare_PredicateLocks(void)
* anyway. We only need to clean up our local state.
*/
void
-PostPrepare_PredicateLocks(TransactionId xid)
+PostPrepare_PredicateLocks(FullTransactionId fxid)
{
if (MySerializableXact == InvalidSerializableXact)
return;
@@ -4869,12 +4869,12 @@ PostPrepare_PredicateLocks(TransactionId xid)
* commits or aborts.
*/
void
-PredicateLockTwoPhaseFinish(TransactionId xid, bool isCommit)
+PredicateLockTwoPhaseFinish(FullTransactionId fxid, bool isCommit)
{
SERIALIZABLEXID *sxid;
SERIALIZABLEXIDTAG sxidtag;
- sxidtag.xid = xid;
+ sxidtag.xid = XidFromFullTransactionId(fxid);
LWLockAcquire(SerializableXactHashLock, LW_SHARED);
sxid = (SERIALIZABLEXID *)
@@ -4896,10 +4896,11 @@ PredicateLockTwoPhaseFinish(TransactionId xid, bool isCommit)
* Re-acquire a predicate lock belonging to a transaction that was prepared.
*/
void
-predicatelock_twophase_recover(TransactionId xid, uint16 info,
+predicatelock_twophase_recover(FullTransactionId fxid, uint16 info,
void *recdata, uint32 len)
{
TwoPhasePredicateRecord *record;
+ TransactionId xid = XidFromFullTransactionId(fxid);
Assert(len == sizeof(TwoPhasePredicateRecord));
diff --git a/src/backend/utils/activity/pgstat_relation.c b/src/backend/utils/activity/pgstat_relation.c
index 09247ba..8f5945a 100644
--- a/src/backend/utils/activity/pgstat_relation.c
+++ b/src/backend/utils/activity/pgstat_relation.c
@@ -731,7 +731,7 @@ PostPrepare_PgStat_Relations(PgStat_SubXactStatus *xact_state)
* Load the saved counts into our local pgstats state.
*/
void
-pgstat_twophase_postcommit(TransactionId xid, uint16 info,
+pgstat_twophase_postcommit(FullTransactionId fxid, uint16 info,
void *recdata, uint32 len)
{
TwoPhasePgStatRecord *rec = (TwoPhasePgStatRecord *) recdata;
@@ -767,7 +767,7 @@ pgstat_twophase_postcommit(TransactionId xid, uint16 info,
* as aborted.
*/
void
-pgstat_twophase_postabort(TransactionId xid, uint16 info,
+pgstat_twophase_postabort(FullTransactionId fxid, uint16 info,
void *recdata, uint32 len)
{
TwoPhasePgStatRecord *rec = (TwoPhasePgStatRecord *) recdata;
diff --git a/src/backend/utils/adt/xid8funcs.c b/src/backend/utils/adt/xid8funcs.c
index 4736755..20b28b2 100644
--- a/src/backend/utils/adt/xid8funcs.c
+++ b/src/backend/utils/adt/xid8funcs.c
@@ -97,15 +97,11 @@ static bool
TransactionIdInRecentPast(FullTransactionId fxid, TransactionId *extracted_xid)
{
TransactionId xid = XidFromFullTransactionId(fxid);
- uint32 now_epoch;
- TransactionId now_epoch_next_xid;
FullTransactionId now_fullxid;
- TransactionId oldest_xid;
- FullTransactionId oldest_fxid;
+ TransactionId oldest_clog_xid;
+ FullTransactionId oldest_clog_fxid;
now_fullxid = ReadNextFullTransactionId();
- now_epoch_next_xid = XidFromFullTransactionId(now_fullxid);
- now_epoch = EpochFromFullTransactionId(now_fullxid);
if (extracted_xid != NULL)
*extracted_xid = xid;
@@ -135,52 +131,19 @@ TransactionIdInRecentPast(FullTransactionId fxid, TransactionId *extracted_xid)
/*
* If fxid is not older than TransamVariables->oldestClogXid, the relevant
- * CLOG entry is guaranteed to still exist. Convert
- * TransamVariables->oldestClogXid into a FullTransactionId to compare it
- * with fxid. Determine the right epoch knowing that oldest_fxid
- * shouldn't be more than 2^31 older than now_fullxid.
- */
- oldest_xid = TransamVariables->oldestClogXid;
- Assert(TransactionIdPrecedesOrEquals(oldest_xid, now_epoch_next_xid));
- if (oldest_xid <= now_epoch_next_xid)
- {
- oldest_fxid = FullTransactionIdFromEpochAndXid(now_epoch, oldest_xid);
- }
- else
- {
- Assert(now_epoch > 0);
- oldest_fxid = FullTransactionIdFromEpochAndXid(now_epoch - 1, oldest_xid);
- }
- return !FullTransactionIdPrecedes(fxid, oldest_fxid);
-}
-
-/*
- * Convert a TransactionId obtained from a snapshot held by the caller to a
- * FullTransactionId. Use next_fxid as a reference FullTransactionId, so that
- * we can compute the high order bits. It must have been obtained by the
- * caller with ReadNextFullTransactionId() after the snapshot was created.
- */
-static FullTransactionId
-widen_snapshot_xid(TransactionId xid, FullTransactionId next_fxid)
-{
- TransactionId next_xid = XidFromFullTransactionId(next_fxid);
- uint32 epoch = EpochFromFullTransactionId(next_fxid);
-
- /* Special transaction ID. */
- if (!TransactionIdIsNormal(xid))
- return FullTransactionIdFromEpochAndXid(0, xid);
-
- /*
- * The 64 bit result must be <= next_fxid, since next_fxid hadn't been
- * issued yet when the snapshot was created. Every TransactionId in the
- * snapshot must therefore be from the same epoch as next_fxid, or the
- * epoch before. We know this because next_fxid is never allow to get
- * more than one epoch ahead of the TransactionIds in any snapshot.
+ * CLOG entry is guaranteed to still exist.
+ *
+ * TransamVariables->oldestXid governs allowable XIDs. Usually,
+ * oldestClogXid==oldestXid. It's also possible for oldestClogXid to
+ * follow oldestXid, in which case oldestXid might advance after our
+ * ReadNextFullTransactionId() call. If oldestXid has advanced, that
+ * advancement reinstated the usual oldestClogXid==oldestXid. Whether or
+ * not that happened, oldestClogXid is allowable relative to now_fullxid.
*/
- if (xid > next_xid)
- epoch--;
-
- return FullTransactionIdFromEpochAndXid(epoch, xid);
+ oldest_clog_xid = TransamVariables->oldestClogXid;
+ oldest_clog_fxid =
+ FullTransactionIdFromAllowableAt(now_fullxid, oldest_clog_xid);
+ return !FullTransactionIdPrecedes(fxid, oldest_clog_fxid);
}
/*
@@ -420,12 +383,18 @@ pg_current_snapshot(PG_FUNCTION_ARGS)
nxip = cur->xcnt;
snap = palloc(PG_SNAPSHOT_SIZE(nxip));
- /* fill */
- snap->xmin = widen_snapshot_xid(cur->xmin, next_fxid);
- snap->xmax = widen_snapshot_xid(cur->xmax, next_fxid);
+ /*
+ * Fill. This is the current backend's active snapshot, so MyProc->xmin
+ * is <= all these XIDs. As long as that remains so, oldestXid can't
+ * advance past any of these XIDs. Hence, these XIDs remain allowable
+ * relative to next_fxid.
+ */
+ snap->xmin = FullTransactionIdFromAllowableAt(next_fxid, cur->xmin);
+ snap->xmax = FullTransactionIdFromAllowableAt(next_fxid, cur->xmax);
snap->nxip = nxip;
for (i = 0; i < nxip; i++)
- snap->xip[i] = widen_snapshot_xid(cur->xip[i], next_fxid);
+ snap->xip[i] =
+ FullTransactionIdFromAllowableAt(next_fxid, cur->xip[i]);
/*
* We want them guaranteed to be in ascending order. This also removes
diff --git a/src/include/access/multixact.h b/src/include/access/multixact.h
index 4e6b0ee..b876e98 100644
--- a/src/include/access/multixact.h
+++ b/src/include/access/multixact.h
@@ -11,6 +11,7 @@
#ifndef MULTIXACT_H
#define MULTIXACT_H
+#include "access/transam.h"
#include "access/xlogreader.h"
#include "lib/stringinfo.h"
#include "storage/sync.h"
@@ -119,7 +120,7 @@ extern int multixactmemberssyncfiletag(const FileTag *ftag, char *path);
extern void AtEOXact_MultiXact(void);
extern void AtPrepare_MultiXact(void);
-extern void PostPrepare_MultiXact(TransactionId xid);
+extern void PostPrepare_MultiXact(FullTransactionId fxid);
extern Size MultiXactShmemSize(void);
extern void MultiXactShmemInit(void);
@@ -145,11 +146,11 @@ extern void MultiXactAdvanceNextMXact(MultiXactId minMulti,
extern void MultiXactAdvanceOldest(MultiXactId oldestMulti, Oid oldestMultiDB);
extern int MultiXactMemberFreezeThreshold(void);
-extern void multixact_twophase_recover(TransactionId xid, uint16 info,
+extern void multixact_twophase_recover(FullTransactionId fxid, uint16 info,
void *recdata, uint32 len);
-extern void multixact_twophase_postcommit(TransactionId xid, uint16 info,
+extern void multixact_twophase_postcommit(FullTransactionId fxid, uint16 info,
void *recdata, uint32 len);
-extern void multixact_twophase_postabort(TransactionId xid, uint16 info,
+extern void multixact_twophase_postabort(FullTransactionId fxid, uint16 info,
void *recdata, uint32 len);
extern void multixact_redo(XLogReaderState *record);
diff --git a/src/include/access/transam.h b/src/include/access/transam.h
index 0cab865..604c5d0 100644
--- a/src/include/access/transam.h
+++ b/src/include/access/transam.h
@@ -370,6 +370,43 @@ FullTransactionIdNewer(FullTransactionId a, FullTransactionId b)
return b;
}
+/*
+ * Compute FullTransactionId for the given TransactionId, assuming xid was
+ * between [oldestXid, nextXid] at the time when TransamVariables->nextXid was
+ * nextFullXid. When adding calls, evaluate what prevents xid from preceding
+ * oldestXid if SetTransactionIdLimit() runs between the collection of xid and
+ * the collection of nextFullXid.
+ */
+static inline FullTransactionId
+FullTransactionIdFromAllowableAt(FullTransactionId nextFullXid,
+ TransactionId xid)
+{
+ uint32 epoch;
+
+ /* Special transaction ID. */
+ if (!TransactionIdIsNormal(xid))
+ return FullTransactionIdFromEpochAndXid(0, xid);
+
+ Assert(TransactionIdPrecedesOrEquals(xid,
+ XidFromFullTransactionId(nextFullXid)));
+
+ /*
+ * The 64 bit result must be <= nextFullXid, since nextFullXid hadn't been
+ * issued yet when xid was in the past. The xid must therefore be from
+ * the epoch of nextFullXid or the epoch before. We know this because we
+ * must remove (by freezing) an XID before assigning the XID half an epoch
+ * ahead of it.
+ */
+ epoch = EpochFromFullTransactionId(nextFullXid);
+ if (xid > XidFromFullTransactionId(nextFullXid))
+ {
+ Assert(epoch != 0);
+ epoch--;
+ }
+
+ return FullTransactionIdFromEpochAndXid(epoch, xid);
+}
+
#endif /* FRONTEND */
#endif /* TRANSAM_H */
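
To make the two-epoch rule concrete outside the tree, here is a minimal standalone sketch of the same arithmetic. It is not part of the patch. The demo_* names and the DEMO_FIRST_NORMAL_XID constant are hypothetical stand-ins for TransactionId, FullTransactionId and the TransactionIdIsNormal() check, and it assumes the caller has already established that xid is allowable relative to next_fxid.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical stand-ins for TransactionId and FullTransactionId. */
typedef uint32_t demo_xid;
typedef uint64_t demo_fxid;

/* Assumption: XIDs below 3 are special (invalid, bootstrap, frozen). */
#define DEMO_FIRST_NORMAL_XID 3u

/* Pack an epoch and a 32-bit XID into one 64-bit value. */
static inline demo_fxid
demo_fxid_from_epoch_and_xid(uint32_t epoch, demo_xid xid)
{
	return ((uint64_t) epoch << 32) | xid;
}

/*
 * Widen xid using next_fxid as the reference point.  An allowable xid is
 * never ahead of next_fxid, so a 32-bit value numerically greater than
 * the low half of next_fxid must belong to the previous epoch.
 */
static inline demo_fxid
demo_fxid_from_allowable_at(demo_fxid next_fxid, demo_xid xid)
{
	uint32_t	epoch = (uint32_t) (next_fxid >> 32);
	demo_xid	next_xid = (demo_xid) next_fxid;

	/* Special XIDs carry no epoch. */
	if (xid < DEMO_FIRST_NORMAL_XID)
		return demo_fxid_from_epoch_and_xid(0, xid);

	if (xid > next_xid)
	{
		assert(epoch != 0);
		epoch--;
	}

	return demo_fxid_from_epoch_and_xid(epoch, xid);
}
```

With next_fxid == 2^32 (epoch 1, low half 0), an xid of 0xFFFFFFF0 widens to epoch 0. Stamping it with the current epoch instead would yield a value greater than next_fxid, which is the failure mode discussed upthread.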
diff --git a/src/include/access/twophase.h b/src/include/access/twophase.h
index 9fa8235..0ab8b3e 100644
--- a/src/include/access/twophase.h
+++ b/src/include/access/twophase.h
@@ -14,6 +14,7 @@
#ifndef TWOPHASE_H
#define TWOPHASE_H
+#include "access/transam.h"
#include "access/xact.h"
#include "access/xlogdefs.h"
#include "datatype/timestamp.h"
@@ -36,10 +37,10 @@ extern void PostPrepare_Twophase(void);
extern TransactionId TwoPhaseGetXidByVirtualXID(VirtualTransactionId vxid,
bool *have_more);
-extern PGPROC *TwoPhaseGetDummyProc(TransactionId xid, bool lock_held);
-extern int TwoPhaseGetDummyProcNumber(TransactionId xid, bool lock_held);
+extern PGPROC *TwoPhaseGetDummyProc(FullTransactionId fxid, bool lock_held);
+extern int TwoPhaseGetDummyProcNumber(FullTransactionId fxid, bool lock_held);
-extern GlobalTransaction MarkAsPreparing(TransactionId xid, const char *gid,
+extern GlobalTransaction MarkAsPreparing(FullTransactionId fxid, const char *gid,
TimestampTz prepared_at,
Oid owner, Oid databaseid);
@@ -56,8 +57,9 @@ extern void CheckPointTwoPhase(XLogRecPtr redo_horizon);
extern void FinishPreparedTransaction(const char *gid, bool isCommit);
-extern void PrepareRedoAdd(char *buf, XLogRecPtr start_lsn,
- XLogRecPtr end_lsn, RepOriginId origin_id);
+extern void PrepareRedoAdd(FullTransactionId fxid, char *buf,
+ XLogRecPtr start_lsn, XLogRecPtr end_lsn,
+ RepOriginId origin_id);
extern void PrepareRedoRemove(TransactionId xid, bool giveWarning);
extern void restoreTwoPhaseData(void);
extern bool LookupGXact(const char *gid, XLogRecPtr prepare_end_lsn,
diff --git a/src/include/access/twophase_rmgr.h b/src/include/access/twophase_rmgr.h
index 3ed154b..8f57640 100644
--- a/src/include/access/twophase_rmgr.h
+++ b/src/include/access/twophase_rmgr.h
@@ -14,7 +14,9 @@
#ifndef TWOPHASE_RMGR_H
#define TWOPHASE_RMGR_H
-typedef void (*TwoPhaseCallback) (TransactionId xid, uint16 info,
+#include "access/transam.h"
+
+typedef void (*TwoPhaseCallback) (FullTransactionId fxid, uint16 info,
void *recdata, uint32 len);
typedef uint8 TwoPhaseRmgrId;
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 6475889..a8c09de 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -734,9 +734,9 @@ extern void pgstat_count_heap_delete(Relation rel);
extern void pgstat_count_truncate(Relation rel);
extern void pgstat_update_heap_dead_tuples(Relation rel, int delta);
-extern void pgstat_twophase_postcommit(TransactionId xid, uint16 info,
+extern void pgstat_twophase_postcommit(FullTransactionId fxid, uint16 info,
void *recdata, uint32 len);
-extern void pgstat_twophase_postabort(TransactionId xid, uint16 info,
+extern void pgstat_twophase_postabort(FullTransactionId fxid, uint16 info,
void *recdata, uint32 len);
extern PgStat_StatTabEntry *pgstat_fetch_stat_tabentry(Oid relid);
diff --git a/src/include/storage/lock.h b/src/include/storage/lock.h
index 1076995..3feedfc 100644
--- a/src/include/storage/lock.h
+++ b/src/include/storage/lock.h
@@ -18,6 +18,7 @@
#error "lock.h may not be included from frontend code"
#endif
+#include "access/transam.h"
#include "lib/ilist.h"
#include "storage/lockdefs.h"
#include "storage/lwlock.h"
@@ -579,7 +580,7 @@ extern bool LockHasWaiters(const LOCKTAG *locktag,
extern VirtualTransactionId *GetLockConflicts(const LOCKTAG *locktag,
LOCKMODE lockmode, int *countp);
extern void AtPrepare_Locks(void);
-extern void PostPrepare_Locks(TransactionId xid);
+extern void PostPrepare_Locks(FullTransactionId fxid);
extern bool LockCheckConflicts(LockMethod lockMethodTable,
LOCKMODE lockmode,
LOCK *lock, PROCLOCK *proclock);
@@ -594,13 +595,13 @@ extern BlockedProcsData *GetBlockerStatusData(int blocked_pid);
extern xl_standby_lock *GetRunningTransactionLocks(int *nlocks);
extern const char *GetLockmodeName(LOCKMETHODID lockmethodid, LOCKMODE mode);
-extern void lock_twophase_recover(TransactionId xid, uint16 info,
+extern void lock_twophase_recover(FullTransactionId fxid, uint16 info,
void *recdata, uint32 len);
-extern void lock_twophase_postcommit(TransactionId xid, uint16 info,
+extern void lock_twophase_postcommit(FullTransactionId fxid, uint16 info,
void *recdata, uint32 len);
-extern void lock_twophase_postabort(TransactionId xid, uint16 info,
+extern void lock_twophase_postabort(FullTransactionId fxid, uint16 info,
void *recdata, uint32 len);
-extern void lock_twophase_standby_recover(TransactionId xid, uint16 info,
+extern void lock_twophase_standby_recover(FullTransactionId fxid, uint16 info,
void *recdata, uint32 len);
extern DeadLockState DeadLockCheck(PGPROC *proc);
diff --git a/src/include/storage/predicate.h b/src/include/storage/predicate.h
index 267d5d9..4d3f218 100644
--- a/src/include/storage/predicate.h
+++ b/src/include/storage/predicate.h
@@ -14,6 +14,7 @@
#ifndef PREDICATE_H
#define PREDICATE_H
+#include "access/transam.h"
#include "storage/itemptr.h"
#include "storage/lock.h"
#include "utils/relcache.h"
@@ -72,9 +73,9 @@ extern void PreCommit_CheckForSerializationFailure(void);
/* two-phase commit support */
extern void AtPrepare_PredicateLocks(void);
-extern void PostPrepare_PredicateLocks(TransactionId xid);
-extern void PredicateLockTwoPhaseFinish(TransactionId xid, bool isCommit);
-extern void predicatelock_twophase_recover(TransactionId xid, uint16 info,
+extern void PostPrepare_PredicateLocks(FullTransactionId fxid);
+extern void PredicateLockTwoPhaseFinish(FullTransactionId fxid, bool isCommit);
+extern void predicatelock_twophase_recover(FullTransactionId fxid, uint16 info,
void *recdata, uint32 len);
/* parallel query support */