On 10/11/14 14:53, Robert Haas wrote:
On Mon, Nov 10, 2014 at 8:39 AM, Petr Jelinek <p...@2ndquadrant.com> wrote:
I did the calculation above wrong btw, it's actually 20 bytes not 24 bytes
per record, I am inclined to just say we can live with that.

If you do it as 20 bytes, you'll have to do some work to squeeze out
the alignment padding.  I'm inclined to think it's fine to have a few
extra padding bytes here; someone might want to use those for
something in the future, and they probably don't cost much.


I did get around the alignment via memcpy, so it is still 20 bytes.

Since we agreed that the (B) case is not really feasible and we are doing
the (C), I also wonder if extradata should be renamed to nodeid (even if
it's not used at this point as nodeid). And then there is question about the
size of it, since the nodeid itself can live with 2 bytes probably ("64k of
nodes ought to be enough for everybody" ;) ).
Or leave the extradata as is but use as reserved space for future use and
not expose it at this time on SQL level at all?

I vote for calling it node-ID, and for allowing at least 4 bytes for
it.  Penny wise, pound foolish.


Ok, I went this way.

Anyway here is v8 version of the patch, I think I addressed all the concerns mentioned, it's also rebased against current master (BRIN commit added some conflicts).

Brief list of changes:
 - the commit timestamp record now stores timestamp, lsn and nodeid
- added code to disallow turning track_commit_timestamp on with too small pagesize
 - the get interfaces error out when track_commit_timestamp is off
 - if the xid passed to the get interface is out of range, -infinity timestamp is returned (I think it's a bad idea to throw errors here, as the valid range is not static and the same ID can theoretically start throwing errors between calls)
 - renamed the sql interfaces to pg_xact_commit_timestamp, pg_xact_commit_timestamp_data and pg_last_committed_xact; they don't expose the nodeid atm. I personally am not a big fan of the "xact" but it seems more consistent with existing naming
 - documented pg_resetxlog changes and made all the pg_resetxlog options alphabetically ordered
 - committs is not used anymore, it's commit_ts (and CommitTs in camelcase); I think it's not really a good idea to spell out the timestamp everywhere, as some interfaces would then get 30+ character long names...
 - added WAL logging of the track_commit_timestamp GUC
- added alternative expected output of the regression test so that it works with make installcheck when track_commit_timestamp is on
 - added C interface to set default nodeid for current backend
 - several minor comment and naming adjustments mostly suggested by Michael


--
 Petr Jelinek                  http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services
diff --git a/contrib/Makefile b/contrib/Makefile
index b37d0dd..e331297 100644
--- a/contrib/Makefile
+++ b/contrib/Makefile
@@ -50,6 +50,7 @@ SUBDIRS = \
 		spi		\
 		tablefunc	\
 		tcn		\
+		test_committs	\
 		test_decoding	\
 		test_parser	\
 		test_shm_mq	\
diff --git a/contrib/pg_upgrade/pg_upgrade.c b/contrib/pg_upgrade/pg_upgrade.c
index 3b8241b..f0a023f 100644
--- a/contrib/pg_upgrade/pg_upgrade.c
+++ b/contrib/pg_upgrade/pg_upgrade.c
@@ -423,8 +423,10 @@ copy_clog_xlog_xid(void)
 	/* set the next transaction id and epoch of the new cluster */
 	prep_status("Setting next transaction ID and epoch for new cluster");
 	exec_prog(UTILITY_LOG_FILE, NULL, true,
-			  "\"%s/pg_resetxlog\" -f -x %u \"%s\"",
-			  new_cluster.bindir, old_cluster.controldata.chkpnt_nxtxid,
+			  "\"%s/pg_resetxlog\" -f -x %u -c %u \"%s\"",
+			  new_cluster.bindir,
+			  old_cluster.controldata.chkpnt_nxtxid,
+			  old_cluster.controldata.chkpnt_nxtxid,
 			  new_cluster.pgdata);
 	exec_prog(UTILITY_LOG_FILE, NULL, true,
 			  "\"%s/pg_resetxlog\" -f -e %u \"%s\"",
diff --git a/contrib/pg_xlogdump/rmgrdesc.c b/contrib/pg_xlogdump/rmgrdesc.c
index 9397198..e0af3cf 100644
--- a/contrib/pg_xlogdump/rmgrdesc.c
+++ b/contrib/pg_xlogdump/rmgrdesc.c
@@ -10,6 +10,7 @@
 
 #include "access/brin_xlog.h"
 #include "access/clog.h"
+#include "access/committs.h"
 #include "access/gin.h"
 #include "access/gist_private.h"
 #include "access/hash.h"
diff --git a/contrib/test_committs/.gitignore b/contrib/test_committs/.gitignore
new file mode 100644
index 0000000..1f95503
--- /dev/null
+++ b/contrib/test_committs/.gitignore
@@ -0,0 +1,5 @@
+# Generated subdirectories
+/log/
+/isolation_output/
+/regression_output/
+/tmp_check/
diff --git a/contrib/test_committs/Makefile b/contrib/test_committs/Makefile
new file mode 100644
index 0000000..2240749
--- /dev/null
+++ b/contrib/test_committs/Makefile
@@ -0,0 +1,45 @@
+# Note: because we don't tell the Makefile there are any regression tests,
+# we have to clean those result files explicitly
+EXTRA_CLEAN = $(pg_regress_clean_files) ./regression_output ./isolation_output
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = contrib/test_committs
+top_builddir = ../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
+
+# We can't support installcheck because normally installcheck users don't have
+# the required track_commit_timestamp on, so the target is an explicit no-op
+installcheck:;
+
+check: regresscheck
+
+submake-regress:
+	$(MAKE) -C $(top_builddir)/src/test/regress all
+
+submake-test_committs:
+	$(MAKE) -C $(top_builddir)/contrib/test_committs
+
+REGRESSCHECKS=committs_on
+
+regresscheck: all | submake-regress submake-test_committs
+	$(MKDIR_P) regression_output
+	$(pg_regress_check) \
+	    --temp-config $(top_srcdir)/contrib/test_committs/committs.conf \
+	    --temp-install=./tmp_check \
+	    --extra-install=contrib/test_committs \
+	    --outputdir=./regression_output \
+	    $(REGRESSCHECKS)
+
+regresscheck-install-force: | submake-regress submake-test_committs
+	$(pg_regress_installcheck) \
+	    --extra-install=contrib/test_committs \
+	    $(REGRESSCHECKS)
+
+.PHONY: submake-test_committs submake-regress check \
+	regresscheck regresscheck-install-force
\ No newline at end of file
diff --git a/contrib/test_committs/committs.conf b/contrib/test_committs/committs.conf
new file mode 100644
index 0000000..d221a60
--- /dev/null
+++ b/contrib/test_committs/committs.conf
@@ -0,0 +1 @@
+track_commit_timestamp = on
\ No newline at end of file
diff --git a/contrib/test_committs/expected/committs_on.out b/contrib/test_committs/expected/committs_on.out
new file mode 100644
index 0000000..1457a27
--- /dev/null
+++ b/contrib/test_committs/expected/committs_on.out
@@ -0,0 +1,21 @@
+--
+-- Commit Timestamp (on)
+--
+CREATE TABLE committs_test(id serial, ts timestamptz default now());
+INSERT INTO committs_test DEFAULT VALUES;
+INSERT INTO committs_test DEFAULT VALUES;
+INSERT INTO committs_test DEFAULT VALUES;
+SELECT id,
+       pg_xact_commit_timestamp(xmin) >= ts,
+       pg_xact_commit_timestamp(xmin) < now(),
+       pg_xact_commit_timestamp(xmin) - ts < '60s' -- 60s should give a lot of reserve
+FROM committs_test
+ORDER BY id;
+ id | ?column? | ?column? | ?column? 
+----+----------+----------+----------
+  1 | t        | t        | t
+  2 | t        | t        | t
+  3 | t        | t        | t
+(3 rows)
+
+DROP TABLE committs_test;
diff --git a/contrib/test_committs/sql/committs_on.sql b/contrib/test_committs/sql/committs_on.sql
new file mode 100644
index 0000000..0f2d064
--- /dev/null
+++ b/contrib/test_committs/sql/committs_on.sql
@@ -0,0 +1,18 @@
+--
+-- Commit Timestamp (on)
+--
+
+CREATE TABLE committs_test(id serial, ts timestamptz default now());
+
+INSERT INTO committs_test DEFAULT VALUES;
+INSERT INTO committs_test DEFAULT VALUES;
+INSERT INTO committs_test DEFAULT VALUES;
+
+SELECT id,
+       pg_xact_commit_timestamp(xmin) >= ts,
+       pg_xact_commit_timestamp(xmin) < now(),
+       pg_xact_commit_timestamp(xmin) - ts < '60s' -- 60s should give a lot of reserve
+FROM committs_test
+ORDER BY id;
+
+DROP TABLE committs_test;
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 6bfb7bb..2fef80e 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -2673,6 +2673,20 @@ include_dir 'conf.d'
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-track-commit-timestamp" xreflabel="track_commit_timestamp">
+      <term><varname>track_commit_timestamp</varname> (<type>bool</type>)</term>
+      <indexterm>
+       <primary><varname>track_commit_timestamp</> configuration parameter</primary>
+      </indexterm>
+      <listitem>
+       <para>
+        Record commit time of transactions. This parameter
+        can only be set in <filename>postgresql.conf</> file or on the server
+        command line. The default value is <literal>off</literal>.
+       </para>
+      </listitem>
+     </varlistentry>
+
      </variablelist>
     </sect2>
 
diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index b58cfa5..e3ace51 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -15918,6 +15918,43 @@ SELECT collation for ('foo' COLLATE "de_DE");
     For example <literal>10:20:10,14,15</literal> means
     <literal>xmin=10, xmax=20, xip_list=10, 14, 15</literal>.
    </para>
+
+   <para>
+    The functions shown in <xref linkend="functions-committs">
+    provide information about transactions that have been already committed.
+    These functions mainly provide information about when the transactions
+    were committed. They only provide useful data when
+    <xref linkend="guc-track-commit-timestamp"> configuration option is enabled
+    and only for transactions that were committed after it was enabled.
+   </para>
+
+   <table id="functions-committs">
+    <title>Committed transaction information</title>
+    <tgroup cols="3">
+     <thead>
+      <row><entry>Name</entry> <entry>Return Type</entry> <entry>Description</entry></row>
+     </thead>
+
+     <tbody>
+      <row>
+       <entry><literal><function>pg_xact_commit_timestamp(<parameter>xid</parameter>)</function></literal></entry>
+       <entry><type>timestamp with time zone</type></entry>
+       <entry>get commit timestamp of a transaction</entry>
+      </row>
+      <row>
+       <entry><literal><function>pg_xact_commit_timestamp_data(<parameter>xid</>)</function></literal></entry>
+       <entry> <parameter>timestamp</> <type>timestamp with time zone</>, <parameter>lsn</> <type>pg_lsn</></entry>
+       <entry>get commit timestamp and lsn of a transaction</entry>
+      </row>
+      <row>
+       <entry><literal><function>pg_last_committed_xact()</function></literal></entry>
+       <entry><parameter>xid</> <type>xid</>, <parameter>timestamp</> <type>timestamp with time zone</>, <parameter>lsn</> <type>pg_lsn</></entry>
+       <entry>get transaction Id, commit timestamp and lsn of latest transaction commit</entry>
+      </row>
+     </tbody>
+    </tgroup>
+   </table>
+
   </sect1>
 
   <sect1 id="functions-admin">
diff --git a/doc/src/sgml/ref/pg_resetxlog.sgml b/doc/src/sgml/ref/pg_resetxlog.sgml
index aba7185..7117118 100644
--- a/doc/src/sgml/ref/pg_resetxlog.sgml
+++ b/doc/src/sgml/ref/pg_resetxlog.sgml
@@ -22,6 +22,7 @@ PostgreSQL documentation
  <refsynopsisdiv>
   <cmdsynopsis>
    <command>pg_resetxlog</command>
+   <arg choice="opt"><option>-c</option> <replaceable class="parameter">xid</replaceable></arg>
    <arg choice="opt"><option>-f</option></arg>
    <arg choice="opt"><option>-n</option></arg>
    <arg choice="opt"><option>-o</option> <replaceable class="parameter">oid</replaceable></arg>
@@ -78,11 +79,12 @@ PostgreSQL documentation
 
   <para>
    The <option>-o</>, <option>-x</>, <option>-e</>,
-   <option>-m</>, <option>-O</>,
-   and <option>-l</>
+   <option>-m</>, <option>-O</>, <option>-l</>
+   and <option>-c</>
    options allow the next OID, next transaction ID, next transaction ID's
-   epoch, next and oldest multitransaction ID, next multitransaction offset, and WAL
-   starting address values to be set manually.  These are only needed when
+   epoch, next and oldest multitransaction ID, next multitransaction offset, WAL
+   starting address and the oldest transaction ID for which the commit time can
+   be retrieved values to be set manually.  These are only needed when
    <command>pg_resetxlog</command> is unable to determine appropriate values
    by reading <filename>pg_control</>.  Safe values can be determined as
    follows:
@@ -130,6 +132,15 @@ PostgreSQL documentation
 
     <listitem>
      <para>
+      A safe value for the oldest transaction ID for which the commit time can
+      be retrieved (<option>-c</>) can be determined by looking for the
+      numerically smallest file name in the directory <filename>pg_commit_ts</>
+      under the data directory.  As above, the file names are in hexadecimal.
+     </para>
+    </listitem>
+
+    <listitem>
+     <para>
       The WAL starting address (<option>-l</>) should be
       larger than any WAL segment file name currently existing in
       the directory <filename>pg_xlog</> under the data directory.
diff --git a/src/backend/access/rmgrdesc/Makefile b/src/backend/access/rmgrdesc/Makefile
index 32cb985..0daa9bb 100644
--- a/src/backend/access/rmgrdesc/Makefile
+++ b/src/backend/access/rmgrdesc/Makefile
@@ -8,9 +8,8 @@ subdir = src/backend/access/rmgrdesc
 top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
-OBJS = brindesc.o clogdesc.o dbasedesc.o gindesc.o gistdesc.o \
-	   hashdesc.o heapdesc.o \
-	   mxactdesc.o nbtdesc.o relmapdesc.o seqdesc.o smgrdesc.o spgdesc.o \
-	   standbydesc.o tblspcdesc.o xactdesc.o xlogdesc.o
+OBJS = brindesc.o clogdesc.o committsdesc.o dbasedesc.o gindesc.o gistdesc.o \
+	   hashdesc.o heapdesc.o mxactdesc.o nbtdesc.o relmapdesc.o seqdesc.o \
+	   smgrdesc.o spgdesc.o standbydesc.o tblspcdesc.o xactdesc.o xlogdesc.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/access/rmgrdesc/committsdesc.c b/src/backend/access/rmgrdesc/committsdesc.c
new file mode 100644
index 0000000..7802584
--- /dev/null
+++ b/src/backend/access/rmgrdesc/committsdesc.c
@@ -0,0 +1,73 @@
+/*-------------------------------------------------------------------------
+ *
+ * committsdesc.c
+ *    rmgr descriptor routines for access/transam/committs.c
+ *
+ * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *    src/backend/access/rmgrdesc/committsdesc.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/committs.h"
+#include "utils/timestamp.h"
+
+
+void
+commit_ts_desc(StringInfo buf, XLogRecord *record)
+{
+	char	   *payload = XLogRecGetData(record);
+	int			pageno;
+
+	/* Dispatch on the rmgr-specific info bits; unknown kinds print nothing. */
+	switch (record->xl_info & ~XLR_INFO_MASK)
+	{
+		case COMMIT_TS_ZEROPAGE:
+			memcpy(&pageno, payload, sizeof(int));
+			appendStringInfo(buf, "zeropage: %d", pageno);
+			break;
+		case COMMIT_TS_TRUNCATE:
+			memcpy(&pageno, payload, sizeof(int));
+			appendStringInfo(buf, "truncate before: %d", pageno);
+			break;
+		case COMMIT_TS_SETTS:
+			{
+				xl_commit_ts_set *setrec = (xl_commit_ts_set *) payload;
+				int			n;
+
+				appendStringInfo(buf, "set commit_ts %s for: %u",
+								 timestamptz_to_str(setrec->timestamp),
+								 setrec->mainxid);
+				/* append every subtransaction id carried by the record */
+				for (n = 0; n < setrec->nsubxids; n++)
+					appendStringInfo(buf, ", %u", setrec->subxids[n]);
+			}
+			break;
+	}
+}
+
+/*
+ * Return the short name of a commit_ts WAL record type, or NULL if unknown.
+ */
+const char *
+commit_ts_identify(uint8 info)
+{
+	switch (info)
+	{
+		case COMMIT_TS_ZEROPAGE:
+			return "ZEROPAGE";
+		case COMMIT_TS_TRUNCATE:
+			return "TRUNCATE";
+		case COMMIT_TS_SETTS:
+			return "SETTS";
+		default:
+			break;
+	}
+
+	return NULL;
+}
diff --git a/src/backend/access/rmgrdesc/xlogdesc.c b/src/backend/access/rmgrdesc/xlogdesc.c
index e0957ff..9919c52 100644
--- a/src/backend/access/rmgrdesc/xlogdesc.c
+++ b/src/backend/access/rmgrdesc/xlogdesc.c
@@ -45,7 +45,7 @@ xlog_desc(StringInfo buf, XLogRecord *record)
 		appendStringInfo(buf, "redo %X/%X; "
 						 "tli %u; prev tli %u; fpw %s; xid %u/%u; oid %u; multi %u; offset %u; "
 						 "oldest xid %u in DB %u; oldest multi %u in DB %u; "
-						 "oldest running xid %u; %s",
+						 "oldest commit timestamp xid: %u; oldest running xid %u; %s",
 				(uint32) (checkpoint->redo >> 32), (uint32) checkpoint->redo,
 						 checkpoint->ThisTimeLineID,
 						 checkpoint->PrevTimeLineID,
@@ -58,6 +58,7 @@ xlog_desc(StringInfo buf, XLogRecord *record)
 						 checkpoint->oldestXidDB,
 						 checkpoint->oldestMulti,
 						 checkpoint->oldestMultiDB,
+						 checkpoint->oldestCommitTs,
 						 checkpoint->oldestActiveXid,
 				 (info == XLOG_CHECKPOINT_SHUTDOWN) ? "shutdown" : "online");
 	}
diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile
index 82a6c76..a1979ca 100644
--- a/src/backend/access/transam/Makefile
+++ b/src/backend/access/transam/Makefile
@@ -14,7 +14,7 @@ include $(top_builddir)/src/Makefile.global
 
 OBJS = clog.o transam.o varsup.o xact.o rmgr.o slru.o subtrans.o multixact.o \
 	timeline.o twophase.o twophase_rmgr.o xlog.o xlogarchive.o xlogfuncs.o \
-	xloginsert.o xlogreader.o xlogutils.o
+	xloginsert.o xlogreader.o xlogutils.o committs.o
 
 include $(top_srcdir)/src/backend/common.mk
 
diff --git a/src/backend/access/transam/committs.c b/src/backend/access/transam/committs.c
new file mode 100644
index 0000000..a50a027
--- /dev/null
+++ b/src/backend/access/transam/committs.c
@@ -0,0 +1,887 @@
+/*-------------------------------------------------------------------------
+ *
+ * committs.c
+ *		PostgreSQL commit timestamp manager
+ *
+ * This module is a pg_clog-like system that stores the commit timestamp
+ * for each transaction.
+ *
+ * XLOG interactions: this module generates an XLOG record whenever a new
+ * CommitTs page is initialized to zeroes.  Also, one XLOG record is
+ * generated for setting of values when the caller requests it; this allows
+ * us to support values coming from places other than transaction commit.
+ * Other writes of CommitTS come from recording of transaction commit in
+ * xact.c, which generates its own XLOG records for these events and will
+ * re-perform the status update on redo; so we need make no additional XLOG
+ * entry here.
+ *
+ * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/access/transam/committs.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/committs.h"
+#include "access/htup_details.h"
+#include "access/slru.h"
+#include "access/transam.h"
+#include "catalog/pg_type.h"
+#include "funcapi.h"
+#include "miscadmin.h"
+#include "pg_trace.h"
+#include "utils/builtins.h"
+#include "utils/pg_lsn.h"
+#include "utils/snapmgr.h"
+#include "utils/timestamp.h"
+
+/*
+ * Defines for CommitTs page sizes.  A page is the same BLCKSZ as is used
+ * everywhere else in Postgres.
+ *
+ * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
+ * CommitTs page numbering also wraps around at
+ * 0xFFFFFFFF/COMMITTS_XACTS_PER_PAGE, and CommitTs segment numbering at
+ * 0xFFFFFFFF/COMMITTS_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT.  We need take no
+ * explicit notice of that fact in this module, except when comparing segment
+ * and page numbers in TruncateCommitTs (see CommitTsPagePrecedes).
+ */
+
+/* Per-xact entry (time, lsn, nodeid); stored on the page without tail padding */
+typedef struct CommitTimestampEntry
+{
+	TimestampTz		time;
+	XLogRecPtr		lsn;
+	NodeIdRec		nodeid;
+} CommitTimestampEntry;
+/* On-page size ends right after nodeid; entries are only moved via memcpy() */
+#define SizeOfCommitTimestampEntry (offsetof(CommitTimestampEntry, nodeid) + sizeof(NodeIdRec))
+
+/* this is limited by how much data we can fit into SLRU cache */
+#define COMMIT_TS_MIN_BLCKSZ 4096
+
+#define COMMIT_TS_XACTS_PER_PAGE \
+	(BLCKSZ / SizeOfCommitTimestampEntry)
+
+#define TransactionIdToCTsPage(xid)	\
+	((xid) / (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
+#define TransactionIdToCTsEntry(xid)	\
+	((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
+
+/*
+ * Link to shared-memory data structures for CommitTs control
+ */
+static SlruCtlData CommitTsCtlData;
+
+#define CommitTsCtl (&CommitTsCtlData)
+
+/*
+ * Shared-memory cache of the most recently set entry: the xid and its data.
+ * This is protected by CommitTsLock.
+ */
+typedef struct CommitTimestampShared
+{
+	TransactionId	xidLastCommit;
+	CommitTimestampEntry dataLastCommit;
+} CommitTimestampShared;
+
+CommitTimestampShared	*commitTsShared;
+
+
+/* GUC variable */
+bool	track_commit_timestamp;
+
+NodeIdRec CommitTsDefaultNodeId = InvalidNodeId;
+
+static void SetXidCommitTsInPage(TransactionId xid, int nsubxids,
+					 TransactionId *subxids, TimestampTz ts,
+					 XLogRecPtr lsn, NodeIdRec nodeid, int pageno);
+static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
+						  XLogRecPtr lsn, NodeIdRec nodeid, int slotno);
+static int	ZeroCommitTsPage(int pageno, bool writeXlog);
+static bool CommitTsPagePrecedes(int page1, int page2);
+static void WriteZeroPageXlogRec(int pageno);
+static void WriteTruncateXlogRec(int pageno);
+static void WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids,
+						 TransactionId *subxids, TimestampTz timestamp,
+						 XLogRecPtr lsn, NodeIdRec nodeid);
+
+
+/*
+ * CommitTsSetDefaultNodeId
+ *		Set the default nodeid (CommitTsDefaultNodeId) for the current backend.
+ */
+void
+CommitTsSetDefaultNodeId(NodeIdRec nodeid)
+{
+	CommitTsDefaultNodeId = nodeid;
+}
+
+/*
+ * TransactionTreeSetCommitTsData
+ *
+ * Record the final commit timestamp of transaction entries in the commit log
+ * for a transaction and its subtransaction tree, as efficiently as possible.
+ *
+ * xid is the top level transaction id.
+ *
+ * subxids is an array of xids of length nsubxids, representing subtransactions
+ * in the tree of xid. In various cases nsubxids may be zero.
+ * The reason why tracking just the parent xid committs is not enough is that
+ * the subtrans SLRU does not stay valid across crashes (is not permanent) so we
+ * need to keep the information about them here. If the subtrans implementation
+ * changes in the future, we might want to revisit the decision of storing
+ * committs for each subxid.
+ *
+ * The do_xlog parameter tells us whether to include a XLog record of this
+ * or not.  Normal path through RecordTransactionCommit() will be related
+ * to a transaction commit XLog record, and so should pass "false" here.
+ * Other callers probably want to pass true, so that the given values persist
+ * in case of crashes.
+ */
+void
+TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids,
+							   TransactionId *subxids, TimestampTz timestamp,
+							   XLogRecPtr lsn, NodeIdRec nodeid, bool do_xlog)
+{
+	int			i;
+	TransactionId headxid;
+
+	Assert(xid != InvalidTransactionId);
+
+	if (!track_commit_timestamp)
+		return;
+
+	/*
+	 * Comply with the WAL-before-data rule: if caller specified it wants
+	 * this value to be recorded in WAL, do so before touching the data.
+	 */
+	if (do_xlog)
+		WriteSetTimestampXlogRec(xid, nsubxids, subxids, timestamp, lsn, nodeid);
+
+	/*
+	 * We split the xids to set the timestamp to in groups belonging to the
+	 * same SLRU page; the first element in each such set is its head.  The
+	 * first group has the main XID as the head; subsequent sets use the
+	 * first subxid not on the previous page as head.  This way, we only have
+	 * to lock/modify each SLRU page once.
+	 */
+	for (i = 0, headxid = xid;;)
+	{
+		int			pageno = TransactionIdToCTsPage(headxid);
+		int			j;
+
+		for (j = i; j < nsubxids; j++)
+		{
+			if (TransactionIdToCTsPage(subxids[j]) != pageno)
+				break;
+		}
+		/* subxids[i..j-1] are on the same page as the head */
+
+		SetXidCommitTsInPage(headxid, j - i, subxids + i, timestamp, lsn,
+							 nodeid, pageno);
+
+		/*
+		 * We are done only once the scan above consumed every subxid.  When
+		 * it stopped early, subxids[j] is on another page and has not been
+		 * written yet, even if it is the very last element.
+		 */
+		if (j >= nsubxids)
+			break;
+
+		/* subxids[j] heads the next group; resume scanning right after it. */
+		headxid = subxids[j];
+		i = j + 1;
+	}
+
+	/* Update the cached value in shared memory, node id included. */
+	LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
+	commitTsShared->xidLastCommit = xid;
+	commitTsShared->dataLastCommit.time = timestamp;
+	commitTsShared->dataLastCommit.lsn = lsn;
+	commitTsShared->dataLastCommit.nodeid = nodeid;
+	LWLockRelease(CommitTsLock);
+}
+
+/*
+ * Store ts/lsn/nodeid for xid and for each of the nsubxids subxids, all of
+ * which are written into page pageno.  Done under one CommitTsControlLock hold.
+ */
+static void
+SetXidCommitTsInPage(TransactionId xid, int nsubxids,
+					 TransactionId *subxids, TimestampTz ts,
+					 XLogRecPtr lsn, NodeIdRec nodeid, int pageno)
+{
+	int			slotno;
+	int			i;
+
+	LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
+
+	slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, xid);
+
+	TransactionIdSetCommitTs(xid, ts, lsn, nodeid, slotno);
+	for (i = 0; i < nsubxids; i++)
+		TransactionIdSetCommitTs(subxids[i], ts, lsn, nodeid, slotno);
+
+	CommitTsCtl->shared->page_dirty[slotno] = true;
+
+	LWLockRelease(CommitTsControlLock);
+}
+
+/*
+ * Copy one entry (ts, lsn, nodeid) into xid's position in buffer slot slotno.
+ * The entry is written with memcpy at a byte offset, not via a struct pointer.
+ * Must be called with CommitTsControlLock held
+ */
+static void
+TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
+						 XLogRecPtr lsn, NodeIdRec nodeid, int slotno)
+{
+	int			entryno = TransactionIdToCTsEntry(xid);
+	CommitTimestampEntry entry;
+
+	entry.time = ts;
+	entry.lsn = lsn;
+	entry.nodeid = nodeid;
+
+	memcpy(CommitTsCtl->shared->page_buffer[slotno] +
+				SizeOfCommitTimestampEntry * entryno,
+		   &entry, SizeOfCommitTimestampEntry);
+}
+
+/*
+ * Fetch commit ts, lsn and node id of xid; NULL output pointers are skipped.
+ */
+void
+TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
+							 XLogRecPtr *lsn, NodeIdRec *nodeid)
+{
+	int			pageno = TransactionIdToCTsPage(xid);
+	int			entryno = TransactionIdToCTsEntry(xid);
+	int			slotno;
+	CommitTimestampEntry entry;
+	TransactionId oldestCommitTs;
+
+	/* Error if module not enabled */
+	if (!track_commit_timestamp)
+	{
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("Cannot get commit timestamp data because \"track_commit_timestamp\" is not enabled")));
+	}
+
+	/*
+	 * Out-of-range xids (older than oldestCommitTs or newer than the last
+	 * recorded commit) yield -infinity / invalid values instead of an error.
+	 * NOTE(review): xidLastCommit is read below without CommitTsLock --
+	 * confirm that a stale value is acceptable for this range check.
+	 */
+	LWLockAcquire(CommitTsControlLock, LW_SHARED);
+	oldestCommitTs = ShmemVariableCache->oldestCommitTs;
+	LWLockRelease(CommitTsControlLock);
+
+	if (!TransactionIdIsValid(oldestCommitTs) ||
+		TransactionIdPrecedes(xid, oldestCommitTs) ||
+		TransactionIdPrecedes(commitTsShared->xidLastCommit, xid))
+	{
+		if (ts)
+			TIMESTAMP_NOBEGIN(*ts);
+		if (lsn)
+			*lsn = InvalidXLogRecPtr;
+		if (nodeid)
+			*nodeid = InvalidNodeId;
+		return;
+	}
+
+	/*
+	 * Use an unlocked atomic read on our cached value in shared memory;
+	 * if it's a hit, acquire a lock and read the data, after verifying
+	 * that it's still what we initially read.  Otherwise, fall through
+	 * to read from SLRU.
+	 */
+	if (commitTsShared->xidLastCommit == xid)
+	{
+		LWLockAcquire(CommitTsLock, LW_SHARED);
+		if (commitTsShared->xidLastCommit == xid)
+		{
+			if (ts)
+				*ts = commitTsShared->dataLastCommit.time;
+			if (lsn)
+				*lsn = commitTsShared->dataLastCommit.lsn;
+			if (nodeid)
+				*nodeid = commitTsShared->dataLastCommit.nodeid;
+			LWLockRelease(CommitTsLock);
+			return;
+		}
+		LWLockRelease(CommitTsLock);
+	}
+
+	/* lock is acquired by SimpleLruReadPage_ReadOnly; released at the end */
+	slotno = SimpleLruReadPage_ReadOnly(CommitTsCtl, pageno, xid);
+	memcpy(&entry,
+		   CommitTsCtl->shared->page_buffer[slotno] +
+				SizeOfCommitTimestampEntry * entryno,
+		   SizeOfCommitTimestampEntry);
+
+	if (ts)
+		*ts = entry.time;
+	if (lsn)
+		*lsn = entry.lsn;
+	if (nodeid)
+		*nodeid = entry.nodeid;
+
+	LWLockRelease(CommitTsControlLock);
+}
+
+/*
+ * Return the Xid of the latest committed transaction.  (As far as this module
+ * is concerned, anyway; it's up to the caller to ensure the value is useful
+ * for its purposes.)
+ *
+ * ts, lsn and nodeid are filled with the corresponding data; each can be
+ * passed as NULL if not wanted.
+ */
+TransactionId
+GetLatestCommitTsData(TimestampTz *ts, XLogRecPtr *lsn, NodeIdRec *nodeid)
+{
+	TransactionId	xid;
+
+	/* Error if module not enabled */
+	if (!track_commit_timestamp)
+	{
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("Cannot get commit timestamp data because \"track_commit_timestamp\" is not enabled")));
+	}
+
+	LWLockAcquire(CommitTsLock, LW_SHARED);
+	xid = commitTsShared->xidLastCommit;
+	if (ts)
+		*ts = commitTsShared->dataLastCommit.time;
+	if (lsn)
+		*lsn = commitTsShared->dataLastCommit.lsn;
+	if (nodeid)
+		*nodeid = commitTsShared->dataLastCommit.nodeid;
+	LWLockRelease(CommitTsLock);
+
+	return xid;
+}
+
+/*
+ * SQL-callable: commit timestamp of the given xid (lsn/nodeid not fetched)
+ */
+Datum
+pg_xact_commit_timestamp(PG_FUNCTION_ARGS)
+{
+	TransactionId	xid = PG_GETARG_UINT32(0);
+	TimestampTz		ts;
+
+	TransactionIdGetCommitTsData(xid, &ts, NULL, NULL);
+
+	PG_RETURN_TIMESTAMPTZ(ts);
+}
+
+Datum
+pg_xact_commit_timestamp_data(PG_FUNCTION_ARGS)
+{
+	TransactionId	xid = PG_GETARG_UINT32(0);
+	TimestampTz		ts;
+	XLogRecPtr		lsn;
+	Datum       values[2];
+	bool        nulls[2];
+	TupleDesc   tupdesc;
+	HeapTuple	htup;
+
+	/*
+	 * Result row is (timestamp timestamptz, lsn pg_lsn); the node id is not
+	 * exposed.  This descriptor must match this function's pg_proc entry!
+	 */
+	tupdesc = CreateTemplateTupleDesc(2, false);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 1, "timestamp",
+					   TIMESTAMPTZOID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 2, "lsn",
+					   LSNOID, -1, 0);
+	tupdesc = BlessTupleDesc(tupdesc);
+
+	/* look up the xid's data and form the result tuple; no column is NULL */
+	TransactionIdGetCommitTsData(xid, &ts, &lsn, NULL);
+
+	values[0] = TimestampTzGetDatum(ts);
+	nulls[0] = false;
+
+	values[1] = LSNGetDatum(lsn);
+	nulls[1] = false;
+
+	htup = heap_form_tuple(tupdesc, values, nulls);
+
+	PG_RETURN_DATUM(HeapTupleGetDatum(htup));
+}
+
+Datum
+pg_last_committed_xact(PG_FUNCTION_ARGS)
+{
+	TransactionId	xid;
+	TimestampTz		ts;
+	XLogRecPtr		lsn;
+	Datum       values[3];
+	bool        nulls[3];
+	TupleDesc   tupdesc;
+	HeapTuple	htup;
+
+	/*
+	 * Result row is (xid xid, timestamp timestamptz, lsn pg_lsn); node id is
+	 * not exposed.  This descriptor must match this function's pg_proc entry!
+	 */
+	tupdesc = CreateTemplateTupleDesc(3, false);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 1, "xid",
+					   XIDOID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 2, "timestamp",
+					   TIMESTAMPTZOID, -1, 0);
+	TupleDescInitEntry(tupdesc, (AttrNumber) 3, "lsn",
+					   LSNOID, -1, 0);
+	tupdesc = BlessTupleDesc(tupdesc);
+
+	/* read the cached latest commit and form the tuple; no column is NULL */
+	xid = GetLatestCommitTsData(&ts, &lsn, NULL);
+
+	values[0] = TransactionIdGetDatum(xid);
+	nulls[0] = false;
+
+	values[1] = TimestampTzGetDatum(ts);
+	nulls[1] = false;
+
+	values[2] = LSNGetDatum(lsn);
+	nulls[2] = false;
+
+	htup = heap_form_tuple(tupdesc, values, nulls);
+
+	PG_RETURN_DATUM(HeapTupleGetDatum(htup));
+}
+
+/*
+ * Number of shared CommitTS buffers: NBuffers / 1024, clamped to 4..16.
+ *
+ * We use a very similar logic as for the number of CLOG buffers; see comments
+ * in CLOGShmemBuffers.
+ */
+Size
+CommitTsShmemBuffers(void)
+{
+	return Min(16, Max(4, NBuffers / 1024));
+}
+
+/*
+ * Shared memory needed for CommitTs: SLRU buffers plus the last-commit cache
+ */
+Size
+CommitTsShmemSize(void)
+{
+	return SimpleLruShmemSize(CommitTsShmemBuffers(), 0) +
+		sizeof(CommitTimestampShared);
+}
+
+void
+CommitTsShmemInit(void)
+{
+	bool		already_present;
+
+	CommitTsCtl->PagePrecedes = CommitTsPagePrecedes;
+	SimpleLruInit(CommitTsCtl, "CommitTs Ctl", CommitTsShmemBuffers(), 0,
+				  CommitTsControlLock, "pg_commit_ts");
+
+	commitTsShared = ShmemInitStruct("CommitTs shared",
+							sizeof(CommitTimestampShared), &already_present);
+
+	if (IsUnderPostmaster)
+	{
+		Assert(already_present);
+		return;
+	}
+
+	/* Not under postmaster: the struct is new, so start with an empty cache */
+	Assert(!already_present);
+	commitTsShared->xidLastCommit = InvalidTransactionId;
+	TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
+	commitTsShared->dataLastCommit.lsn = InvalidXLogRecPtr;
+	commitTsShared->dataLastCommit.nodeid = InvalidNodeId;
+}
+
+/*
+ * This function must be called ONCE on system install.
+ *
+ * (The CommitTs directory is assumed to have been created by initdb, and
+ * CommitTsShmemInit must have been called already.)
+ */
+void
+BootStrapCommitTs(void)
+{
+	/*
+	 * Nothing to do here at present, unlike most other SLRU modules; the
+	 * current segment is created when the server is started with this module
+	 * enabled.  See InitCommitTs.
+	 */
+}
+
+/*
+ * Zero out one CommitTs page in the SLRU buffers, optionally WAL-logging it.
+ *
+ * pageno is the page to (re)initialize.  When writeXlog is true a ZEROPAGE
+ * xlog record is emitted after the page has been zeroed.  The page is not
+ * actually written out here; it is only set up in shared memory.
+ *
+ * Returns the buffer slot number now holding the page.  The caller must hold
+ * the control lock on entry; it is still held on return.
+ */
+static int
+ZeroCommitTsPage(int pageno, bool writeXlog)
+{
+	int			slot = SimpleLruZeroPage(CommitTsCtl, pageno);
+
+	if (!writeXlog)
+		return slot;
+
+	WriteZeroPageXlogRec(pageno);
+	return slot;
+}
+
+/*
+ * Called ONCE during postmaster or standalone-backend startup.
+ *
+ * Seeds the SLRU's latest_page_number with the page that contains nextXid
+ * (as found in ShmemVariableCache).
+ */
+void
+StartupCommitTs(void)
+{
+	int			latest_page;
+
+	latest_page = TransactionIdToCTsPage(ShmemVariableCache->nextXid);
+
+	/* latest_page_number is updated under the control lock */
+	LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
+	CommitTsCtl->shared->latest_page_number = latest_page;
+	LWLockRelease(CommitTsControlLock);
+}
+
+/*
+ * This must be called ONCE during postmaster or standalone-backend startup,
+ * after recovery has finished.
+ *
+ * With track_commit_timestamp off, oldestCommitTs is reset to invalid and old
+ * segments are truncated away.  With it on, oldestCommitTs is set to the next
+ * XID if it was invalid, and the currently active page is created if it does
+ * not exist yet: the server might have been running with this module disabled
+ * for a while and thus might have skipped the normal creation point.
+ */
+void
+InitCommitTs(void)
+{
+	TransactionId xid = ShmemVariableCache->nextXid;
+	int			pageno = TransactionIdToCTsPage(xid);
+
+	LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
+
+	/*
+	 * Re-Initialize our idea of the latest page number.
+	 */
+	CommitTsCtl->shared->latest_page_number = pageno;
+
+	/*
+	 * If this module is not currently enabled, make sure we don't hand back
+	 * possibly-invalid data; also remove segments of old data.
+	 */
+	if (!track_commit_timestamp)
+	{
+		ShmemVariableCache->oldestCommitTs = InvalidTransactionId;
+		LWLockRelease(CommitTsControlLock);
+
+		TruncateCommitTs(ReadNewTransactionId());
+
+		return;
+	}
+
+	/*
+	 * If CommitTs is enabled, but it wasn't in the previous server run, we
+	 * need to set the oldest value to the next Xid; that way, we will not try
+	 * to read data that might not have been set.
+	 *
+	 * XXX does this have a problem if a server is started with commitTs
+	 * enabled, then started with commitTs disabled, then restarted with it
+	 * enabled again?  It doesn't look like it does, because there should be a
+	 * checkpoint that sets the value to InvalidTransactionId at end of
+	 * recovery; and so any chance of injecting new transactions without
+	 * CommitTs values would occur after the oldestCommitTs has been set to
+	 * Invalid temporarily.
+	 */
+	if (ShmemVariableCache->oldestCommitTs == InvalidTransactionId)
+		ShmemVariableCache->oldestCommitTs = ReadNewTransactionId();
+
+	/* Finally, create the current segment file, if necessary */
+	if (!SimpleLruDoesPhysicalPageExist(CommitTsCtl, pageno))
+	{
+		int		slotno;
+
+		slotno = ZeroCommitTsPage(pageno, false);
+		SimpleLruWritePage(CommitTsCtl, slotno);
+		Assert(!CommitTsCtl->shared->page_dirty[slotno]);
+	}
+
+	LWLockRelease(CommitTsControlLock);
+}
+
+/*
+ * This must be called ONCE during postmaster or standalone-backend shutdown
+ */
+void
+ShutdownCommitTs(void)
+{
+	/* Flush dirty pages; "false" presumably means not-a-checkpoint (confirm) */
+	SimpleLruFlush(CommitTsCtl, false);
+}
+
+/*
+ * Perform a checkpoint --- either during shutdown, or on-the-fly
+ */
+void
+CheckPointCommitTs(void)
+{
+	/* Flush dirty pages; "true" presumably marks a checkpoint flush (confirm) */
+	SimpleLruFlush(CommitTsCtl, true);
+}
+
+/*
+ * Ensure CommitTs has a zeroed page ready for a newly-allocated XID.
+ *
+ * NB: XidGenLock is held by the caller, so this has to be very fast in the
+ * common case.  Even in the uncommon case no actual I/O need happen unless
+ * a dirty CommitTs or xlog page must be written out to make room in shared
+ * memory.
+ *
+ * NB2: this implementation depends on track_commit_timestamp being flagged
+ * as PGC_POSTMASTER
+ * (only possible to be set at server start).
+ */
+void
+ExtendCommitTs(TransactionId newestXact)
+{
+	bool		first_on_page;
+
+	/* nothing to do if module not enabled */
+	if (!track_commit_timestamp)
+		return;
+
+	/*
+	 * Work is needed only for the first XID of a page.  Right after
+	 * wraparound that is FirstNormalTransactionId in the case of page zero.
+	 */
+	first_on_page = TransactionIdToCTsEntry(newestXact) == 0 ||
+		TransactionIdEquals(newestXact, FirstNormalTransactionId);
+
+	if (!first_on_page)
+		return;
+
+	LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
+
+	/* Zero the page; an XLOG entry is made unless we are in recovery */
+	ZeroCommitTsPage(TransactionIdToCTsPage(newestXact), !InRecovery);
+
+	LWLockRelease(CommitTsControlLock);
+}
+
+/*
+ * Drop the CommitTs segments that come before the segment containing
+ * oldestXact.
+ *
+ * A TRUNCATE xlog record is written, but XLOG need not be flushed here.
+ */
+void
+TruncateCommitTs(TransactionId oldestXact)
+{
+	int			first_kept_page;
+	bool		removable;
+
+	/*
+	 * SimpleLruTruncate wants a *page* number: it gets the page containing
+	 * oldestXact, and the cutoff is the start of that page's segment.
+	 */
+	first_kept_page = TransactionIdToCTsPage(oldestXact);
+
+	removable = SlruScanDirectory(CommitTsCtl, SlruScanDirCbReportPresence,
+								  &first_kept_page);
+	if (!removable)
+		return;					/* no file could be removed */
+
+	/* Log the truncation first, then remove the old segment(s) */
+	WriteTruncateXlogRec(first_kept_page);
+	SimpleLruTruncate(CommitTsCtl, first_kept_page);
+}
+
+/*
+ * Advance oldestCommitTs, the earliest XID whose commit TS can be consulted.
+ * It is left alone when invalid (which signals disabled commit timestamps)
+ * or when it does not precede oldestXact, i.e. is not older than it.
+ */
+void
+SetCommitTsLimit(TransactionId oldestXact)
+{
+	TransactionId cur;
+
+	LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
+	cur = ShmemVariableCache->oldestCommitTs;
+	if (cur != InvalidTransactionId && TransactionIdPrecedes(cur, oldestXact))
+		ShmemVariableCache->oldestCommitTs = oldestXact;
+	LWLockRelease(CommitTsControlLock);
+}
+
+/*
+ * Decide which of two CommitTs page numbers is "older" for truncation
+ * purposes.
+ *
+ * Each page is represented by an XID so that the comparison can go through
+ * TransactionIdPrecedes and thereby do the right thing with wraparound XID
+ * arithmetic.  That XID is the page's first one offset by
+ * FirstNormalTransactionId: for page number zero we don't want to hand
+ * InvalidTransactionId to TransactionIdPrecedes, as it'll get weird about
+ * permanent xact IDs.
+ */
+static bool
+CommitTsPagePrecedes(int page1, int page2)
+{
+	TransactionId probe1 = FirstNormalTransactionId;
+	TransactionId probe2 = FirstNormalTransactionId;
+
+	probe1 += ((TransactionId) page1) * COMMIT_TS_XACTS_PER_PAGE;
+	probe2 += ((TransactionId) page2) * COMMIT_TS_XACTS_PER_PAGE;
+
+	return TransactionIdPrecedes(probe1, probe2);
+}
+
+
+/*
+ * Emit a ZEROPAGE xlog record whose payload is the page number
+ */
+static void
+WriteZeroPageXlogRec(int pageno)
+{
+	XLogRecData payload;
+
+	payload.buffer = InvalidBuffer;
+	payload.data = (char *) &pageno;
+	payload.len = sizeof(pageno);
+	payload.next = NULL;
+	(void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_ZEROPAGE, &payload);
+}
+
+/*
+ * Emit a TRUNCATE xlog record whose payload is the cutoff page number
+ */
+static void
+WriteTruncateXlogRec(int pageno)
+{
+	XLogRecData payload;
+
+	payload.buffer = InvalidBuffer;
+	payload.data = (char *) &pageno;
+	payload.len = sizeof(pageno);
+	payload.next = NULL;
+	XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_TRUNCATE, &payload);
+}
+
+/*
+ * Write a SETTS xlog record: fixed-size header, then subxids as a 2nd chunk
+ */
+static void
+WriteSetTimestampXlogRec(TransactionId mainxid, int nsubxids,
+						 TransactionId *subxids, TimestampTz timestamp,
+						 XLogRecPtr lsn, NodeIdRec nodeid)
+{
+	XLogRecData			rdata[2];
+	xl_commit_ts_set	record;		/* header only; subxids[] has no storage */
+
+	record.timestamp = timestamp;
+	record.lsn = lsn;
+	record.nodeid = nodeid;
+	record.mainxid = mainxid;
+	record.nsubxids = nsubxids;
+	rdata[0].data = (char *) &record;
+	rdata[0].len = offsetof(xl_commit_ts_set, subxids);
+	rdata[0].next = (nsubxids > 0) ? &rdata[1] : NULL;
+	rdata[1].data = (char *) subxids;
+	rdata[1].len = nsubxids * sizeof(TransactionId);
+	rdata[1].next = NULL;
+	rdata[0].buffer = rdata[1].buffer = InvalidBuffer;
+	XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_SETTS, rdata);
+}
+
+
+/*
+ * CommitTS resource manager's redo routine: replay one commit_ts WAL record
+ */
+void
+commit_ts_redo(XLogRecPtr lsn, XLogRecord *record)
+{
+	uint8		info = record->xl_info & ~XLR_INFO_MASK;
+	int			pageno;
+	int			slotno;
+
+	/* Backup blocks are not used in commit_ts records */
+	Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK));
+
+	switch (info)
+	{
+		case COMMIT_TS_ZEROPAGE:
+			memcpy(&pageno, XLogRecGetData(record), sizeof(int));
+
+			LWLockAcquire(CommitTsControlLock, LW_EXCLUSIVE);
+			slotno = ZeroCommitTsPage(pageno, false);
+			SimpleLruWritePage(CommitTsCtl, slotno);
+			Assert(!CommitTsCtl->shared->page_dirty[slotno]);
+			LWLockRelease(CommitTsControlLock);
+			break;
+
+		case COMMIT_TS_TRUNCATE:
+			memcpy(&pageno, XLogRecGetData(record), sizeof(int));
+
+			/*
+			 * latest_page_number isn't set up yet during XLOG replay; plug
+			 * in a value that gets us past SimpleLruTruncate's sanity test.
+			 */
+			CommitTsCtl->shared->latest_page_number = pageno;
+			SimpleLruTruncate(CommitTsCtl, pageno);
+			break;
+
+		case COMMIT_TS_SETTS:
+			{
+				xl_commit_ts_set *setts;
+
+				setts = (xl_commit_ts_set *) XLogRecGetData(record);
+				TransactionTreeSetCommitTsData(setts->mainxid, setts->nsubxids,
+											   setts->subxids, setts->timestamp,
+											   setts->lsn, setts->nodeid, false);
+			}
+			break;
+
+		default:
+			elog(PANIC, "commit_ts_redo: unknown op code %u", info);
+	}
+}
+
+/*
+ * GUC check hook for track_commit_timestamp.
+ *
+ * Rejects turning the setting on when BLCKSZ is below COMMIT_TS_MIN_BLCKSZ.
+ */
+bool
+check_track_commit_timestamp(bool *newval, void **extra, GucSource source)
+{
+	if (*newval && BLCKSZ < COMMIT_TS_MIN_BLCKSZ)
+	{
+		GUC_check_errmsg("Commit timestamps tracking cannot be enabled for builds with page size smaller than %d",
+						 COMMIT_TS_MIN_BLCKSZ);
+		return false;
+	}
+
+	return true;
+}
diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c
index befd60f..f24861c 100644
--- a/src/backend/access/transam/rmgr.c
+++ b/src/backend/access/transam/rmgr.c
@@ -8,6 +8,7 @@
 #include "postgres.h"
 
 #include "access/clog.h"
+#include "access/committs.h"
 #include "access/gin.h"
 #include "access/gist_private.h"
 #include "access/hash.h"
diff --git a/src/backend/access/transam/varsup.c b/src/backend/access/transam/varsup.c
index d51cca4..d3287da 100644
--- a/src/backend/access/transam/varsup.c
+++ b/src/backend/access/transam/varsup.c
@@ -14,6 +14,7 @@
 #include "postgres.h"
 
 #include "access/clog.h"
+#include "access/committs.h"
 #include "access/subtrans.h"
 #include "access/transam.h"
 #include "access/xact.h"
@@ -158,9 +159,10 @@ GetNewTransactionId(bool isSubXact)
 	 * XID before we zero the page.  Fortunately, a page of the commit log
 	 * holds 32K or more transactions, so we don't have to do this very often.
 	 *
-	 * Extend pg_subtrans too.
+	 * Extend pg_subtrans and pg_commit_ts too.
 	 */
 	ExtendCLOG(xid);
+	ExtendCommitTs(xid);
 	ExtendSUBTRANS(xid);
 
 	/*
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 6f92bad..fc5f7c9 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -20,6 +20,7 @@
 #include <time.h>
 #include <unistd.h>
 
+#include "access/committs.h"
 #include "access/multixact.h"
 #include "access/subtrans.h"
 #include "access/transam.h"
@@ -1168,6 +1169,18 @@ RecordTransactionCommit(void)
 	}
 
 	/*
+	 * We don't need to WAL-log the commit timestamp separately since the
+	 * commit record logged above has all the information needed to set the
+	 * timestamp again during replay.
+	 */
+	if (markXidCommitted)
+	{
+		TransactionTreeSetCommitTsData(xid, nchildren, children,
+									   xactStopTimestamp, XactLastRecEnd,
+									   CommitTsDefaultNodeId, false);
+	}
+
+	/*
 	 * Check if we want to commit asynchronously.  We can allow the XLOG flush
 	 * to happen asynchronously if synchronous_commit=off, or if the current
 	 * transaction has not performed any WAL-logged operation.  The latter
@@ -4683,6 +4696,7 @@ xactGetCommittedChildren(TransactionId **ptr)
  */
 static void
 xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
+						  TimestampTz commit_time,
 						  TransactionId *sub_xids, int nsubxacts,
 						  SharedInvalidationMessage *inval_msgs, int nmsgs,
 						  RelFileNode *xnodes, int nrels,
@@ -4710,6 +4724,11 @@ xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
 		LWLockRelease(XidGenLock);
 	}
 
+	/* Set the transaction commit timestamp and metadata */
+	TransactionTreeSetCommitTsData(xid, nsubxacts, sub_xids,
+								   commit_time, lsn,
+								   CommitTsDefaultNodeId, false);
+
 	if (standbyState == STANDBY_DISABLED)
 	{
 		/*
@@ -4829,7 +4848,8 @@ xact_redo_commit(xl_xact_commit *xlrec,
 	/* invalidation messages array follows subxids */
 	inval_msgs = (SharedInvalidationMessage *) &(subxacts[xlrec->nsubxacts]);
 
-	xact_redo_commit_internal(xid, lsn, subxacts, xlrec->nsubxacts,
+	xact_redo_commit_internal(xid, lsn, xlrec->xact_time,
+							  subxacts, xlrec->nsubxacts,
 							  inval_msgs, xlrec->nmsgs,
 							  xlrec->xnodes, xlrec->nrels,
 							  xlrec->dbId,
@@ -4844,7 +4864,8 @@ static void
 xact_redo_commit_compact(xl_xact_commit_compact *xlrec,
 						 TransactionId xid, XLogRecPtr lsn)
 {
-	xact_redo_commit_internal(xid, lsn, xlrec->subxacts, xlrec->nsubxacts,
+	xact_redo_commit_internal(xid, lsn, xlrec->xact_time,
+							  xlrec->subxacts, xlrec->nsubxacts,
 							  NULL, 0,	/* inval msgs */
 							  NULL, 0,	/* relfilenodes */
 							  InvalidOid,		/* dbId */
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 99f702c..02b1dca 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -22,6 +22,7 @@
 #include <unistd.h>
 
 #include "access/clog.h"
+#include "access/committs.h"
 #include "access/multixact.h"
 #include "access/rewriteheap.h"
 #include "access/subtrans.h"
@@ -4520,6 +4521,7 @@ BootStrapXLOG(void)
 	checkPoint.oldestXidDB = TemplateDbOid;
 	checkPoint.oldestMulti = FirstMultiXactId;
 	checkPoint.oldestMultiDB = TemplateDbOid;
+	checkPoint.oldestCommitTs = InvalidTransactionId;
 	checkPoint.time = (pg_time_t) time(NULL);
 	checkPoint.oldestActiveXid = InvalidTransactionId;
 
@@ -4529,6 +4531,7 @@ BootStrapXLOG(void)
 	MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset);
 	SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
 	SetMultiXactIdLimit(checkPoint.oldestMulti, checkPoint.oldestMultiDB);
+	SetCommitTsLimit(InvalidTransactionId);
 
 	/* Set up the XLOG page header */
 	page->xlp_magic = XLOG_PAGE_MAGIC;
@@ -4602,6 +4605,7 @@ BootStrapXLOG(void)
 	ControlFile->max_locks_per_xact = max_locks_per_xact;
 	ControlFile->wal_level = wal_level;
 	ControlFile->wal_log_hints = wal_log_hints;
+	ControlFile->track_commit_timestamp = track_commit_timestamp;
 	ControlFile->data_checksum_version = bootstrap_data_checksum_version;
 
 	/* some additional ControlFile fields are set in WriteControlFile() */
@@ -4610,6 +4614,7 @@ BootStrapXLOG(void)
 
 	/* Bootstrap the commit log, too */
 	BootStrapCLOG();
+	BootStrapCommitTs();
 	BootStrapSUBTRANS();
 	BootStrapMultiXact();
 
@@ -5858,6 +5863,9 @@ StartupXLOG(void)
 	ereport(DEBUG1,
 			(errmsg("oldest MultiXactId: %u, in database %u",
 					checkPoint.oldestMulti, checkPoint.oldestMultiDB)));
+	ereport(DEBUG1,
+			(errmsg("oldest commit timestamp Xid: %u",
+					checkPoint.oldestCommitTs)));
 	if (!TransactionIdIsNormal(checkPoint.nextXid))
 		ereport(PANIC,
 				(errmsg("invalid next transaction ID")));
@@ -5869,6 +5877,7 @@ StartupXLOG(void)
 	MultiXactSetNextMXact(checkPoint.nextMulti, checkPoint.nextMultiOffset);
 	SetTransactionIdLimit(checkPoint.oldestXid, checkPoint.oldestXidDB);
 	SetMultiXactIdLimit(checkPoint.oldestMulti, checkPoint.oldestMultiDB);
+	SetCommitTsLimit(checkPoint.oldestCommitTs);
 	MultiXactSetSafeTruncate(checkPoint.oldestMulti);
 	XLogCtl->ckptXidEpoch = checkPoint.nextXidEpoch;
 	XLogCtl->ckptXid = checkPoint.nextXid;
@@ -6091,11 +6100,12 @@ StartupXLOG(void)
 			ProcArrayInitRecovery(ShmemVariableCache->nextXid);
 
 			/*
-			 * Startup commit log and subtrans only. MultiXact has already
-			 * been started up and other SLRUs are not maintained during
-			 * recovery and need not be started yet.
+			 * Startup commit log, commit timestamp and subtrans
+			 * only. MultiXact has already been started up and other SLRUs are
+			 * not maintained during recovery and need not be started yet.
 			 */
 			StartupCLOG();
+			StartupCommitTs();
 			StartupSUBTRANS(oldestActiveXID);
 
 			/*
@@ -6742,12 +6752,13 @@ StartupXLOG(void)
 	LWLockRelease(ProcArrayLock);
 
 	/*
-	 * Start up the commit log and subtrans, if not already done for hot
-	 * standby.
+	 * Start up the commit log, commit timestamp and subtrans, if not already
+	 * done for hot standby.
 	 */
 	if (standbyState == STANDBY_DISABLED)
 	{
 		StartupCLOG();
+		StartupCommitTs();
 		StartupSUBTRANS(oldestActiveXID);
 	}
 
@@ -6783,6 +6794,12 @@ StartupXLOG(void)
 	XLogReportParameters();
 
 	/*
+	 * Local WAL inserts are enabled now, so it's time to finish the
+	 * initialization of commit timestamp.
+	 */
+	InitCommitTs();
+
+	/*
 	 * All done.  Allow backends to write WAL.  (Although the bool flag is
 	 * probably atomic in itself, we use the info_lck here to ensure that
 	 * there are no race conditions concerning visibility of other recent
@@ -7347,6 +7364,7 @@ ShutdownXLOG(int code, Datum arg)
 		CreateCheckPoint(CHECKPOINT_IS_SHUTDOWN | CHECKPOINT_IMMEDIATE);
 	}
 	ShutdownCLOG();
+	ShutdownCommitTs();
 	ShutdownSUBTRANS();
 	ShutdownMultiXact();
 
@@ -7674,6 +7692,10 @@ CreateCheckPoint(int flags)
 	checkPoint.oldestXidDB = ShmemVariableCache->oldestXidDB;
 	LWLockRelease(XidGenLock);
 
+	LWLockAcquire(CommitTsControlLock, LW_SHARED);
+	checkPoint.oldestCommitTs = ShmemVariableCache->oldestCommitTs;
+	LWLockRelease(CommitTsControlLock);
+
 	/* Increase XID epoch if we've wrapped around since last checkpoint */
 	checkPoint.nextXidEpoch = ControlFile->checkPointCopy.nextXidEpoch;
 	if (checkPoint.nextXid < ControlFile->checkPointCopy.nextXid)
@@ -7959,6 +7981,7 @@ static void
 CheckPointGuts(XLogRecPtr checkPointRedo, int flags)
 {
 	CheckPointCLOG();
+	CheckPointCommitTs();
 	CheckPointSUBTRANS();
 	CheckPointMultiXact();
 	CheckPointPredicate();
@@ -8399,7 +8422,8 @@ XLogReportParameters(void)
 		MaxConnections != ControlFile->MaxConnections ||
 		max_worker_processes != ControlFile->max_worker_processes ||
 		max_prepared_xacts != ControlFile->max_prepared_xacts ||
-		max_locks_per_xact != ControlFile->max_locks_per_xact)
+		max_locks_per_xact != ControlFile->max_locks_per_xact ||
+		track_commit_timestamp != ControlFile->track_commit_timestamp)
 	{
 		/*
 		 * The change in number of backend slots doesn't need to be WAL-logged
@@ -8420,6 +8444,7 @@ XLogReportParameters(void)
 			xlrec.max_locks_per_xact = max_locks_per_xact;
 			xlrec.wal_level = wal_level;
 			xlrec.wal_log_hints = wal_log_hints;
+			xlrec.track_commit_timestamp = track_commit_timestamp;
 
 			rdata.buffer = InvalidBuffer;
 			rdata.data = (char *) &xlrec;
@@ -8436,6 +8461,7 @@ XLogReportParameters(void)
 		ControlFile->max_locks_per_xact = max_locks_per_xact;
 		ControlFile->wal_level = wal_level;
 		ControlFile->wal_log_hints = wal_log_hints;
+		ControlFile->track_commit_timestamp = track_commit_timestamp;
 		UpdateControlFile();
 	}
 }
@@ -8815,6 +8841,7 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
 		ControlFile->max_locks_per_xact = xlrec.max_locks_per_xact;
 		ControlFile->wal_level = xlrec.wal_level;
 		ControlFile->wal_log_hints = wal_log_hints;
+		ControlFile->track_commit_timestamp = xlrec.track_commit_timestamp;
 
 		/*
 		 * Update minRecoveryPoint to ensure that if recovery is aborted, we
diff --git a/src/backend/commands/vacuum.c b/src/backend/commands/vacuum.c
index 6384dc7..23b5248 100644
--- a/src/backend/commands/vacuum.c
+++ b/src/backend/commands/vacuum.c
@@ -23,6 +23,7 @@
 #include <math.h>
 
 #include "access/clog.h"
+#include "access/committs.h"
 #include "access/genam.h"
 #include "access/heapam.h"
 #include "access/htup_details.h"
@@ -1071,10 +1072,12 @@ vac_truncate_clog(TransactionId frozenXID,
 		return;
 
 	/*
-	 * Truncate CLOG to the oldest computed value.  Note we don't truncate
-	 * multixacts; that will be done by the next checkpoint.
+	 * Truncate CLOG and CommitTs to the oldest computed value.
+	 * Note we don't truncate multixacts; that will be done by the next
+	 * checkpoint.
 	 */
 	TruncateCLOG(frozenXID);
+	TruncateCommitTs(frozenXID);
 
 	/*
 	 * Update the wrap limit for GetNewTransactionId and creation of new
@@ -1084,6 +1087,7 @@ vac_truncate_clog(TransactionId frozenXID,
 	 */
 	SetTransactionIdLimit(frozenXID, oldestxid_datoid);
 	SetMultiXactIdLimit(minMulti, minmulti_datoid);
+	SetCommitTsLimit(frozenXID);
 }
 
 
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 8e78aaf..44898ab 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -133,6 +133,7 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogRecord *record)
 		case RM_SEQ_ID:
 		case RM_SPGIST_ID:
 		case RM_BRIN_ID:
+		case RM_COMMIT_TS_ID:
 			break;
 		case RM_NEXT_ID:
 			elog(ERROR, "unexpected RM_NEXT_ID rmgr_id: %u", (RmgrIds) buf.record.xl_rmid);
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 1d04c55..9025601 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -15,6 +15,7 @@
 #include "postgres.h"
 
 #include "access/clog.h"
+#include "access/committs.h"
 #include "access/heapam.h"
 #include "access/multixact.h"
 #include "access/nbtree.h"
@@ -117,6 +118,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
 		size = add_size(size, ProcGlobalShmemSize());
 		size = add_size(size, XLOGShmemSize());
 		size = add_size(size, CLOGShmemSize());
+		size = add_size(size, CommitTsShmemSize());
 		size = add_size(size, SUBTRANSShmemSize());
 		size = add_size(size, TwoPhaseShmemSize());
 		size = add_size(size, BackgroundWorkerShmemSize());
@@ -198,6 +200,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
 	 */
 	XLOGShmemInit();
 	CLOGShmemInit();
+	CommitTsShmemInit();
 	SUBTRANSShmemInit();
 	MultiXactShmemInit();
 	InitBufferPool();
diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c
index 719181c..4b4b4bf 100644
--- a/src/backend/storage/lmgr/lwlock.c
+++ b/src/backend/storage/lmgr/lwlock.c
@@ -29,6 +29,7 @@
 #include "postgres.h"
 
 #include "access/clog.h"
+#include "access/committs.h"
 #include "access/multixact.h"
 #include "access/subtrans.h"
 #include "commands/async.h"
@@ -259,6 +260,9 @@ NumLWLocks(void)
 	/* clog.c needs one per CLOG buffer */
 	numLocks += CLOGShmemBuffers();
 
+	/* committs.c needs one per CommitTs buffer */
+	numLocks += CommitTsShmemBuffers();
+
 	/* subtrans.c needs one per SubTrans buffer */
 	numLocks += NUM_SUBTRANS_BUFFERS;
 
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index db65c76..df6c952 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -26,6 +26,7 @@
 #include <syslog.h>
 #endif
 
+#include "access/committs.h"
 #include "access/gin.h"
 #include "access/transam.h"
 #include "access/twophase.h"
@@ -826,6 +827,15 @@ static struct config_bool ConfigureNamesBool[] =
 		check_bonjour, NULL, NULL
 	},
 	{
+		{"track_commit_timestamp", PGC_POSTMASTER, REPLICATION,
+			gettext_noop("Collects transaction commit time."),
+			NULL
+		},
+		&track_commit_timestamp,
+		false,
+		check_track_commit_timestamp, NULL, NULL
+	},
+	{
 		{"ssl", PGC_POSTMASTER, CONN_AUTH_SECURITY,
 			gettext_noop("Enables SSL connections."),
 			NULL
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 6e8ea1e..4da89a6 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -227,6 +227,8 @@
 #wal_sender_timeout = 60s	# in milliseconds; 0 disables
 
 #max_replication_slots = 0	# max number of replication slots
+				# (change requires restart)
+#track_commit_timestamp = off	# collect timestamp of transaction commit
 				# (change requires restart)
 
 # - Master Server -
diff --git a/src/bin/initdb/initdb.c b/src/bin/initdb/initdb.c
index dc1f1df..28e6dfd 100644
--- a/src/bin/initdb/initdb.c
+++ b/src/bin/initdb/initdb.c
@@ -185,6 +185,7 @@ static const char *subdirs[] = {
 	"pg_xlog",
 	"pg_xlog/archive_status",
 	"pg_clog",
+	"pg_commit_ts",
 	"pg_dynshmem",
 	"pg_notify",
 	"pg_serial",
diff --git a/src/bin/pg_controldata/pg_controldata.c b/src/bin/pg_controldata/pg_controldata.c
index b2e0793..a838bb5 100644
--- a/src/bin/pg_controldata/pg_controldata.c
+++ b/src/bin/pg_controldata/pg_controldata.c
@@ -270,6 +270,8 @@ main(int argc, char *argv[])
 		   ControlFile.checkPointCopy.oldestMulti);
 	printf(_("Latest checkpoint's oldestMulti's DB: %u\n"),
 		   ControlFile.checkPointCopy.oldestMultiDB);
+	printf(_("Latest checkpoint's oldestCommitTs:   %u\n"),
+		   ControlFile.checkPointCopy.oldestCommitTs);
 	printf(_("Time of latest checkpoint:            %s\n"),
 		   ckpttime_str);
 	printf(_("Fake LSN counter for unlogged rels:   %X/%X\n"),
@@ -300,6 +302,8 @@ main(int argc, char *argv[])
 		   ControlFile.max_prepared_xacts);
 	printf(_("Current max_locks_per_xact setting:   %d\n"),
 		   ControlFile.max_locks_per_xact);
+	printf(_("Current track_commit_timestamp setting: %s\n"),
+		   ControlFile.track_commit_timestamp ? _("on") : _("off"));
 	printf(_("Maximum data alignment:               %u\n"),
 		   ControlFile.maxAlign);
 	/* we don't print floatFormat since can't say much useful about it */
diff --git a/src/bin/pg_resetxlog/pg_resetxlog.c b/src/bin/pg_resetxlog/pg_resetxlog.c
index 2ba9946..a6bd8d5 100644
--- a/src/bin/pg_resetxlog/pg_resetxlog.c
+++ b/src/bin/pg_resetxlog/pg_resetxlog.c
@@ -63,6 +63,7 @@ static bool guessed = false;	/* T if we had to guess at any values */
 static const char *progname;
 static uint32 set_xid_epoch = (uint32) -1;
 static TransactionId set_xid = 0;
+static TransactionId set_commit_ts = 0;
 static Oid	set_oid = 0;
 static MultiXactId set_mxid = 0;
 static MultiXactOffset set_mxoff = (MultiXactOffset) -1;
@@ -112,7 +113,7 @@ main(int argc, char *argv[])
 	}
 
 
-	while ((c = getopt(argc, argv, "D:fl:m:no:O:x:e:")) != -1)
+	while ((c = getopt(argc, argv, "c:D:e:fl:m:no:O:x:")) != -1)
 	{
 		switch (c)
 		{
@@ -158,6 +159,21 @@ main(int argc, char *argv[])
 				}
 				break;
 
+			case 'c':
+				set_commit_ts = strtoul(optarg, &endptr, 0);
+				if (endptr == optarg || *endptr != '\0')
+				{
+					fprintf(stderr, _("%s: invalid argument for option -c\n"), progname);
+					fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
+					exit(1);
+				}
+				if (set_commit_ts == 0)
+				{
+					fprintf(stderr, _("%s: transaction ID (-c) must not be 0\n"), progname);
+					exit(1);
+				}
+				break;
+
 			case 'o':
 				set_oid = strtoul(optarg, &endptr, 0);
 				if (endptr == optarg || *endptr != '\0')
@@ -345,6 +361,9 @@ main(int argc, char *argv[])
 		ControlFile.checkPointCopy.oldestXidDB = InvalidOid;
 	}
 
+	if (set_commit_ts != 0)
+		ControlFile.checkPointCopy.oldestCommitTs = set_commit_ts;
+
 	if (set_oid != 0)
 		ControlFile.checkPointCopy.nextOid = set_oid;
 
@@ -539,6 +558,7 @@ GuessControlValues(void)
 
 	ControlFile.wal_level = WAL_LEVEL_MINIMAL;
 	ControlFile.wal_log_hints = false;
+	ControlFile.track_commit_timestamp = false;
 	ControlFile.MaxConnections = 100;
 	ControlFile.max_worker_processes = 8;
 	ControlFile.max_prepared_xacts = 0;
@@ -621,6 +641,8 @@ PrintControlValues(bool guessed)
 		   ControlFile.checkPointCopy.oldestMulti);
 	printf(_("Latest checkpoint's oldestMulti's DB: %u\n"),
 		   ControlFile.checkPointCopy.oldestMultiDB);
+	printf(_("Latest checkpoint's oldestCommitTs:   %u\n"),
+		   ControlFile.checkPointCopy.oldestCommitTs);
 	printf(_("Maximum data alignment:               %u\n"),
 		   ControlFile.maxAlign);
 	/* we don't print floatFormat since can't say much useful about it */
@@ -702,6 +724,12 @@ PrintNewControlValues()
 		printf(_("NextXID epoch:                        %u\n"),
 			   ControlFile.checkPointCopy.nextXidEpoch);
 	}
+
+	if (set_commit_ts != 0)
+	{
+		printf(_("oldestCommitTs:                       %u\n"),
+			   ControlFile.checkPointCopy.oldestCommitTs);
+	}
 }
 
 
@@ -739,6 +767,7 @@ RewriteControlFile(void)
 	 */
 	ControlFile.wal_level = WAL_LEVEL_MINIMAL;
 	ControlFile.wal_log_hints = false;
+	ControlFile.track_commit_timestamp = false;
 	ControlFile.MaxConnections = 100;
 	ControlFile.max_worker_processes = 8;
 	ControlFile.max_prepared_xacts = 0;
@@ -1095,6 +1124,7 @@ usage(void)
 	printf(_("%s resets the PostgreSQL transaction log.\n\n"), progname);
 	printf(_("Usage:\n  %s [OPTION]... {[-D] DATADIR}\n\n"), progname);
 	printf(_("Options:\n"));
+	printf(_("  -c XID           set the oldest transaction with retrievable commit timestamp\n"));
 	printf(_("  -e XIDEPOCH      set next transaction ID epoch\n"));
 	printf(_("  -f               force update to be done\n"));
 	printf(_("  -l XLOGFILE      force minimum WAL starting location for new transaction log\n"));
diff --git a/src/include/access/committs.h b/src/include/access/committs.h
new file mode 100644
index 0000000..04e8203
--- /dev/null
+++ b/src/include/access/committs.h
@@ -0,0 +1,75 @@
+/*
+ * committs.h
+ *
+ * PostgreSQL commit timestamp manager
+ *
+ * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/access/committs.h
+ */
+#ifndef COMMITTS_H
+#define COMMITTS_H
+
+#include "access/xlog.h"
+#include "datatype/timestamp.h"
+#include "utils/guc.h"
+
+extern PGDLLIMPORT bool	track_commit_timestamp;
+extern bool check_track_commit_timestamp(bool *newval, void **extra,
+										 GucSource source);
+
+typedef uint32 NodeIdRec;
+
+#define InvalidNodeId 0
+
+extern NodeIdRec CommitTsDefaultNodeId;
+
+extern void CommitTsSetDefaultNodeId(NodeIdRec nodeid);
+extern void TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids,
+										   TransactionId *subxids,
+										   TimestampTz timestamp,
+										   XLogRecPtr lsn,
+										   NodeIdRec nodeid,
+										   bool do_xlog);
+extern void TransactionIdGetCommitTsData(TransactionId xid,
+										 TimestampTz *ts,
+										 XLogRecPtr *lsn,
+										 NodeIdRec *nodeid);
+extern TransactionId GetLatestCommitTsData(TimestampTz *ts,
+										   XLogRecPtr *lsn,
+										   NodeIdRec *nodeid);
+
+extern Size CommitTsShmemBuffers(void);
+extern Size CommitTsShmemSize(void);
+extern void CommitTsShmemInit(void);
+extern void BootStrapCommitTs(void);
+extern void StartupCommitTs(void);
+extern void InitCommitTs(void);
+extern void ShutdownCommitTs(void);
+extern void CheckPointCommitTs(void);
+extern void ExtendCommitTs(TransactionId newestXact);
+extern void TruncateCommitTs(TransactionId oldestXact);
+extern void SetCommitTsLimit(TransactionId oldestXact);
+
+/* XLOG stuff */
+#define COMMIT_TS_ZEROPAGE		0x00
+#define COMMIT_TS_TRUNCATE		0x10
+#define COMMIT_TS_SETTS			0x20
+
+typedef struct xl_commit_ts_set
+{
+	TimestampTz		timestamp;	/* commit timestamp to set */
+	XLogRecPtr	    lsn;		/* LSN passed on with the timestamp */
+	NodeIdRec		nodeid;		/* node id passed on with the timestamp */
+	TransactionId	mainxid;	/* main transaction id */
+	int				nsubxids;	/* number of entries in subxids[] */
+	TransactionId	subxids[FLEXIBLE_ARRAY_MEMBER];	/* trails the header */
+} xl_commit_ts_set;
+
+
+extern void commit_ts_redo(XLogRecPtr lsn, XLogRecord *record);
+extern void commit_ts_desc(StringInfo buf, XLogRecord *record);
+extern const char *commit_ts_identify(uint8 info);
+
+#endif   /* COMMITTS_H */
diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h
index 76a6421..27168c3 100644
--- a/src/include/access/rmgrlist.h
+++ b/src/include/access/rmgrlist.h
@@ -24,7 +24,7 @@
  * Changes to this list possibly need a XLOG_PAGE_MAGIC bump.
  */
 
-/* symbol name, textual name, redo, desc, startup, cleanup */
+/* symbol name, textual name, redo, desc, identify, startup, cleanup */
 PG_RMGR(RM_XLOG_ID, "XLOG", xlog_redo, xlog_desc, xlog_identify, NULL, NULL)
 PG_RMGR(RM_XACT_ID, "Transaction", xact_redo, xact_desc, xact_identify, NULL, NULL)
 PG_RMGR(RM_SMGR_ID, "Storage", smgr_redo, smgr_desc, smgr_identify, NULL, NULL)
@@ -43,3 +43,4 @@ PG_RMGR(RM_GIST_ID, "Gist", gist_redo, gist_desc, gist_identify, gist_xlog_start
 PG_RMGR(RM_SEQ_ID, "Sequence", seq_redo, seq_desc, seq_identify, NULL, NULL)
 PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_startup, spg_xlog_cleanup)
 PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL)
+PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL)
diff --git a/src/include/access/transam.h b/src/include/access/transam.h
index 32d1b29..b59fd98 100644
--- a/src/include/access/transam.h
+++ b/src/include/access/transam.h
@@ -124,6 +124,11 @@ typedef struct VariableCacheData
 	Oid			oldestXidDB;	/* database with minimum datfrozenxid */
 
 	/*
+	 * These fields are protected by CommitTsControlLock
+	 */
+	TransactionId oldestCommitTs;
+
+	/*
 	 * These fields are protected by ProcArrayLock.
 	 */
 	TransactionId latestCompletedXid;	/* newest XID that has committed or
diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h
index 19b2ef8..56203b9 100644
--- a/src/include/access/xlog_internal.h
+++ b/src/include/access/xlog_internal.h
@@ -186,6 +186,7 @@ typedef struct xl_parameter_change
 	int			max_locks_per_xact;
 	int			wal_level;
 	bool		wal_log_hints;
+	bool		track_commit_timestamp;
 } xl_parameter_change;
 
 /* logs restore point */
diff --git a/src/include/catalog/pg_control.h b/src/include/catalog/pg_control.h
index ba79d25..70afbd1 100644
--- a/src/include/catalog/pg_control.h
+++ b/src/include/catalog/pg_control.h
@@ -46,6 +46,7 @@ typedef struct CheckPoint
 	MultiXactId oldestMulti;	/* cluster-wide minimum datminmxid */
 	Oid			oldestMultiDB;	/* database with minimum datminmxid */
 	pg_time_t	time;			/* time stamp of checkpoint */
+	TransactionId oldestCommitTs; /* oldest Xid with valid commit timestamp */
 
 	/*
 	 * Oldest XID still running. This is only needed to initialize hot standby
@@ -176,6 +177,7 @@ typedef struct ControlFileData
 	int			max_worker_processes;
 	int			max_prepared_xacts;
 	int			max_locks_per_xact;
+	bool		track_commit_timestamp;
 
 	/*
 	 * This data is used to check for hardware-architecture compatibility of
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index 5d4e889..47d9f01 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -3017,6 +3017,15 @@ DESCR("view two-phase transactions");
 DATA(insert OID = 3819 (  pg_get_multixact_members PGNSP PGUID 12 1 1000 0 0 f f f f t t v 1 0 2249 "28" "{28,28,25}" "{i,o,o}" "{multixid,xid,mode}" _null_ pg_get_multixact_members _null_ _null_ _null_ ));
 DESCR("view members of a multixactid");
 
+DATA(insert OID = 3581 ( pg_xact_commit_timestamp PGNSP PGUID 12 1 0 0 0 f f f f t f s 1 0 1184 "28" _null_ _null_ _null_ _null_ pg_xact_commit_timestamp _null_ _null_ _null_ ));
+DESCR("get commit timestamp of a transaction");
+
+DATA(insert OID = 3582 ( pg_xact_commit_timestamp_data PGNSP PGUID 12 1 0 0 0 f f f f t f s 1 0 2249 "28" "{28,1184,3220}" "{i,o,o}" "{xid,timestamp,lsn}" _null_ pg_xact_commit_timestamp_data _null_ _null_ _null_ ));
+DESCR("get commit timestamp and lsn of a transaction");
+
+DATA(insert OID = 3583 ( pg_last_committed_xact PGNSP PGUID 12 1 0 0 0 f f f f t f v 0 0 2249 "" "{28,1184,3220}" "{o,o,o}" "{xid,timestamp,lsn}" _null_ pg_last_committed_xact _null_ _null_ _null_ ));
+DESCR("get transaction Id, commit timestamp and lsn of latest transaction commit");
+
 DATA(insert OID = 3537 (  pg_describe_object		PGNSP PGUID 12 1 0 0 0 f f f f t f s 3 0 25 "26 26 23" _null_ _null_ _null_ _null_ pg_describe_object _null_ _null_ _null_ ));
 DESCR("get identification of SQL object");
 
diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h
index 91cab87..09654a8 100644
--- a/src/include/storage/lwlock.h
+++ b/src/include/storage/lwlock.h
@@ -127,7 +127,10 @@ extern PGDLLIMPORT LWLockPadded *MainLWLockArray;
 #define AutoFileLock				(&MainLWLockArray[35].lock)
 #define ReplicationSlotAllocationLock	(&MainLWLockArray[36].lock)
 #define ReplicationSlotControlLock		(&MainLWLockArray[37].lock)
-#define NUM_INDIVIDUAL_LWLOCKS		38
+#define CommitTsControlLock			(&MainLWLockArray[38].lock)
+#define CommitTsLock				(&MainLWLockArray[39].lock)
+
+#define NUM_INDIVIDUAL_LWLOCKS		40
 
 /*
  * It's a bit odd to declare NUM_BUFFER_PARTITIONS and NUM_LOCK_PARTITIONS
diff --git a/src/include/utils/builtins.h b/src/include/utils/builtins.h
index 3ba34f8..2618e7e 100644
--- a/src/include/utils/builtins.h
+++ b/src/include/utils/builtins.h
@@ -1182,6 +1182,11 @@ extern Datum pg_prepared_xact(PG_FUNCTION_ARGS);
 /* access/transam/multixact.c */
 extern Datum pg_get_multixact_members(PG_FUNCTION_ARGS);
 
+/* access/transam/committs.c */
+extern Datum pg_xact_commit_timestamp(PG_FUNCTION_ARGS);
+extern Datum pg_xact_commit_timestamp_data(PG_FUNCTION_ARGS);
+extern Datum pg_last_committed_xact(PG_FUNCTION_ARGS);
+
 /* catalogs/dependency.c */
 extern Datum pg_describe_object(PG_FUNCTION_ARGS);
 extern Datum pg_identify_object(PG_FUNCTION_ARGS);
diff --git a/src/test/regress/expected/committs.out b/src/test/regress/expected/committs.out
new file mode 100644
index 0000000..77d7a61
--- /dev/null
+++ b/src/test/regress/expected/committs.out
@@ -0,0 +1,21 @@
+--
+-- Commit Timestamp (off)
+--
+SHOW track_commit_timestamp;
+ track_commit_timestamp 
+------------------------
+ off
+(1 row)
+
+CREATE TABLE committs_test(id serial, ts timestamptz default now());
+INSERT INTO committs_test DEFAULT VALUES;
+INSERT INTO committs_test DEFAULT VALUES;
+INSERT INTO committs_test DEFAULT VALUES;
+SELECT id,
+       pg_xact_commit_timestamp(xmin) >= ts,
+       pg_xact_commit_timestamp(xmin) < now(),
+       pg_xact_commit_timestamp(xmin) - ts < '60s' -- 60s should give a lot of reserve
+FROM committs_test
+ORDER BY id;
+ERROR:  Cannot get commit timestamp data because "track_commit_timestamp" is not enabled
+DROP TABLE committs_test;
diff --git a/src/test/regress/expected/committs_1.out b/src/test/regress/expected/committs_1.out
new file mode 100644
index 0000000..1457a27
--- /dev/null
+++ b/src/test/regress/expected/committs_1.out
@@ -0,0 +1,27 @@
+--
+-- Commit Timestamp (off)
+--
+SHOW track_commit_timestamp;
+ track_commit_timestamp 
+------------------------
+ on
+(1 row)
+
+CREATE TABLE committs_test(id serial, ts timestamptz default now());
+INSERT INTO committs_test DEFAULT VALUES;
+INSERT INTO committs_test DEFAULT VALUES;
+INSERT INTO committs_test DEFAULT VALUES;
+SELECT id,
+       pg_xact_commit_timestamp(xmin) >= ts,
+       pg_xact_commit_timestamp(xmin) < now(),
+       pg_xact_commit_timestamp(xmin) - ts < '60s' -- 60s should give a lot of reserve
+FROM committs_test
+ORDER BY id;
+ id | ?column? | ?column? | ?column? 
+----+----------+----------+----------
+  1 | t        | t        | t
+  2 | t        | t        | t
+  3 | t        | t        | t
+(3 rows)
+
+DROP TABLE committs_test;
diff --git a/src/test/regress/parallel_schedule b/src/test/regress/parallel_schedule
index d4f02e5..ec0a7c9 100644
--- a/src/test/regress/parallel_schedule
+++ b/src/test/regress/parallel_schedule
@@ -88,7 +88,7 @@ test: brin privileges security_label collate matview lock replica_identity rowse
 # ----------
 # Another group of parallel tests
 # ----------
-test: alter_generic misc psql async
+test: alter_generic misc psql async committs
 
 # rules cannot run concurrently with any test that creates a view
 test: rules
diff --git a/src/test/regress/serial_schedule b/src/test/regress/serial_schedule
index 611b0a8..b0c4f39 100644
--- a/src/test/regress/serial_schedule
+++ b/src/test/regress/serial_schedule
@@ -148,3 +148,4 @@ test: largeobject
 test: with
 test: xml
 test: stats
+test: committs
diff --git a/src/test/regress/sql/committs.sql b/src/test/regress/sql/committs.sql
new file mode 100644
index 0000000..321a30a
--- /dev/null
+++ b/src/test/regress/sql/committs.sql
@@ -0,0 +1,20 @@
+--
+-- Commit Timestamp (off)
+--
+
+SHOW track_commit_timestamp;
+
+CREATE TABLE committs_test(id serial, ts timestamptz default now());
+
+INSERT INTO committs_test DEFAULT VALUES;
+INSERT INTO committs_test DEFAULT VALUES;
+INSERT INTO committs_test DEFAULT VALUES;
+
+SELECT id,
+       pg_xact_commit_timestamp(xmin) >= ts,
+       pg_xact_commit_timestamp(xmin) < now(),
+       pg_xact_commit_timestamp(xmin) - ts < '60s' -- 60s should give a lot of reserve
+FROM committs_test
+ORDER BY id;
+
+DROP TABLE committs_test;
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to