On 25 January 2017 at 13:44, Craig Ringer <cr...@2ndquadrant.com> wrote:
> On 24 January 2017 at 23:49, Robert Haas <robertmh...@gmail.com> wrote:
>
>> I think it's fairly surprising that TruncateCLOG() has responsibility
>> for writing the xlog record that protects advancing
>> ShmemVariableCache->oldestXid, but not the responsibility for actually
>> advancing that value.  In other words, I think the AdvanceOldestXid()
>> call in vac_truncate_clog() should be moved into TruncateClog().
>> (Similarly, one wonders why AdvanceOldestCommitTsXid() isn't the
>> responsibility of TruncateCommitTs().)
>
> Agreed, and done in attached.
>
> I haven't done the same for commit_ts but will do so on the thread for
> the commit_ts race fix if we go ahead with this change here.
>
>> I think it is not correct to advance oldestXid but not oldestXidDB.
>> Otherwise, GetNewTransactionId() might complain about the wrong
>> database.
>
> That's a clear oversight. Fixed in attached.

New attached also records it in xlog and applies it to the standby.

If we want to save the 4 bytes per xmin advance (probably not worth
caring) we can instead skip setting it on the standby, in which case
it'll be potentially wrong until the next checkpoint. I'd rather make
sure it stays correct.


-- 
 Craig Ringer                   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services
From ce69d2824f89f675a25e098b3980a304baca7ade Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Mon, 23 Jan 2017 13:25:30 +0800
Subject: [PATCH 1/2] Fix race between clog truncation and lookup

There was previously no way to look up an arbitrary xid without
running the risk of having clog truncated out from under you.

This hasn 't been a problem because anything looking up xids in clog knows
they're protected by datminxid, but that's not the case for arbitrary
user-supplied XIDs. clog was truncated before we advanced oldestXid under
XidGenLock, so holding XidGenLock during a clog lookup was insufficient to
prevent the race. There's no way to look up a SLRU with soft-failure;
attempting a lookup produces an I/O error. There's also no safe way to trap and
swallow the SLRU lookup error due mainly to locking issues.

To address this, increase oldestXid under XidGenLock before we trunate clog
rather than after, so concurrent lookups of arbitrary XIDs are safe if they are
done under XidGenLock. The rest of the xid limits are still advanced after clog
truncation to ensure there's no chance of a new xid trying to access an
about-to-be-truncated clog page. In practice this can't happen anyway since we
use only half the xid space at any time, but additional guards against future
change are warranted with something this crucial.

This race also exists in a worse form on standby servers. On a standby we only
advance oldestXid when we replay the next checkpoint, so there's a much larger
window between clog truncation and subsequent updating of the limit. Fix this
by recording the oldest xid in clog truncation records and applying the
oldestXid under XidGenLock before replaying the clog truncation.

Note that There's no need to take XidGenLock for normal clog lookups protected
by datfrozenxid, only if accepting arbitrary XIDs that might not be protected by
vacuum thresholds.
---
 src/backend/access/rmgrdesc/clogdesc.c | 12 ++++++--
 src/backend/access/transam/clog.c      | 50 +++++++++++++++++++++++++++-------
 src/backend/access/transam/varsup.c    | 49 ++++++++++++++++++++++++++-------
 src/backend/access/transam/xlog.c      | 22 ++++++++++-----
 src/backend/commands/vacuum.c          |  4 +--
 src/include/access/clog.h              |  8 +++++-
 src/include/access/transam.h           |  4 +--
 7 files changed, 115 insertions(+), 34 deletions(-)

diff --git a/src/backend/access/rmgrdesc/clogdesc.c b/src/backend/access/rmgrdesc/clogdesc.c
index 352de48..ef268c5 100644
--- a/src/backend/access/rmgrdesc/clogdesc.c
+++ b/src/backend/access/rmgrdesc/clogdesc.c
@@ -23,12 +23,20 @@ clog_desc(StringInfo buf, XLogReaderState *record)
 	char	   *rec = XLogRecGetData(record);
 	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
 
-	if (info == CLOG_ZEROPAGE || info == CLOG_TRUNCATE)
+	if (info == CLOG_ZEROPAGE)
 	{
 		int			pageno;
 
 		memcpy(&pageno, rec, sizeof(int));
-		appendStringInfo(buf, "%d", pageno);
+		appendStringInfo(buf, "page %d", pageno);
+	}
+	else if (info == CLOG_TRUNCATE)
+	{
+		xl_clog_truncate xlrec;
+
+		memcpy(&xlrec, rec, sizeof(xl_clog_truncate));
+		appendStringInfo(buf, "page %d; oldestXact %u",
+			xlrec.pageno, xlrec.oldestXact);
 	}
 }
 
diff --git a/src/backend/access/transam/clog.c b/src/backend/access/transam/clog.c
index 1a43819..cfd1c91 100644
--- a/src/backend/access/transam/clog.c
+++ b/src/backend/access/transam/clog.c
@@ -83,7 +83,8 @@ static SlruCtlData ClogCtlData;
 static int	ZeroCLOGPage(int pageno, bool writeXlog);
 static bool CLOGPagePrecedes(int page1, int page2);
 static void WriteZeroPageXlogRec(int pageno);
-static void WriteTruncateXlogRec(int pageno);
+static void WriteTruncateXlogRec(int pageno, TransactionId oldestXact,
+								 Oid oldestXidDb);
 static void TransactionIdSetPageStatus(TransactionId xid, int nsubxids,
 						   TransactionId *subxids, XidStatus status,
 						   XLogRecPtr lsn, int pageno);
@@ -640,11 +641,24 @@ ExtendCLOG(TransactionId newestXact)
  * the XLOG flush unless we have confirmed that there is a removable segment.
  */
 void
-TruncateCLOG(TransactionId oldestXact)
+TruncateCLOG(TransactionId oldestXact, Oid oldestxid_datoid)
 {
 	int			cutoffPage;
 
 	/*
+	 * Advance oldestXid before truncating clog, so concurrent xact status
+	 * lookups can ensure they don't attempt to access truncated-away clog.
+	 *
+	 * We must do this even if we find we can't actually truncate away any clog
+	 * pages, since we'll advance the xid limits and need oldestXmin to be
+	 * consistent with the new limits.
+	 *
+	 * Losing this on crash before a checkpoint is harmless unless we truncated
+	 * clog, in which case redo of the clog truncation will re-apply it.
+	 */
+	AdvanceOldestXid(oldestXact, oldestxid_datoid);
+
+	/*
 	 * The cutoff point is the start of the segment containing oldestXact. We
 	 * pass the *page* containing oldestXact to SimpleLruTruncate.
 	 */
@@ -654,8 +668,17 @@ TruncateCLOG(TransactionId oldestXact)
 	if (!SlruScanDirectory(ClogCtl, SlruScanDirCbReportPresence, &cutoffPage))
 		return;					/* nothing to remove */
 
-	/* Write XLOG record and flush XLOG to disk */
-	WriteTruncateXlogRec(cutoffPage);
+	/* vac_truncate_clog already advanced oldestXid */
+	Assert(TransactionIdPrecedesOrEquals(oldestXact,
+		   ShmemVariableCache->oldestXid));
+
+	/*
+	 * Write XLOG record and flush XLOG to disk. We record the oldest xid we're
+	 * keeping information about here so we can ensure that it's always ahead
+	 * of clog truncation in case we crash, and so a standby finds out the new
+	 * valid xid before the next checkpoint.
+	 */
+	WriteTruncateXlogRec(cutoffPage, oldestXact, oldestxid_datoid);
 
 	/* Now we can remove the old CLOG segment(s) */
 	SimpleLruTruncate(ClogCtl, cutoffPage);
@@ -704,12 +727,17 @@ WriteZeroPageXlogRec(int pageno)
  * in TruncateCLOG().
  */
 static void
-WriteTruncateXlogRec(int pageno)
+WriteTruncateXlogRec(int pageno, TransactionId oldestXact, Oid oldestXactDb)
 {
 	XLogRecPtr	recptr;
+	xl_clog_truncate xlrec;
+
+	xlrec.pageno = pageno;
+	xlrec.oldestXact = oldestXact;
+	xlrec.oldestXactDb = oldestXactDb;
 
 	XLogBeginInsert();
-	XLogRegisterData((char *) (&pageno), sizeof(int));
+	XLogRegisterData((char *) (&xlrec), sizeof(xl_clog_truncate));
 	recptr = XLogInsert(RM_CLOG_ID, CLOG_TRUNCATE);
 	XLogFlush(recptr);
 }
@@ -742,17 +770,19 @@ clog_redo(XLogReaderState *record)
 	}
 	else if (info == CLOG_TRUNCATE)
 	{
-		int			pageno;
+		xl_clog_truncate xlrec;
 
-		memcpy(&pageno, XLogRecGetData(record), sizeof(int));
+		memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_clog_truncate));
 
 		/*
 		 * During XLOG replay, latest_page_number isn't set up yet; insert a
 		 * suitable value to bypass the sanity test in SimpleLruTruncate.
 		 */
-		ClogCtl->shared->latest_page_number = pageno;
+		ClogCtl->shared->latest_page_number = xlrec.pageno;
+
+		AdvanceOldestXid(xlrec.oldestXact, xlrec.oldestXactDb);
 
-		SimpleLruTruncate(ClogCtl, pageno);
+		SimpleLruTruncate(ClogCtl, xlrec.pageno);
 	}
 	else
 		elog(PANIC, "clog_redo: unknown op code %u", info);
diff --git a/src/backend/access/transam/varsup.c b/src/backend/access/transam/varsup.c
index fc084c5..9aacb0f 100644
--- a/src/backend/access/transam/varsup.c
+++ b/src/backend/access/transam/varsup.c
@@ -259,19 +259,52 @@ ReadNewTransactionId(void)
 }
 
 /*
- * Determine the last safe XID to allocate given the currently oldest
+ * Advance the cluster-wide oldestXid.
+ *
+ * We must ensure that this never goes backwards, otherwise the xid limits set
+ * in SetTransactionIdLimit(...) could be insufficiently conservative if two
+ * vacuums race, with the lower oldestXmin winning then the higher xid limits
+ * winning.
+ */
+void
+AdvanceOldestXid(TransactionId oldest_datfrozenxid, Oid oldest_datoid)
+{
+	LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
+	if (TransactionIdPrecedes(ShmemVariableCache->oldestXid,
+		oldest_datfrozenxid))
+	{
+		ShmemVariableCache->oldestXid = oldest_datfrozenxid;
+		ShmemVariableCache->oldestXidDB = oldest_datoid;
+	}
+	LWLockRelease(XidGenLock);
+}
+
+/*
+ * Determine the last safe XID to allocate using the currently oldest
  * datfrozenxid (ie, the oldest XID that might exist in any database
  * of our cluster), and the OID of the (or a) database with that value.
  */
 void
-SetTransactionIdLimit(TransactionId oldest_datfrozenxid, Oid oldest_datoid)
+SetTransactionIdLimit(void)
 {
 	TransactionId xidVacLimit;
 	TransactionId xidWarnLimit;
 	TransactionId xidStopLimit;
 	TransactionId xidWrapLimit;
 	TransactionId curXid;
+	Oid			  oldest_datoid;
+	TransactionId oldest_datfrozenxid;
+
+	LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
 
+	/*
+	 * We fetch oldest_datfrozenxid and oldest_datoid from shmem to ensure that
+	 * if some other vacuum has advanced oldestXid since we did, we use the
+	 * latest values and the limits never go backwards.
+	 */
+	oldest_datfrozenxid = ShmemVariableCache->oldestXid;
+	oldest_datoid = ShmemVariableCache->oldestXidDB;
+	
 	Assert(TransactionIdIsNormal(oldest_datfrozenxid));
 
 	/*
@@ -284,6 +317,7 @@ SetTransactionIdLimit(TransactionId oldest_datfrozenxid, Oid oldest_datoid)
 	xidWrapLimit = oldest_datfrozenxid + (MaxTransactionId >> 1);
 	if (xidWrapLimit < FirstNormalTransactionId)
 		xidWrapLimit += FirstNormalTransactionId;
+	ShmemVariableCache->xidWrapLimit = xidWrapLimit;
 
 	/*
 	 * We'll refuse to continue assigning XIDs in interactive mode once we get
@@ -296,6 +330,7 @@ SetTransactionIdLimit(TransactionId oldest_datfrozenxid, Oid oldest_datoid)
 	xidStopLimit = xidWrapLimit - 1000000;
 	if (xidStopLimit < FirstNormalTransactionId)
 		xidStopLimit -= FirstNormalTransactionId;
+	ShmemVariableCache->xidStopLimit = xidStopLimit;
 
 	/*
 	 * We'll start complaining loudly when we get within 10M transactions of
@@ -310,6 +345,7 @@ SetTransactionIdLimit(TransactionId oldest_datfrozenxid, Oid oldest_datoid)
 	xidWarnLimit = xidStopLimit - 10000000;
 	if (xidWarnLimit < FirstNormalTransactionId)
 		xidWarnLimit -= FirstNormalTransactionId;
+	ShmemVariableCache->xidWarnLimit = xidWarnLimit;
 
 	/*
 	 * We'll start trying to force autovacuums when oldest_datfrozenxid gets
@@ -329,15 +365,8 @@ SetTransactionIdLimit(TransactionId oldest_datfrozenxid, Oid oldest_datoid)
 	xidVacLimit = oldest_datfrozenxid + autovacuum_freeze_max_age;
 	if (xidVacLimit < FirstNormalTransactionId)
 		xidVacLimit += FirstNormalTransactionId;
-
-	/* Grab lock for just long enough to set the new limit values */
-	LWLockAcquire(XidGenLock, LW_EXCLUSIVE);
-	ShmemVariableCache->oldestXid = oldest_datfrozenxid;
 	ShmemVariableCache->xidVacLimit = xidVacLimit;
-	ShmemVariableCache->xidWarnLimit = xidWarnLimit;
-	ShmemVariableCache->xidStopLimit = xidStopLimit;
-	ShmemVariableCache->xidWrapLimit = xidWrapLimit;
-	ShmemVariableCache->oldestXidDB = oldest_datoid;
+
 	curXid = ShmemVariableCache->nextXid;
 	LWLockRelease(XidGenLock);
 
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 2f5d603..823f4bd 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -4877,7 +4877,8 @@ BootStrapXLOG(void)
 	ShmemVariableCache->nextOid = checkPoint.nextOid;
 	ShmemVariableCache->oidCount = 0;
 	MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset);
-	SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
+	AdvanceOldestXid(checkPoint.oldestXid, checkPoint.oldestXidDB);
+	SetTransactionIdLimit();
 	SetMultiXactIdLimit(checkPoint.oldestMulti, checkPoint.oldestMultiDB);
 	SetCommitTsLimit(InvalidTransactionId, InvalidTransactionId);
 
@@ -6471,7 +6472,8 @@ StartupXLOG(void)
 	ShmemVariableCache->nextOid = checkPoint.nextOid;
 	ShmemVariableCache->oidCount = 0;
 	MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset);
-	SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
+	AdvanceOldestXid(checkPoint.oldestXid, checkPoint.oldestXidDB);
+	SetTransactionIdLimit();
 	SetMultiXactIdLimit(checkPoint.oldestMulti, checkPoint.oldestMultiDB);
 	SetCommitTsLimit(checkPoint.oldestCommitTsXid,
 					 checkPoint.newestCommitTsXid);
@@ -9456,7 +9458,11 @@ xlog_redo(XLogReaderState *record)
 
 		MultiXactAdvanceOldest(checkPoint.oldestMulti,
 							   checkPoint.oldestMultiDB);
-		SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
+		/*
+		 * No need to call AdvanceOldestXid, startup or an earlier clog trunate
+		 * record will have already advanced it. Just advance the limits.
+		 */
+		SetTransactionIdLimit();
 
 		/*
 		 * If we see a shutdown checkpoint while waiting for an end-of-backup
@@ -9553,10 +9559,12 @@ xlog_redo(XLogReaderState *record)
 		 */
 		MultiXactAdvanceOldest(checkPoint.oldestMulti,
 							   checkPoint.oldestMultiDB);
-		if (TransactionIdPrecedes(ShmemVariableCache->oldestXid,
-								  checkPoint.oldestXid))
-			SetTransactionIdLimit(checkPoint.oldestXid,
-								  checkPoint.oldestXidDB);
+		/*
+		 * We don't need to AdvanceOldestXid here; StartupXLOG or a clog
+		 * truncation record will ensure it's up to date, and we just update
+		 * the limits here based on what's already in shmem.
+		 */
+		SetTransactionIdLimit();
 		/* ControlFile->checkPointCopy always tracks the latest ckpt XID */
 		ControlFile->checkPointCopy.nextXidEpoch = checkPoint.nextXidEpoch;
 		ControlFile->checkPointCopy.nextXid = checkPoint.nextXid;
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index 812fb4a..5ec307f 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -1164,7 +1164,7 @@ vac_truncate_clog(TransactionId frozenXID,
 	/*
 	 * Truncate CLOG, multixact and CommitTs to the oldest computed value.
 	 */
-	TruncateCLOG(frozenXID);
+	TruncateCLOG(frozenXID, oldestxid_datoid);
 	TruncateCommitTs(frozenXID);
 	TruncateMultiXact(minMulti, minmulti_datoid);
 
@@ -1174,7 +1174,7 @@ vac_truncate_clog(TransactionId frozenXID,
 	 * for an(other) autovac cycle if needed.   XXX should we avoid possibly
 	 * signalling twice?
 	 */
-	SetTransactionIdLimit(frozenXID, oldestxid_datoid);
+	SetTransactionIdLimit();
 	SetMultiXactIdLimit(minMulti, minmulti_datoid);
 }
 
diff --git a/src/include/access/clog.h b/src/include/access/clog.h
index 2894bd5..60a9e11 100644
--- a/src/include/access/clog.h
+++ b/src/include/access/clog.h
@@ -28,6 +28,12 @@ typedef int XidStatus;
 #define TRANSACTION_STATUS_ABORTED			0x02
 #define TRANSACTION_STATUS_SUB_COMMITTED	0x03
 
+typedef struct xl_clog_truncate
+{
+	int pageno;
+	TransactionId oldestXact;
+	Oid oldestXactDb;
+} xl_clog_truncate;
 
 extern void TransactionIdSetTreeStatus(TransactionId xid, int nsubxids,
 				   TransactionId *subxids, XidStatus status, XLogRecPtr lsn);
@@ -42,7 +48,7 @@ extern void TrimCLOG(void);
 extern void ShutdownCLOG(void);
 extern void CheckPointCLOG(void);
 extern void ExtendCLOG(TransactionId newestXact);
-extern void TruncateCLOG(TransactionId oldestXact);
+extern void TruncateCLOG(TransactionId oldestXact, Oid oldestxid_datoid);
 
 /* XLOG stuff */
 #define CLOG_ZEROPAGE		0x00
diff --git a/src/include/access/transam.h b/src/include/access/transam.h
index 522c104..95cdef1 100644
--- a/src/include/access/transam.h
+++ b/src/include/access/transam.h
@@ -171,8 +171,8 @@ extern XLogRecPtr TransactionIdGetCommitLSN(TransactionId xid);
 /* in transam/varsup.c */
 extern TransactionId GetNewTransactionId(bool isSubXact);
 extern TransactionId ReadNewTransactionId(void);
-extern void SetTransactionIdLimit(TransactionId oldest_datfrozenxid,
-					  Oid oldest_datoid);
+extern void AdvanceOldestXid(TransactionId oldest_datfrozenxid, Oid oldest_datoid);
+extern void SetTransactionIdLimit(void);
 extern bool ForceTransactionIdLimitUpdate(void);
 extern Oid	GetNewObjectId(void);
 
-- 
2.5.5

From c8f517e63762c9f89eaa3aa80c24e5668e801b8e Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Mon, 23 Jan 2017 13:34:02 +0800
Subject: [PATCH 2/2] Introduce txid_status(bigint) to get status of an xact

If an application loses its connection while a COMMIT request is in
flight, the backend crashes mid-commit, etc, then an application may
not be sure whether or not a commit completed successfully or was
rolled back. While two-phase commit solves this it does so at a
considerable overhead, so introduce a lighter alternative.

txid_status(bigint) lets an application determine the status of a a
commit based on an xid-with-epoch as returned by txid_current() or
similar. Status may be committed, aborted, in-progress (including
prepared xacts) or null if the xact is too old for its commit status
to still be retained because it has passed the wrap-around epoch
boundary.

Applications must call txid_current() in their transactions to make
much use of this since PostgreSQL does not automatically report an xid
to the client when one is assigned.

Introduces TransactionIdInRecentPast(...) for the use of other
functions that need similar logic in future.

Authors: Craig Ringer, Robert Haas
---
 doc/src/sgml/func.sgml             |  27 ++++++
 src/backend/utils/adt/txid.c       | 116 ++++++++++++++++++++++++++
 src/include/catalog/pg_proc.h      |   2 +
 src/include/utils/builtins.h       | 164 +++++++++++++++++++++++++++++++++++++
 src/test/regress/expected/txid.out |  68 +++++++++++++++
 src/test/regress/sql/txid.sql      |  38 +++++++++
 6 files changed, 415 insertions(+)

diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index b214218..0095915 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -17190,6 +17190,10 @@ SELECT collation for ('foo' COLLATE "de_DE");
     <primary>txid_visible_in_snapshot</primary>
    </indexterm>
 
+   <indexterm>
+    <primary>txid_status</primary>
+   </indexterm>
+
    <para>
     The functions shown in <xref linkend="functions-txid-snapshot">
     provide server transaction information in an exportable form.  The main
@@ -17240,6 +17244,11 @@ SELECT collation for ('foo' COLLATE "de_DE");
        <entry><type>boolean</type></entry>
        <entry>is transaction ID visible in snapshot? (do not use with subtransaction ids)</entry>
       </row>
+      <row>
+       <entry><literal><function>txid_status(<parameter>bigint</parameter>)</function></literal></entry>
+       <entry><type>txid_status</type></entry>
+       <entry>report the status of the given xact - <literal>committed</literal>, <literal>aborted</literal>, <literal>in progress</literal>, or NULL if the xid is too old</entry>
+      </row>
      </tbody>
     </tgroup>
    </table>
@@ -17310,6 +17319,24 @@ SELECT collation for ('foo' COLLATE "de_DE");
    </para>
 
    <para>
+    <function>txid_status(bigint)</> reports the commit status of a recent
+    transaction.  Applications may use it to determine whether a transaction
+    committed or aborted when the application and database server become
+    disconnected while a <literal>COMMIT</literal> is in progress.
+    The status of a transaction will be reported as either
+    <literal>in progress</>,
+    <literal>committed</>, or <literal>aborted</>, provided that the
+    transaction is recent enough that the system retains the commit status
+    of that transaction.  If is old enough that no references to that
+    transaction survive in the system and the commit status information has
+    been discarded, this function will return NULL.  Note that prepared
+    transactions are reported as <literal>in progress</>; applications must
+    check <link
+    linkend="view-pg-prepared-xacts"><literal>pg_prepared_xacts</></> if they
+    need to determine whether the xid is a prepared transaction.
+   </para>
+
+   <para>
     The functions shown in <xref linkend="functions-commit-timestamp">
     provide information about transactions that have been already committed.
     These functions mainly provide information about when the transactions
diff --git a/src/backend/utils/adt/txid.c b/src/backend/utils/adt/txid.c
index 772d7c7..8f86916 100644
--- a/src/backend/utils/adt/txid.c
+++ b/src/backend/utils/adt/txid.c
@@ -21,6 +21,7 @@
 
 #include "postgres.h"
 
+#include "access/clog.h"
 #include "access/transam.h"
 #include "access/xact.h"
 #include "access/xlog.h"
@@ -28,6 +29,7 @@
 #include "miscadmin.h"
 #include "libpq/pqformat.h"
 #include "postmaster/postmaster.h"
+#include "storage/lwlock.h"
 #include "utils/builtins.h"
 #include "utils/memutils.h"
 #include "utils/snapmgr.h"
@@ -93,6 +95,63 @@ load_xid_epoch(TxidEpoch *state)
 }
 
 /*
+ * Helper to get a TransactionId from a 64-bit xid with wraparound detection.
+ *
+ * It is an ERROR if the xid is in the future.  Otherwise, returns true if
+ * the transaction is still new enough that we can determine whether it
+ * committed and false otherwise.  If *extracted_xid is not NULL, it is set
+ * to the low 32 bits of the transaction ID (i.e. the actual XID, without the
+ * epoch).
+ */
+static bool
+TransactionIdInRecentPast(uint64 xid_with_epoch, TransactionId *extracted_xid)
+{
+	uint32		xid_epoch = (uint32) (xid_with_epoch >> 32);
+	TransactionId xid = (TransactionId) xid_with_epoch;
+	uint32		now_epoch;
+	TransactionId now_epoch_last_xid;
+
+	GetNextXidAndEpoch(&now_epoch_last_xid, &now_epoch);
+
+	if (extracted_xid != NULL)
+		*extracted_xid = xid;
+
+	/* For non-normal transaction IDs, we can ignore the epoch. */
+	if (!TransactionIdIsNormal(xid))
+		return true;
+
+	/* If the transaction ID is in the future, throw an error. */
+	if (xid_epoch > now_epoch
+		|| (xid_epoch == now_epoch && xid > now_epoch_last_xid))
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("transaction ID " UINT64_FORMAT " is in the future",
+						xid_with_epoch)));
+
+	/*
+	 * ShmemVariableCache->oldestXid is protected by XidGenLock, but we don't
+	 * acquire that lock here.  Instead, we require the caller to acquire it,
+	 * because the caller is presumably going to look up the returned XID.
+	 * If we took and released the lock within this function, a CLOG
+	 * truncation could occur before the caller finished with the XID.
+	 */
+	Assert(LWLockHeldByMe(XidGenLock));
+
+	/*
+	 * If the transaction ID has wrapped around, it's definitely too old to
+	 * determine the commit status.  Otherwise, we can compare it to
+	 * ShmemVariableCache->oldestXid to determine whether the relevant CLOG
+	 * entry is guaranteed to still exist.
+	 */
+	if (xid_epoch + 1 < now_epoch
+		|| (xid_epoch + 1 == now_epoch && xid < now_epoch_last_xid)
+		|| TransactionIdPrecedes(xid, ShmemVariableCache->oldestXid))
+		return false;
+
+	return true;
+}
+
+/*
  * do a TransactionId -> txid conversion for an XID near the given epoch
  */
 static txid
@@ -354,6 +413,9 @@ bad_format:
  *
  *	Return the current toplevel transaction ID as TXID
  *	If the current transaction does not have one, one is assigned.
+ *
+ *	This value has the epoch as the high 32 bits and the 32-bit xid
+ *	as the low 32 bits.
  */
 Datum
 txid_current(PG_FUNCTION_ARGS)
@@ -658,3 +720,57 @@ txid_snapshot_xip(PG_FUNCTION_ARGS)
 		SRF_RETURN_DONE(fctx);
 	}
 }
+
+/*
+ * Report the status of a recent transaction ID, or null for wrapped,
+ * truncated away or otherwise too old XIDs.
+ */
+Datum
+txid_status(PG_FUNCTION_ARGS)
+{
+	const char	   *status;
+	uint64			xid_with_epoch = PG_GETARG_INT64(0);
+	TransactionId	xid;
+
+	/*
+	 * Ensure clog isn't removed between when we check whether the current xid
+	 * is valid and when we look its status up.
+	 */
+	LWLockAcquire(XidGenLock, LW_SHARED);
+	if (TransactionIdInRecentPast(xid_with_epoch, &xid))
+	{
+		if (!TransactionIdIsValid(xid))
+		{
+			LWLockRelease(XidGenLock);
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("transaction ID " UINT64_FORMAT " is an invalid xid",
+						xid_with_epoch)));
+		}
+
+		if (TransactionIdIsCurrentTransactionId(xid))
+			status = gettext_noop("in progress");
+		else if (TransactionIdDidCommit(xid))
+			status = gettext_noop("committed");
+		else if (TransactionIdDidAbort(xid))
+			status = gettext_noop("aborted");
+		else
+
+			/*
+			 * can't test TransactionIdIsInProgress here or we race with
+			 * concurrent commit/abort. There's no point anyway, since it
+			 * might then commit/abort just after we check.
+			 */
+			status = gettext_noop("in progress");
+	}
+	else
+	{
+		status = NULL;
+	}
+	LWLockRelease(XidGenLock);
+
+	if (status == NULL)
+		PG_RETURN_NULL();
+	else
+		PG_RETURN_TEXT_P(cstring_to_text(status));
+}
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index ab12761..912cb5a 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -4936,6 +4936,8 @@ DATA(insert OID = 2947 (  txid_snapshot_xip			PGNSP PGUID 12 1 50 0 0 f f f f t
 DESCR("get set of in-progress txids in snapshot");
 DATA(insert OID = 2948 (  txid_visible_in_snapshot	PGNSP PGUID 12 1  0 0 0 f f f f t f i s 2 0 16 "20 2970" _null_ _null_ _null_ _null_ _null_ txid_visible_in_snapshot _null_ _null_ _null_ ));
 DESCR("is txid visible in snapshot?");
+DATA(insert OID = 3360 (  txid_status				PGNSP PGUID 12 1  0 0 0 f f f f t f v s 1 0 25 "20" _null_ _null_ _null_ _null_ _null_ txid_status _null_ _null_ _null_ ));
+DESCR("commit status of transaction");
 
 /* record comparison using normal comparison rules */
 DATA(insert OID = 2981 (  record_eq		   PGNSP PGUID 12 1 0 0 0 f f f f t f i s 2 0 16 "2249 2249" _null_ _null_ _null_ _null_ _null_ record_eq _null_ _null_ _null_ ));
diff --git a/src/include/utils/builtins.h b/src/include/utils/builtins.h
index 5bdca82..26f76ca 100644
--- a/src/include/utils/builtins.h
+++ b/src/include/utils/builtins.h
@@ -119,5 +119,169 @@ extern int32 type_maximum_size(Oid type_oid, int32 typemod);
 
 /* quote.c */
 extern char *quote_literal_cstr(const char *rawstr);
+extern Datum quote_nullable(PG_FUNCTION_ARGS);
+
+/* guc.c */
+extern Datum show_config_by_name(PG_FUNCTION_ARGS);
+extern Datum show_config_by_name_missing_ok(PG_FUNCTION_ARGS);
+extern Datum set_config_by_name(PG_FUNCTION_ARGS);
+extern Datum show_all_settings(PG_FUNCTION_ARGS);
+extern Datum show_all_file_settings(PG_FUNCTION_ARGS);
+
+/* pg_config.c */
+extern Datum pg_config(PG_FUNCTION_ARGS);
+
+/* pg_controldata.c */
+extern Datum pg_control_checkpoint(PG_FUNCTION_ARGS);
+extern Datum pg_control_system(PG_FUNCTION_ARGS);
+extern Datum pg_control_init(PG_FUNCTION_ARGS);
+extern Datum pg_control_recovery(PG_FUNCTION_ARGS);
+
+/* rls.c */
+extern Datum row_security_active(PG_FUNCTION_ARGS);
+extern Datum row_security_active_name(PG_FUNCTION_ARGS);
+
+/* lockfuncs.c */
+extern Datum pg_lock_status(PG_FUNCTION_ARGS);
+extern Datum pg_blocking_pids(PG_FUNCTION_ARGS);
+extern Datum pg_advisory_lock_int8(PG_FUNCTION_ARGS);
+extern Datum pg_advisory_xact_lock_int8(PG_FUNCTION_ARGS);
+extern Datum pg_advisory_lock_shared_int8(PG_FUNCTION_ARGS);
+extern Datum pg_advisory_xact_lock_shared_int8(PG_FUNCTION_ARGS);
+extern Datum pg_try_advisory_lock_int8(PG_FUNCTION_ARGS);
+extern Datum pg_try_advisory_xact_lock_int8(PG_FUNCTION_ARGS);
+extern Datum pg_try_advisory_lock_shared_int8(PG_FUNCTION_ARGS);
+extern Datum pg_try_advisory_xact_lock_shared_int8(PG_FUNCTION_ARGS);
+extern Datum pg_advisory_unlock_int8(PG_FUNCTION_ARGS);
+extern Datum pg_advisory_unlock_shared_int8(PG_FUNCTION_ARGS);
+extern Datum pg_advisory_lock_int4(PG_FUNCTION_ARGS);
+extern Datum pg_advisory_xact_lock_int4(PG_FUNCTION_ARGS);
+extern Datum pg_advisory_lock_shared_int4(PG_FUNCTION_ARGS);
+extern Datum pg_advisory_xact_lock_shared_int4(PG_FUNCTION_ARGS);
+extern Datum pg_try_advisory_lock_int4(PG_FUNCTION_ARGS);
+extern Datum pg_try_advisory_xact_lock_int4(PG_FUNCTION_ARGS);
+extern Datum pg_try_advisory_lock_shared_int4(PG_FUNCTION_ARGS);
+extern Datum pg_try_advisory_xact_lock_shared_int4(PG_FUNCTION_ARGS);
+extern Datum pg_advisory_unlock_int4(PG_FUNCTION_ARGS);
+extern Datum pg_advisory_unlock_shared_int4(PG_FUNCTION_ARGS);
+extern Datum pg_advisory_unlock_all(PG_FUNCTION_ARGS);
+
+/* txid.c */
+extern Datum txid_snapshot_in(PG_FUNCTION_ARGS);
+extern Datum txid_snapshot_out(PG_FUNCTION_ARGS);
+extern Datum txid_snapshot_recv(PG_FUNCTION_ARGS);
+extern Datum txid_snapshot_send(PG_FUNCTION_ARGS);
+extern Datum txid_current(PG_FUNCTION_ARGS);
+extern Datum txid_current_if_assigned(PG_FUNCTION_ARGS);
+extern Datum txid_current_snapshot(PG_FUNCTION_ARGS);
+extern Datum txid_snapshot_xmin(PG_FUNCTION_ARGS);
+extern Datum txid_snapshot_xmax(PG_FUNCTION_ARGS);
+extern Datum txid_snapshot_xip(PG_FUNCTION_ARGS);
+extern Datum txid_visible_in_snapshot(PG_FUNCTION_ARGS);
+extern Datum txid_status(PG_FUNCTION_ARGS);
+
+/* uuid.c */
+extern Datum uuid_in(PG_FUNCTION_ARGS);
+extern Datum uuid_out(PG_FUNCTION_ARGS);
+extern Datum uuid_send(PG_FUNCTION_ARGS);
+extern Datum uuid_recv(PG_FUNCTION_ARGS);
+extern Datum uuid_lt(PG_FUNCTION_ARGS);
+extern Datum uuid_le(PG_FUNCTION_ARGS);
+extern Datum uuid_eq(PG_FUNCTION_ARGS);
+extern Datum uuid_ge(PG_FUNCTION_ARGS);
+extern Datum uuid_gt(PG_FUNCTION_ARGS);
+extern Datum uuid_ne(PG_FUNCTION_ARGS);
+extern Datum uuid_cmp(PG_FUNCTION_ARGS);
+extern Datum uuid_sortsupport(PG_FUNCTION_ARGS);
+extern Datum uuid_hash(PG_FUNCTION_ARGS);
+
+/* windowfuncs.c */
+extern Datum window_row_number(PG_FUNCTION_ARGS);
+extern Datum window_rank(PG_FUNCTION_ARGS);
+extern Datum window_dense_rank(PG_FUNCTION_ARGS);
+extern Datum window_percent_rank(PG_FUNCTION_ARGS);
+extern Datum window_cume_dist(PG_FUNCTION_ARGS);
+extern Datum window_ntile(PG_FUNCTION_ARGS);
+extern Datum window_lag(PG_FUNCTION_ARGS);
+extern Datum window_lag_with_offset(PG_FUNCTION_ARGS);
+extern Datum window_lag_with_offset_and_default(PG_FUNCTION_ARGS);
+extern Datum window_lead(PG_FUNCTION_ARGS);
+extern Datum window_lead_with_offset(PG_FUNCTION_ARGS);
+extern Datum window_lead_with_offset_and_default(PG_FUNCTION_ARGS);
+extern Datum window_first_value(PG_FUNCTION_ARGS);
+extern Datum window_last_value(PG_FUNCTION_ARGS);
+extern Datum window_nth_value(PG_FUNCTION_ARGS);
+
+/* access/spgist/spgquadtreeproc.c */
+extern Datum spg_quad_config(PG_FUNCTION_ARGS);
+extern Datum spg_quad_choose(PG_FUNCTION_ARGS);
+extern Datum spg_quad_picksplit(PG_FUNCTION_ARGS);
+extern Datum spg_quad_inner_consistent(PG_FUNCTION_ARGS);
+extern Datum spg_quad_leaf_consistent(PG_FUNCTION_ARGS);
+
+/* access/spgist/spgkdtreeproc.c */
+extern Datum spg_kd_config(PG_FUNCTION_ARGS);
+extern Datum spg_kd_choose(PG_FUNCTION_ARGS);
+extern Datum spg_kd_picksplit(PG_FUNCTION_ARGS);
+extern Datum spg_kd_inner_consistent(PG_FUNCTION_ARGS);
+
+/* access/spgist/spgtextproc.c */
+extern Datum spg_text_config(PG_FUNCTION_ARGS);
+extern Datum spg_text_choose(PG_FUNCTION_ARGS);
+extern Datum spg_text_picksplit(PG_FUNCTION_ARGS);
+extern Datum spg_text_inner_consistent(PG_FUNCTION_ARGS);
+extern Datum spg_text_leaf_consistent(PG_FUNCTION_ARGS);
+
+/* access/gin/ginarrayproc.c */
+extern Datum ginarrayextract(PG_FUNCTION_ARGS);
+extern Datum ginarrayextract_2args(PG_FUNCTION_ARGS);
+extern Datum ginqueryarrayextract(PG_FUNCTION_ARGS);
+extern Datum ginarrayconsistent(PG_FUNCTION_ARGS);
+extern Datum ginarraytriconsistent(PG_FUNCTION_ARGS);
+
+/* access/tablesample/bernoulli.c */
+extern Datum tsm_bernoulli_handler(PG_FUNCTION_ARGS);
+
+/* access/tablesample/system.c */
+extern Datum tsm_system_handler(PG_FUNCTION_ARGS);
+
+/* access/transam/twophase.c */
+extern Datum pg_prepared_xact(PG_FUNCTION_ARGS);
+
+/* access/transam/multixact.c */
+extern Datum pg_get_multixact_members(PG_FUNCTION_ARGS);
+
+/* access/transam/committs.c */
+extern Datum pg_xact_commit_timestamp(PG_FUNCTION_ARGS);
+extern Datum pg_last_committed_xact(PG_FUNCTION_ARGS);
+
+/* catalogs/dependency.c */
+extern Datum pg_describe_object(PG_FUNCTION_ARGS);
+extern Datum pg_identify_object(PG_FUNCTION_ARGS);
+extern Datum pg_identify_object_as_address(PG_FUNCTION_ARGS);
+
+/* catalog/objectaddress.c */
+extern Datum pg_get_object_address(PG_FUNCTION_ARGS);
+
+/* commands/constraint.c */
+extern Datum unique_key_recheck(PG_FUNCTION_ARGS);
+
+/* commands/event_trigger.c */
+extern Datum pg_event_trigger_dropped_objects(PG_FUNCTION_ARGS);
+extern Datum pg_event_trigger_table_rewrite_oid(PG_FUNCTION_ARGS);
+extern Datum pg_event_trigger_table_rewrite_reason(PG_FUNCTION_ARGS);
+extern Datum pg_event_trigger_ddl_commands(PG_FUNCTION_ARGS);
+
+/* commands/extension.c */
+extern Datum pg_available_extensions(PG_FUNCTION_ARGS);
+extern Datum pg_available_extension_versions(PG_FUNCTION_ARGS);
+extern Datum pg_extension_update_paths(PG_FUNCTION_ARGS);
+extern Datum pg_extension_config_dump(PG_FUNCTION_ARGS);
+
+/* commands/prepare.c */
+extern Datum pg_prepared_statement(PG_FUNCTION_ARGS);
+
+/* utils/mmgr/portalmem.c */
+extern Datum pg_cursor(PG_FUNCTION_ARGS);
 
 #endif   /* BUILTINS_H */
diff --git a/src/test/regress/expected/txid.out b/src/test/regress/expected/txid.out
index 802ccb9..015dae3 100644
--- a/src/test/regress/expected/txid.out
+++ b/src/test/regress/expected/txid.out
@@ -254,3 +254,71 @@ SELECT txid_current_if_assigned() IS NOT DISTINCT FROM BIGINT :'txid_current';
 (1 row)
 
 COMMIT;
+-- test xid status functions
+BEGIN;
+SELECT txid_current() AS committed \gset
+COMMIT;
+BEGIN;
+SELECT txid_current() AS rolledback \gset
+ROLLBACK;
+BEGIN;
+SELECT txid_current() AS inprogress \gset
+SELECT txid_status(:committed) AS committed;
+ committed 
+-----------
+ committed
+(1 row)
+
+SELECT txid_status(:rolledback) AS rolledback;
+ rolledback 
+------------
+ aborted
+(1 row)
+
+SELECT txid_status(:inprogress) AS inprogress;
+ inprogress  
+-------------
+ in progress
+(1 row)
+
+SELECT txid_status(1); -- BootstrapTransactionId is always committed
+ txid_status 
+-------------
+ committed
+(1 row)
+
+SELECT txid_status(2); -- FrozenTransactionId is always committed
+ txid_status 
+-------------
+ committed
+(1 row)
+
+SELECT txid_status(3); -- in regress testing FirstNormalTransactionId will always be behind oldestXmin
+ txid_status 
+-------------
+ 
+(1 row)
+
+COMMIT;
+BEGIN;
+CREATE FUNCTION test_future_xid_status(bigint)
+RETURNS void
+LANGUAGE plpgsql
+AS
+$$
+BEGIN
+  PERFORM txid_status($1);
+  RAISE EXCEPTION 'didn''t ERROR at xid in the future as expected';
+EXCEPTION
+  WHEN invalid_parameter_value THEN
+    RAISE NOTICE 'Got expected error for xid in the future';
+END;
+$$;
+SELECT test_future_xid_status(:inprogress + 10000);
+NOTICE:  Got expected error for xid in the future
+ test_future_xid_status 
+------------------------
+ 
+(1 row)
+
+ROLLBACK;
diff --git a/src/test/regress/sql/txid.sql b/src/test/regress/sql/txid.sql
index 4aefd9e..bd6decf 100644
--- a/src/test/regress/sql/txid.sql
+++ b/src/test/regress/sql/txid.sql
@@ -59,3 +59,41 @@ SELECT txid_current_if_assigned() IS NULL;
 SELECT txid_current() \gset
 SELECT txid_current_if_assigned() IS NOT DISTINCT FROM BIGINT :'txid_current';
 COMMIT;
+
+-- test xid status functions
+BEGIN;
+SELECT txid_current() AS committed \gset
+COMMIT;
+
+BEGIN;
+SELECT txid_current() AS rolledback \gset
+ROLLBACK;
+
+BEGIN;
+SELECT txid_current() AS inprogress \gset
+
+SELECT txid_status(:committed) AS committed;
+SELECT txid_status(:rolledback) AS rolledback;
+SELECT txid_status(:inprogress) AS inprogress;
+SELECT txid_status(1); -- BootstrapTransactionId is always committed
+SELECT txid_status(2); -- FrozenTransactionId is always committed
+SELECT txid_status(3); -- in regress testing FirstNormalTransactionId will always be behind oldestXmin
+
+COMMIT;
+
+BEGIN;
+CREATE FUNCTION test_future_xid_status(bigint)
+RETURNS void
+LANGUAGE plpgsql
+AS
+$$
+BEGIN
+  PERFORM txid_status($1);
+  RAISE EXCEPTION 'didn''t ERROR at xid in the future as expected';
+EXCEPTION
+  WHEN invalid_parameter_value THEN
+    RAISE NOTICE 'Got expected error for xid in the future';
+END;
+$$;
+SELECT test_future_xid_status(:inprogress + 10000);
+ROLLBACK;
-- 
2.5.5

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to