Hi,

Right now wal_level=logical implies that the compact commit record
format isn't used and similarly 2pc commits also include the non compact
format of commits.

In the course of the 'replication identifier' patch submitted to the
current commitfest I added more information to the non compact commit
record if a xinfo flag was present. I think optionally adding data is a better
course than the rigorous split between compact/non compact commits.

In the attached patch I've merged compact/noncompact code, made aborts
use similar logic to avoid including useless bytes and used both for the
2pc equivalents.

To avoid using more space in the compact case the 'xinfo' field
indicating the presence of further data is only included when a bit in
the xl_info flags is set (similar to what heap rmgr does). That means
that transactions without subtransactions and other things are four
bytes smaller than before, but ones with a subtransaction but no other
complications are 4 bytes larger. Database info, nsubxacts, nrels, nmsgs et
al are also only included if required. I think that's an overall win,
even disregarding wal_level = logical.

When compact commits were discussed in
http://archives.postgresql.org/message-id/235610.92468.qm%40web29004.mail.ird.yahoo.com
such a scheme was discussed, but then discarded. I think that was a
mistake; the information in commit records is likely to be growing and
having to decide between the compact/non compact form instead of on a more
granular level makes decisions harder. The information about the two forms
of commits has already crept into too many places...

I generally like how the patch looks, the relevant code actually looks
clearer to me afterwards - especially RecordTransactionCommit() being
noticeably shorter, decode.c having to care about less and that
twophase.c knows less about xact.c type records seems good.

There's one bit that I'm not so sure about though: To avoid duplication
I've added Parse(Commit/Abort)Record(), but unfortunately that has to be
available both in front and backend code - so it's currently living in
xactdesc.c. I think we can live with that, but it's certainly not
pretty.

I'm not going to rebase the replication identifier patch over this for
now, seems premature until some feedback is in.

Comments?

Greetings,

Andres Freund

-- 
 Andres Freund                     http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services
>From 06bc3275788b494306c717019c9b1b6082ea579a Mon Sep 17 00:00:00 2001
From: Andres Freund <and...@anarazel.de>
Date: Fri, 20 Feb 2015 12:30:57 +0100
Subject: [PATCH] Debloat and deduplicate transaction commit/abort records.

---
 src/backend/access/rmgrdesc/xactdesc.c   | 189 +++++++++-----
 src/backend/access/transam/twophase.c    |  56 ++---
 src/backend/access/transam/xact.c        | 416 +++++++++++++++++++------------
 src/backend/access/transam/xlog.c        |  16 +-
 src/backend/replication/logical/decode.c | 101 +++-----
 src/include/access/xact.h                | 162 ++++++++----
 6 files changed, 564 insertions(+), 376 deletions(-)

diff --git a/src/backend/access/rmgrdesc/xactdesc.c b/src/backend/access/rmgrdesc/xactdesc.c
index 3e87978..0686d55 100644
--- a/src/backend/access/rmgrdesc/xactdesc.c
+++ b/src/backend/access/rmgrdesc/xactdesc.c
@@ -19,48 +19,143 @@
 #include "storage/sinval.h"
 #include "utils/timestamp.h"
 
+/* Parse the WAL format of a xact commit into an easier to understand format. */
+void
+ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *parsed)
+{
+	char	   *data = ((char *) xlrec) + MinSizeOfXactCommit;
+
+	memset(parsed, 0, sizeof(*parsed));
+
+	parsed->xinfo = 0; /* default, if no XLOG_XACT_HAS_INFO is present */
+
+	if (info & XLOG_XACT_HAS_INFO)
+	{
+		parsed->xinfo = *(uint32 *) data;
+
+		data += sizeof(uint32);
+	}
+
+	if (parsed->xinfo & XACT_XINFO_HAS_DBINFO)
+	{
+		xl_xact_dbinfo *xl_dbinfo = (xl_xact_dbinfo *) data;
+
+		parsed->dbId = xl_dbinfo->dbId;
+		parsed->tsId = xl_dbinfo->tsId;
+
+		data += sizeof(xl_xact_dbinfo);
+	}
+
+	if (parsed->xinfo & XACT_XINFO_HAS_SUBXACTS)
+	{
+		xl_xact_subxacts   *xl_subxacts = (xl_xact_subxacts *) data;
+
+		parsed->nsubxacts = xl_subxacts->nsubxacts;
+		parsed->subxacts = xl_subxacts->subxacts;	/* points into the record */
+
+		data += MinSizeOfXactSubxacts;
+		data += parsed->nsubxacts * sizeof(TransactionId);
+	}
+
+	if (parsed->xinfo & XACT_XINFO_HAS_RELFILENODES)
+	{
+		xl_xact_relfilenodes *xl_relfilenodes = (xl_xact_relfilenodes *) data;
+
+		parsed->nrels = xl_relfilenodes->nrels;
+		parsed->xnodes = xl_relfilenodes->xnodes;	/* points into the record */
+
+		data += MinSizeOfXactRelfilenodes;
+		data += xl_relfilenodes->nrels * sizeof(RelFileNode);
+	}
+
+	if (parsed->xinfo & XACT_XINFO_HAS_INVALS)
+	{
+		xl_xact_invals *xl_invals = (xl_xact_invals *) data;
+
+		parsed->nmsgs = xl_invals->nmsgs;
+		parsed->msgs = xl_invals->msgs;	/* points into the record */
+
+		data += MinSizeOfXactInvals;
+		data += xl_invals->nmsgs * sizeof(SharedInvalidationMessage);
+	}
+}
+
+/* Parse the WAL format of a xact abort into an easier to understand format. */
+void
+ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed)
+{
+	char	   *data = ((char *) xlrec) + MinSizeOfXactAbort;
+
+	memset(parsed, 0, sizeof(*parsed));
+
+	parsed->xinfo = 0; /* default, if no XLOG_XACT_HAS_INFO is present */
+
+	if (info & XLOG_XACT_HAS_INFO)
+	{
+		parsed->xinfo = *(uint32 *) data;
+		data += sizeof(uint32);
+	}
+
+	if (parsed->xinfo & XACT_XINFO_HAS_SUBXACTS)
+	{
+		xl_xact_subxacts   *xl_subxacts = (xl_xact_subxacts *) data;
+
+		parsed->nsubxacts = xl_subxacts->nsubxacts;
+		parsed->subxacts = xl_subxacts->subxacts;	/* points into the record */
+
+		data += MinSizeOfXactSubxacts;
+		data += parsed->nsubxacts * sizeof(TransactionId);
+	}
+
+	if (parsed->xinfo & XACT_XINFO_HAS_RELFILENODES)
+	{
+		xl_xact_relfilenodes *xl_relfilenodes = (xl_xact_relfilenodes *) data;
+
+		parsed->nrels = xl_relfilenodes->nrels;
+		parsed->xnodes = xl_relfilenodes->xnodes;	/* points into the record */
+
+		data += MinSizeOfXactRelfilenodes;
+		data += xl_relfilenodes->nrels * sizeof(RelFileNode);
+	}
+}
 
 static void
-xact_desc_commit(StringInfo buf, xl_xact_commit *xlrec)
+xact_desc_commit(StringInfo buf, uint8 info, xl_xact_commit *xlrec)
 {
+	xl_xact_parsed_commit parsed_c;
 	int			i;
-	TransactionId *subxacts;
 
-	subxacts = (TransactionId *) &xlrec->xnodes[xlrec->nrels];
+	ParseCommitRecord(info, xlrec, &parsed_c);
 
 	appendStringInfoString(buf, timestamptz_to_str(xlrec->xact_time));
 
-	if (xlrec->nrels > 0)
+	if (parsed_c.nrels > 0)
 	{
 		appendStringInfoString(buf, "; rels:");
-		for (i = 0; i < xlrec->nrels; i++)
+		for (i = 0; i < parsed_c.nrels; i++)
 		{
-			char	   *path = relpathperm(xlrec->xnodes[i], MAIN_FORKNUM);
+			char	   *path = relpathperm(parsed_c.xnodes[i], MAIN_FORKNUM);
 
 			appendStringInfo(buf, " %s", path);
 			pfree(path);
 		}
 	}
-	if (xlrec->nsubxacts > 0)
+	if (parsed_c.nsubxacts > 0)
 	{
 		appendStringInfoString(buf, "; subxacts:");
-		for (i = 0; i < xlrec->nsubxacts; i++)
-			appendStringInfo(buf, " %u", subxacts[i]);
+		for (i = 0; i < parsed_c.nsubxacts; i++)
+			appendStringInfo(buf, " %u", parsed_c.subxacts[i]);
 	}
-	if (xlrec->nmsgs > 0)
+	if (parsed_c.nmsgs > 0)
 	{
-		SharedInvalidationMessage *msgs;
-
-		msgs = (SharedInvalidationMessage *) &subxacts[xlrec->nsubxacts];
-
-		if (XactCompletionRelcacheInitFileInval(xlrec->xinfo))
+		if (XactCompletionRelcacheInitFileInval(parsed_c.xinfo))
 			appendStringInfo(buf, "; relcache init file inval dbid %u tsid %u",
-							 xlrec->dbId, xlrec->tsId);
+							 parsed_c.dbId, parsed_c.tsId);
 
 		appendStringInfoString(buf, "; inval msgs:");
-		for (i = 0; i < xlrec->nmsgs; i++)
+		for (i = 0; i < parsed_c.nmsgs; i++)
 		{
-			SharedInvalidationMessage *msg = &msgs[i];
+			SharedInvalidationMessage *msg = &parsed_c.msgs[i];
 
 			if (msg->id >= 0)
 				appendStringInfo(buf, " catcache %d", msg->id);
@@ -83,45 +178,31 @@ xact_desc_commit(StringInfo buf, xl_xact_commit *xlrec)
 }
 
 static void
-xact_desc_commit_compact(StringInfo buf, xl_xact_commit_compact *xlrec)
+xact_desc_abort(StringInfo buf, uint8 info, xl_xact_abort *xlrec)
 {
+	xl_xact_parsed_abort parsed_a;
 	int			i;
 
-	appendStringInfoString(buf, timestamptz_to_str(xlrec->xact_time));
-
-	if (xlrec->nsubxacts > 0)
-	{
-		appendStringInfoString(buf, "; subxacts:");
-		for (i = 0; i < xlrec->nsubxacts; i++)
-			appendStringInfo(buf, " %u", xlrec->subxacts[i]);
-	}
-}
-
-static void
-xact_desc_abort(StringInfo buf, xl_xact_abort *xlrec)
-{
-	int			i;
+	ParseAbortRecord(info, xlrec, &parsed_a);
 
 	appendStringInfoString(buf, timestamptz_to_str(xlrec->xact_time));
-	if (xlrec->nrels > 0)
+	if (parsed_a.nrels > 0)
 	{
 		appendStringInfoString(buf, "; rels:");
-		for (i = 0; i < xlrec->nrels; i++)
+		for (i = 0; i < parsed_a.nrels; i++)
 		{
-			char	   *path = relpathperm(xlrec->xnodes[i], MAIN_FORKNUM);
+			char	   *path = relpathperm(parsed_a.xnodes[i], MAIN_FORKNUM);
 
 			appendStringInfo(buf, " %s", path);
 			pfree(path);
 		}
 	}
-	if (xlrec->nsubxacts > 0)
-	{
-		TransactionId *xacts = (TransactionId *)
-		&xlrec->xnodes[xlrec->nrels];
 
+	if (parsed_a.nsubxacts > 0)
+	{
 		appendStringInfoString(buf, "; subxacts:");
-		for (i = 0; i < xlrec->nsubxacts; i++)
-			appendStringInfo(buf, " %u", xacts[i]);
+		for (i = 0; i < parsed_a.nsubxacts; i++)
+			appendStringInfo(buf, " %u", parsed_a.subxacts[i]);
 	}
 }
 
@@ -142,37 +223,33 @@ xact_desc(StringInfo buf, XLogReaderState *record)
 	char	   *rec = XLogRecGetData(record);
 	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
 
-	if (info == XLOG_XACT_COMMIT_COMPACT)
-	{
-		xl_xact_commit_compact *xlrec = (xl_xact_commit_compact *) rec;
+	info &= XLOG_XACT_OPMASK;
 
-		xact_desc_commit_compact(buf, xlrec);
-	}
-	else if (info == XLOG_XACT_COMMIT)
+	if (info == XLOG_XACT_COMMIT)
 	{
 		xl_xact_commit *xlrec = (xl_xact_commit *) rec;
 
-		xact_desc_commit(buf, xlrec);
+		xact_desc_commit(buf, XLogRecGetInfo(record), xlrec);
 	}
 	else if (info == XLOG_XACT_ABORT)
 	{
 		xl_xact_abort *xlrec = (xl_xact_abort *) rec;
 
-		xact_desc_abort(buf, xlrec);
+		xact_desc_abort(buf, XLogRecGetInfo(record), xlrec);
 	}
 	else if (info == XLOG_XACT_COMMIT_PREPARED)
 	{
 		xl_xact_commit_prepared *xlrec = (xl_xact_commit_prepared *) rec;
 
 		appendStringInfo(buf, "%u: ", xlrec->xid);
-		xact_desc_commit(buf, &xlrec->crec);
+		xact_desc_commit(buf, XLogRecGetInfo(record), &xlrec->crec);
 	}
 	else if (info == XLOG_XACT_ABORT_PREPARED)
 	{
 		xl_xact_abort_prepared *xlrec = (xl_xact_abort_prepared *) rec;
 
 		appendStringInfo(buf, "%u: ", xlrec->xid);
-		xact_desc_abort(buf, &xlrec->arec);
+		xact_desc_abort(buf, XLogRecGetInfo(record), &xlrec->arec);
 	}
 	else if (info == XLOG_XACT_ASSIGNMENT)
 	{
@@ -193,7 +270,10 @@ xact_identify(uint8 info)
 {
 	const char *id = NULL;
 
-	switch (info & ~XLR_INFO_MASK)
+	info &= ~XLR_INFO_MASK;		/* drop generic xlog bits, keep rmgr bits */
+	info &= XLOG_XACT_OPMASK;	/* drop XLOG_XACT_HAS_INFO, keep the opcode */
+
+	switch (info)
 	{
 		case XLOG_XACT_COMMIT:
 			id = "COMMIT";
@@ -213,9 +293,6 @@ xact_identify(uint8 info)
 		case XLOG_XACT_ASSIGNMENT:
 			id = "ASSIGNMENT";
 			break;
-		case XLOG_XACT_COMMIT_COMPACT:
-			id = "COMMIT_COMPACT";
-			break;
 	}
 
 	return id;
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 6c7029e..a69b9fc 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -2083,6 +2083,9 @@ RecordTransactionCommitPrepared(TransactionId xid,
 								bool initfileinval)
 {
 	xl_xact_commit_prepared xlrec;
+
+	uint8 info;
+
 	XLogRecPtr	recptr;
 
 	START_CRIT_SECTION();
@@ -2090,37 +2093,20 @@ RecordTransactionCommitPrepared(TransactionId xid,
 	/* See notes in RecordTransactionCommit */
 	MyPgXact->delayChkpt = true;
 
-	/* Emit the XLOG commit record */
 	xlrec.xid = xid;
 
-	xlrec.crec.xinfo = initfileinval ? XACT_COMPLETION_UPDATE_RELCACHE_FILE : 0;
-
-	xlrec.crec.dbId = MyDatabaseId;
-	xlrec.crec.tsId = MyDatabaseTableSpace;
-
-	xlrec.crec.xact_time = GetCurrentTimestamp();
-	xlrec.crec.nrels = nrels;
-	xlrec.crec.nsubxacts = nchildren;
-	xlrec.crec.nmsgs = ninvalmsgs;
-
 	XLogBeginInsert();
-	XLogRegisterData((char *) (&xlrec), MinSizeOfXactCommitPrepared);
-
-	/* dump rels to delete */
-	if (nrels > 0)
-		XLogRegisterData((char *) rels, nrels * sizeof(RelFileNode));
 
-	/* dump committed child Xids */
-	if (nchildren > 0)
-		XLogRegisterData((char *) children,
-						 nchildren * sizeof(TransactionId));
+	/* emit the twophase part of the record */
+	XLogRegisterData((char *) (&xlrec), offsetof(xl_xact_commit_prepared, crec));
 
-	/* dump cache invalidation messages */
-	if (ninvalmsgs > 0)
-		XLogRegisterData((char *) invalmsgs,
-						 ninvalmsgs * sizeof(SharedInvalidationMessage));
+	/* and then the embedded commit record */
+	info = XactEmitCommitRecord(GetCurrentTimestamp(),
+								nchildren, children, nrels, rels,
+								ninvalmsgs, invalmsgs,
+								initfileinval, false);
 
-	recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT_PREPARED);
+	recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT_PREPARED | info);
 
 	/*
 	 * We don't currently try to sleep before flush here ... nor is there any
@@ -2165,6 +2151,7 @@ RecordTransactionAbortPrepared(TransactionId xid,
 {
 	xl_xact_abort_prepared xlrec;
 	XLogRecPtr	recptr;
+	uint8 info;
 
 	/*
 	 * Catch the scenario where we aborted partway through
@@ -2178,23 +2165,18 @@ RecordTransactionAbortPrepared(TransactionId xid,
 
 	/* Emit the XLOG abort record */
 	xlrec.xid = xid;
-	xlrec.arec.xact_time = GetCurrentTimestamp();
-	xlrec.arec.nrels = nrels;
-	xlrec.arec.nsubxacts = nchildren;
 
 	XLogBeginInsert();
-	XLogRegisterData((char *) (&xlrec), MinSizeOfXactAbortPrepared);
 
-	/* dump rels to delete */
-	if (nrels > 0)
-		XLogRegisterData((char *) rels, nrels * sizeof(RelFileNode));
+	/* emit the twophase part of the record */
+	XLogRegisterData((char *) (&xlrec), offsetof(xl_xact_abort_prepared, arec));
 
-	/* dump committed child Xids */
-	if (nchildren > 0)
-		XLogRegisterData((char *) children,
-						 nchildren * sizeof(TransactionId));
+	/* and then the embedded abort record */
+	info = XactEmitAbortRecord(GetCurrentTimestamp(),
+							   nchildren, children,
+							   nrels, rels);
 
-	recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_ABORT_PREPARED);
+	recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_ABORT_PREPARED | info);
 
 	/* Always flush, since we're about to remove the 2PC state file */
 	XLogFlush(recptr);
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 97000ef..134f852 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -1041,6 +1041,8 @@ RecordTransactionCommit(void)
 	}
 	else
 	{
+		uint8				info = 0;
+
 		/*
 		 * Begin commit critical section and insert the commit XLOG record.
 		 */
@@ -1069,70 +1071,14 @@ RecordTransactionCommit(void)
 
 		SetCurrentTransactionStopTimestamp();
 
-		/*
-		 * Do we need the long commit record? If not, use the compact format.
-		 *
-		 * For now always use the non-compact version if wal_level=logical, so
-		 * we can hide commits from other databases. TODO: In the future we
-		 * should merge compact and non-compact commits and use a flags
-		 * variable to determine if it contains subxacts, relations or
-		 * invalidation messages, that's more extensible and degrades more
-		 * gracefully. Till then, it's just 20 bytes of overhead.
-		 */
-		if (nrels > 0 || nmsgs > 0 || RelcacheInitFileInval || forceSyncCommit ||
-			XLogLogicalInfoActive())
-		{
-			xl_xact_commit xlrec;
-
-			/*
-			 * Set flags required for recovery processing of commits.
-			 */
-			xlrec.xinfo = 0;
-			if (RelcacheInitFileInval)
-				xlrec.xinfo |= XACT_COMPLETION_UPDATE_RELCACHE_FILE;
-			if (forceSyncCommit)
-				xlrec.xinfo |= XACT_COMPLETION_FORCE_SYNC_COMMIT;
-
-			xlrec.dbId = MyDatabaseId;
-			xlrec.tsId = MyDatabaseTableSpace;
-
-			xlrec.xact_time = xactStopTimestamp;
-			xlrec.nrels = nrels;
-			xlrec.nsubxacts = nchildren;
-			xlrec.nmsgs = nmsgs;
-
-			XLogBeginInsert();
-			XLogRegisterData((char *) (&xlrec), MinSizeOfXactCommit);
-			/* dump rels to delete */
-			if (nrels > 0)
-				XLogRegisterData((char *) rels,
-								 nrels * sizeof(RelFileNode));
-			/* dump committed child Xids */
-			if (nchildren > 0)
-				XLogRegisterData((char *) children,
-								 nchildren * sizeof(TransactionId));
-			/* dump shared cache invalidation messages */
-			if (nmsgs > 0)
-				XLogRegisterData((char *) invalMessages,
-								 nmsgs * sizeof(SharedInvalidationMessage));
-			(void) XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT);
-		}
-		else
-		{
-			xl_xact_commit_compact xlrec;
-
-			xlrec.xact_time = xactStopTimestamp;
-			xlrec.nsubxacts = nchildren;
+		XLogBeginInsert();
 
-			XLogBeginInsert();
-			XLogRegisterData((char *) (&xlrec), MinSizeOfXactCommitCompact);
-			/* dump committed child Xids */
-			if (nchildren > 0)
-				XLogRegisterData((char *) children,
-								 nchildren * sizeof(TransactionId));
+		info = XactEmitCommitRecord(xactStopTimestamp,
+									nchildren, children, nrels, rels,
+									nmsgs, invalMessages,
+									RelcacheInitFileInval, forceSyncCommit);
 
-			(void) XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT_COMPACT);
-		}
+		(void) XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT | info);
 	}
 
 	/*
@@ -1419,7 +1365,8 @@ RecordTransactionAbort(bool isSubXact)
 	RelFileNode *rels;
 	int			nchildren;
 	TransactionId *children;
-	xl_xact_abort xlrec;
+	uint8		info = 0;
+	TimestampTz xact_time;
 
 	/*
 	 * If we haven't been assigned an XID, nobody will care whether we aborted
@@ -1459,28 +1406,20 @@ RecordTransactionAbort(bool isSubXact)
 
 	/* Write the ABORT record */
 	if (isSubXact)
-		xlrec.xact_time = GetCurrentTimestamp();
+		xact_time = GetCurrentTimestamp();
 	else
 	{
 		SetCurrentTransactionStopTimestamp();
-		xlrec.xact_time = xactStopTimestamp;
+		xact_time = xactStopTimestamp;
 	}
-	xlrec.nrels = nrels;
-	xlrec.nsubxacts = nchildren;
 
 	XLogBeginInsert();
-	XLogRegisterData((char *) (&xlrec), MinSizeOfXactAbort);
 
-	/* dump rels to delete */
-	if (nrels > 0)
-		XLogRegisterData((char *) rels, nrels * sizeof(RelFileNode));
-
-	/* dump committed child Xids */
-	if (nchildren > 0)
-		XLogRegisterData((char *) children,
-						 nchildren * sizeof(TransactionId));
+	info = XactEmitAbortRecord(xact_time,
+							   nchildren, children,
+							   nrels, rels);
 
-	(void) XLogInsert(RM_XACT_ID, XLOG_XACT_ABORT);
+	(void) XLogInsert(RM_XACT_ID, XLOG_XACT_ABORT | info);
 
 	/*
 	 * Report the latest async abort LSN, so that the WAL writer knows to
@@ -4654,23 +4593,186 @@ xactGetCommittedChildren(TransactionId **ptr)
  *	XLOG support routines
  */
 
+
+/*
+ * Register the data of a commit record built from the passed in parameters.
+ * An xlog insertion must already have been started by the caller (who may
+ * have registered leading data); the record itself isn't inserted here.
+ *
+ * Returns the info bits to be ORed into the record type for XLogInsert().
+ */
+uint8
+XactEmitCommitRecord(TimestampTz commit_time,
+					 int nsubxacts, TransactionId *subxacts,
+					 int nrels, RelFileNode *rels,
+					 int nmsgs, SharedInvalidationMessage *msgs,
+					 bool relcacheInval, bool forceSync)
+{
+	uint8						info = 0;
+	/* static, so they're valid until the caller does the XLogInsert() */
+	static uint32				xinfo;
+	static xl_xact_commit		xlrec;
+	static xl_xact_dbinfo		xl_dbinfo;
+	static xl_xact_subxacts		xl_subxacts;
+	static xl_xact_relfilenodes xl_relfilenodes;
+	static xl_xact_invals		xl_invals;
+
+	xinfo = 0;					/* static, so reset on every call */
+
+	/*
+	 * First figure out and collect all the information needed
+	 */
+	xlrec.xact_time = commit_time;
+
+	if (relcacheInval)
+		xinfo |= XACT_COMPLETION_UPDATE_RELCACHE_FILE;
+	if (forceSync)				/* the parameter, not the file-level flag */
+		xinfo |= XACT_COMPLETION_FORCE_SYNC_COMMIT;
+
+	/*
+	 * Relcache invalidation requires database information and so does
+	 * logical decoding.
+	 */
+	if (nmsgs > 0 || XLogLogicalInfoActive())
+	{
+		xinfo |= XACT_XINFO_HAS_DBINFO;
+		xl_dbinfo.dbId = MyDatabaseId;
+		xl_dbinfo.tsId = MyDatabaseTableSpace;
+	}
+
+	if (nsubxacts > 0)
+	{
+		xinfo |= XACT_XINFO_HAS_SUBXACTS;
+		xl_subxacts.nsubxacts = nsubxacts;
+	}
+
+	if (nrels > 0)
+	{
+		xinfo |= XACT_XINFO_HAS_RELFILENODES;
+		xl_relfilenodes.nrels = nrels;
+	}
+
+	if (nmsgs > 0)
+	{
+		xinfo |= XACT_XINFO_HAS_INVALS;
+		xl_invals.nmsgs = nmsgs;
+	}
+
+	if (xinfo != 0)
+		info |= XLOG_XACT_HAS_INFO;
+
+	/*
+	 * Then include all the collected data, in the order the parser expects.
+	 */
+	XLogRegisterData((char *) (&xlrec), MinSizeOfXactCommit);
+
+	if (xinfo != 0)
+		XLogRegisterData((char *) (&xinfo), sizeof(xinfo));
+
+	if (xinfo & XACT_XINFO_HAS_DBINFO)
+		XLogRegisterData((char *) (&xl_dbinfo), sizeof(xl_dbinfo));
+
+	if (xinfo & XACT_XINFO_HAS_SUBXACTS)
+	{
+		XLogRegisterData((char *) (&xl_subxacts),
+						 MinSizeOfXactSubxacts);
+		XLogRegisterData((char *) subxacts,
+						 nsubxacts * sizeof(TransactionId));
+	}
+
+	if (xinfo & XACT_XINFO_HAS_RELFILENODES)
+	{
+		XLogRegisterData((char *) (&xl_relfilenodes),
+						 MinSizeOfXactRelfilenodes);
+		XLogRegisterData((char *) rels,
+						 nrels * sizeof(RelFileNode));
+	}
+
+	if (xinfo & XACT_XINFO_HAS_INVALS)
+	{
+		XLogRegisterData((char *) (&xl_invals), MinSizeOfXactInvals);
+		XLogRegisterData((char *) msgs,
+						 nmsgs * sizeof(SharedInvalidationMessage));
+	}
+
+	return info;
+}
+
+/*
+ * Emit, but don't insert, an abort record; see XactEmitCommitRecord for
+ * details on the calling convention and the returned info bits.
+ */
+uint8
+XactEmitAbortRecord(TimestampTz abort_time,
+					int nsubxacts, TransactionId *subxacts,
+					int nrels, RelFileNode *rels)
+{
+	uint8						info = 0;
+	/* static, so they're valid until the caller does the XLogInsert() */
+	static uint32				xinfo;
+	static xl_xact_abort		xlrec;
+	static xl_xact_subxacts		xl_subxacts;
+	static xl_xact_relfilenodes xl_relfilenodes;
+
+	/* collect data to log */
+	xinfo = 0;					/* static, so reset on every call */
+	xlrec.xact_time = abort_time;
+
+	if (nsubxacts > 0)
+	{
+		xinfo |= XACT_XINFO_HAS_SUBXACTS;
+		xl_subxacts.nsubxacts = nsubxacts;
+	}
+
+	if (nrels > 0)
+	{
+		xinfo |= XACT_XINFO_HAS_RELFILENODES;
+		xl_relfilenodes.nrels = nrels;
+	}
+
+	if (xinfo != 0)
+		info |= XLOG_XACT_HAS_INFO;
+
+	/* and actually log data */
+	XLogRegisterData((char *) (&xlrec), MinSizeOfXactAbort);
+
+	if (xinfo != 0)
+		XLogRegisterData((char *) (&xinfo), sizeof(xinfo));
+
+	if (xinfo & XACT_XINFO_HAS_SUBXACTS)
+	{
+		XLogRegisterData((char *) (&xl_subxacts),
+						 MinSizeOfXactSubxacts);
+		XLogRegisterData((char *) subxacts,
+						 nsubxacts * sizeof(TransactionId));
+	}
+
+	if (xinfo & XACT_XINFO_HAS_RELFILENODES)
+	{
+		XLogRegisterData((char *) (&xl_relfilenodes),
+						 MinSizeOfXactRelfilenodes);
+		XLogRegisterData((char *) rels,
+						 nrels * sizeof(RelFileNode));
+	}
+
+	return info;
+}
+
 /*
  * Before 9.0 this was a fairly short function, but now it performs many
  * actions for which the order of execution is critical.
  */
 static void
-xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
-						  TimestampTz commit_time,
-						  TransactionId *sub_xids, int nsubxacts,
-						  SharedInvalidationMessage *inval_msgs, int nmsgs,
-						  RelFileNode *xnodes, int nrels,
-						  Oid dbId, Oid tsId,
-						  uint32 xinfo)
+xact_redo_commit(uint8 info, xl_xact_commit *xlrec,
+				 TransactionId xid, XLogRecPtr lsn)
 {
+	xl_xact_parsed_commit parsed_c;
 	TransactionId max_xid;
 	int			i;
 
-	max_xid = TransactionIdLatest(xid, nsubxacts, sub_xids);
+	ParseCommitRecord(info, xlrec, &parsed_c);
+
+	max_xid = TransactionIdLatest(xid, parsed_c.nsubxacts, parsed_c.subxacts);
 
 	/*
 	 * Make sure nextXid is beyond any XID mentioned in the record.
@@ -4689,15 +4791,16 @@ xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
 	}
 
 	/* Set the transaction commit timestamp and metadata */
-	TransactionTreeSetCommitTsData(xid, nsubxacts, sub_xids,
-								   commit_time, InvalidCommitTsNodeId, false);
+	TransactionTreeSetCommitTsData(xid, parsed_c.nsubxacts, parsed_c.subxacts,
+								   xlrec->xact_time, InvalidCommitTsNodeId,
+								   false);
 
 	if (standbyState == STANDBY_DISABLED)
 	{
 		/*
 		 * Mark the transaction committed in pg_clog.
 		 */
-		TransactionIdCommitTree(xid, nsubxacts, sub_xids);
+		TransactionIdCommitTree(xid, parsed_c.nsubxacts, parsed_c.subxacts);
 	}
 	else
 	{
@@ -4721,21 +4824,21 @@ xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
 		 * bits set on changes made by transactions that haven't yet
 		 * recovered. It's unlikely but it's good to be safe.
 		 */
-		TransactionIdAsyncCommitTree(xid, nsubxacts, sub_xids, lsn);
+		TransactionIdAsyncCommitTree(xid, parsed_c.nsubxacts, parsed_c.subxacts, lsn);
 
 		/*
 		 * We must mark clog before we update the ProcArray.
 		 */
-		ExpireTreeKnownAssignedTransactionIds(xid, nsubxacts, sub_xids, max_xid);
+		ExpireTreeKnownAssignedTransactionIds(xid, parsed_c.nsubxacts, parsed_c.subxacts, max_xid);
 
 		/*
 		 * Send any cache invalidations attached to the commit. We must
 		 * maintain the same order of invalidation then release locks as
 		 * occurs in CommitTransaction().
 		 */
-		ProcessCommittedInvalidationMessages(inval_msgs, nmsgs,
-								  XactCompletionRelcacheInitFileInval(xinfo),
-											 dbId, tsId);
+		ProcessCommittedInvalidationMessages(parsed_c.msgs, parsed_c.nmsgs,
+											 XactCompletionRelcacheInitFileInval(parsed_c.xinfo),
+											 parsed_c.dbId, parsed_c.tsId);
 
 		/*
 		 * Release locks, if any. We do this for both two phase and normal one
@@ -4748,7 +4851,7 @@ xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
 	}
 
 	/* Make sure files supposed to be dropped are dropped */
-	if (nrels > 0)
+	if (parsed_c.nrels > 0)
 	{
 		/*
 		 * First update minimum recovery point to cover this WAL record. Once
@@ -4767,13 +4870,13 @@ xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
 		 */
 		XLogFlush(lsn);
 
-		for (i = 0; i < nrels; i++)
+		for (i = 0; i < parsed_c.nrels; i++)
 		{
-			SMgrRelation srel = smgropen(xnodes[i], InvalidBackendId);
+			SMgrRelation srel = smgropen(parsed_c.xnodes[i], InvalidBackendId);
 			ForkNumber	fork;
 
 			for (fork = 0; fork <= MAX_FORKNUM; fork++)
-				XLogDropRelation(xnodes[i], fork);
+				XLogDropRelation(parsed_c.xnodes[i], fork);
 			smgrdounlink(srel, true);
 			smgrclose(srel);
 		}
@@ -4791,52 +4894,12 @@ xact_redo_commit_internal(TransactionId xid, XLogRecPtr lsn,
 	 * minRecoveryPoint during recovery) helps to reduce that problem window,
 	 * for any user that requested ForceSyncCommit().
 	 */
-	if (XactCompletionForceSyncCommit(xinfo))
+	if (XactCompletionForceSyncCommit(parsed_c.xinfo))
 		XLogFlush(lsn);
 
 }
 
 /*
- * Utility function to call xact_redo_commit_internal after breaking down xlrec
- */
-static void
-xact_redo_commit(xl_xact_commit *xlrec,
-				 TransactionId xid, XLogRecPtr lsn)
-{
-	TransactionId *subxacts;
-	SharedInvalidationMessage *inval_msgs;
-
-	/* subxid array follows relfilenodes */
-	subxacts = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
-	/* invalidation messages array follows subxids */
-	inval_msgs = (SharedInvalidationMessage *) &(subxacts[xlrec->nsubxacts]);
-
-	xact_redo_commit_internal(xid, lsn, xlrec->xact_time,
-							  subxacts, xlrec->nsubxacts,
-							  inval_msgs, xlrec->nmsgs,
-							  xlrec->xnodes, xlrec->nrels,
-							  xlrec->dbId,
-							  xlrec->tsId,
-							  xlrec->xinfo);
-}
-
-/*
- * Utility function to call xact_redo_commit_internal  for compact form of message.
- */
-static void
-xact_redo_commit_compact(xl_xact_commit_compact *xlrec,
-						 TransactionId xid, XLogRecPtr lsn)
-{
-	xact_redo_commit_internal(xid, lsn, xlrec->xact_time,
-							  xlrec->subxacts, xlrec->nsubxacts,
-							  NULL, 0,	/* inval msgs */
-							  NULL, 0,	/* relfilenodes */
-							  InvalidOid,		/* dbId */
-							  InvalidOid,		/* tsId */
-							  0);		/* xinfo */
-}
-
-/*
  * Be careful with the order of execution, as with xact_redo_commit().
  * The two functions are similar but differ in key places.
  *
@@ -4846,14 +4909,49 @@ xact_redo_commit_compact(xl_xact_commit_compact *xlrec,
  * because subtransaction commit is never WAL logged.
  */
 static void
-xact_redo_abort(xl_xact_abort *xlrec, TransactionId xid)
+xact_redo_abort(uint8 info, xl_xact_abort *xlrec, TransactionId xid)
 {
+	int			i;
+	char	   *data = ((char *) xlrec) + MinSizeOfXactAbort;
 	TransactionId *sub_xids;
 	TransactionId max_xid;
-	int			i;
+	uint32		xinfo = 0;
+	int			nsubxacts = 0;
+	RelFileNode *xnodes = NULL;
+	int			nrels = 0;
+
+	if (info & XLOG_XACT_HAS_INFO)
+	{
+		xinfo = *(uint32 *) data;
+		data += sizeof(uint32);
+	}
+
+	if (xinfo & XACT_XINFO_HAS_SUBXACTS)
+	{
+		xl_xact_subxacts   *xl_subxacts = (xl_xact_subxacts *) data;
 
-	sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
-	max_xid = TransactionIdLatest(xid, xlrec->nsubxacts, sub_xids);
+		nsubxacts = xl_subxacts->nsubxacts;
+		sub_xids = xl_subxacts->subxacts;
+
+		data += MinSizeOfXactSubxacts;
+		data += nsubxacts * sizeof(TransactionId);
+	}
+	else
+		sub_xids = NULL;		/* don't pass an uninitialized pointer below */
+
+	/* nsubxacts is 0 without subxacts, same call as in xact_redo_commit() */
+	max_xid = TransactionIdLatest(xid, nsubxacts, sub_xids);
+
+	if (xinfo & XACT_XINFO_HAS_RELFILENODES)
+	{
+		xl_xact_relfilenodes *xl_relfilenodes = (xl_xact_relfilenodes *) data;
+
+		data += MinSizeOfXactRelfilenodes;
+		data += xl_relfilenodes->nrels * sizeof(RelFileNode);
+
+		nrels = xl_relfilenodes->nrels;
+		xnodes = xl_relfilenodes->xnodes;
+	}
 
 	/*
 	 * Make sure nextXid is beyond any XID mentioned in the record.
@@ -4874,7 +4972,7 @@ xact_redo_abort(xl_xact_abort *xlrec, TransactionId xid)
 	if (standbyState == STANDBY_DISABLED)
 	{
 		/* Mark the transaction aborted in pg_clog, no need for async stuff */
-		TransactionIdAbortTree(xid, xlrec->nsubxacts, sub_xids);
+		TransactionIdAbortTree(xid, nsubxacts, sub_xids);
 	}
 	else
 	{
@@ -4890,12 +4988,12 @@ xact_redo_abort(xl_xact_abort *xlrec, TransactionId xid)
 		RecordKnownAssignedTransactionIds(max_xid);
 
 		/* Mark the transaction aborted in pg_clog, no need for async stuff */
-		TransactionIdAbortTree(xid, xlrec->nsubxacts, sub_xids);
+		TransactionIdAbortTree(xid, nsubxacts, sub_xids);
 
 		/*
 		 * We must update the ProcArray after we have marked clog.
 		 */
-		ExpireTreeKnownAssignedTransactionIds(xid, xlrec->nsubxacts, sub_xids, max_xid);
+		ExpireTreeKnownAssignedTransactionIds(xid, nsubxacts, sub_xids, max_xid);
 
 		/*
 		 * There are no flat files that need updating, nor invalidation
@@ -4905,17 +5003,17 @@ xact_redo_abort(xl_xact_abort *xlrec, TransactionId xid)
 		/*
 		 * Release locks, if any. There are no invalidations to send.
 		 */
-		StandbyReleaseLockTree(xid, xlrec->nsubxacts, sub_xids);
+		StandbyReleaseLockTree(xid, nsubxacts, sub_xids);
 	}
 
 	/* Make sure files supposed to be dropped are dropped */
-	for (i = 0; i < xlrec->nrels; i++)
+	for (i = 0; i < nrels; i++)
 	{
-		SMgrRelation srel = smgropen(xlrec->xnodes[i], InvalidBackendId);
+		SMgrRelation srel = smgropen(xnodes[i], InvalidBackendId);
 		ForkNumber	fork;
 
 		for (fork = 0; fork <= MAX_FORKNUM; fork++)
-			XLogDropRelation(xlrec->xnodes[i], fork);
+			XLogDropRelation(xnodes[i], fork);
 		smgrdounlink(srel, true);
 		smgrclose(srel);
 	}
@@ -4926,26 +5024,23 @@ xact_redo(XLogReaderState *record)
 {
 	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
 
+	info &= XLOG_XACT_OPMASK;
+
 	/* Backup blocks are not used in xact records */
 	Assert(!XLogRecHasAnyBlockRefs(record));
 
-	if (info == XLOG_XACT_COMMIT_COMPACT)
-	{
-		xl_xact_commit_compact *xlrec = (xl_xact_commit_compact *) XLogRecGetData(record);
-
-		xact_redo_commit_compact(xlrec, XLogRecGetXid(record), record->EndRecPtr);
-	}
-	else if (info == XLOG_XACT_COMMIT)
+	if (info == XLOG_XACT_COMMIT)
 	{
 		xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record);
 
-		xact_redo_commit(xlrec, XLogRecGetXid(record), record->EndRecPtr);
+		xact_redo_commit(XLogRecGetInfo(record), xlrec,
+						 XLogRecGetXid(record), record->EndRecPtr);
 	}
 	else if (info == XLOG_XACT_ABORT)
 	{
 		xl_xact_abort *xlrec = (xl_xact_abort *) XLogRecGetData(record);
 
-		xact_redo_abort(xlrec, XLogRecGetXid(record));
+		xact_redo_abort(XLogRecGetInfo(record), xlrec, XLogRecGetXid(record));
 	}
 	else if (info == XLOG_XACT_PREPARE)
 	{
@@ -4957,14 +5052,15 @@ xact_redo(XLogReaderState *record)
 	{
 		xl_xact_commit_prepared *xlrec = (xl_xact_commit_prepared *) XLogRecGetData(record);
 
-		xact_redo_commit(&xlrec->crec, xlrec->xid, record->EndRecPtr);
+		xact_redo_commit(XLogRecGetInfo(record), &xlrec->crec, xlrec->xid,
+						 record->EndRecPtr);
 		RemoveTwoPhaseFile(xlrec->xid, false);
 	}
 	else if (info == XLOG_XACT_ABORT_PREPARED)
 	{
 		xl_xact_abort_prepared *xlrec = (xl_xact_abort_prepared *) XLogRecGetData(record);
 
-		xact_redo_abort(&xlrec->arec, xlrec->xid);
+		xact_redo_abort(XLogRecGetInfo(record), &xlrec->arec, xlrec->xid);
 		RemoveTwoPhaseFile(xlrec->xid, false);
 	}
 	else if (info == XLOG_XACT_ASSIGNMENT)
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 629a457..f0e83ff 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -5106,11 +5106,6 @@ getRecordTimestamp(XLogReaderState *record, TimestampTz *recordXtime)
 		*recordXtime = ((xl_restore_point *) XLogRecGetData(record))->rp_time;
 		return true;
 	}
-	if (rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT_COMPACT)
-	{
-		*recordXtime = ((xl_xact_commit_compact *) XLogRecGetData(record))->xact_time;
-		return true;
-	}
 	if (rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT)
 	{
 		*recordXtime = ((xl_xact_commit *) XLogRecGetData(record))->xact_time;
@@ -5169,7 +5164,7 @@ recoveryStopsBefore(XLogReaderState *record)
 		return false;
 	record_info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
 
-	if (record_info == XLOG_XACT_COMMIT_COMPACT || record_info == XLOG_XACT_COMMIT)
+	if (record_info == XLOG_XACT_COMMIT)
 	{
 		isCommit = true;
 		recordXid = XLogRecGetXid(record);
@@ -5289,8 +5284,7 @@ recoveryStopsAfter(XLogReaderState *record)
 	}
 
 	if (rmid == RM_XACT_ID &&
-		(record_info == XLOG_XACT_COMMIT_COMPACT ||
-		 record_info == XLOG_XACT_COMMIT ||
+		(record_info == XLOG_XACT_COMMIT ||
 		 record_info == XLOG_XACT_COMMIT_PREPARED ||
 		 record_info == XLOG_XACT_ABORT ||
 		 record_info == XLOG_XACT_ABORT_PREPARED))
@@ -5326,8 +5320,7 @@ recoveryStopsAfter(XLogReaderState *record)
 			recoveryStopTime = recordXtime;
 			recoveryStopName[0] = '\0';
 
-			if (record_info == XLOG_XACT_COMMIT_COMPACT ||
-				record_info == XLOG_XACT_COMMIT ||
+			if (record_info == XLOG_XACT_COMMIT ||
 				record_info == XLOG_XACT_COMMIT_PREPARED)
 			{
 				ereport(LOG,
@@ -5443,8 +5436,7 @@ recoveryApplyDelay(XLogReaderState *record)
 	 */
 	record_info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
 	if (!(XLogRecGetRmid(record) == RM_XACT_ID &&
-		  (record_info == XLOG_XACT_COMMIT_COMPACT ||
-		   record_info == XLOG_XACT_COMMIT ||
+		  (record_info == XLOG_XACT_COMMIT ||
 		   record_info == XLOG_XACT_COMMIT_PREPARED)))
 		return false;
 
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 77c02ba..8211daf 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -64,12 +64,9 @@ static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
-			 TransactionId xid, Oid dboid,
-			 TimestampTz commit_time,
-			 int nsubxacts, TransactionId *sub_xids,
-			 int ninval_msgs, SharedInvalidationMessage *msg);
-static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecPtr lsn,
-			TransactionId xid, TransactionId *sub_xids, int nsubxacts);
+						 xl_xact_commit *xlrec, TransactionId xid);
+static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
+						xl_xact_abort *xlrec, TransactionId xid);
 
 /* common function to decode tuples */
 static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
@@ -194,23 +191,17 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
 		return;
 
+	info &= XLOG_XACT_OPMASK;
+
 	switch (info)
 	{
 		case XLOG_XACT_COMMIT:
 			{
 				xl_xact_commit *xlrec;
-				TransactionId *subxacts = NULL;
-				SharedInvalidationMessage *invals = NULL;
 
 				xlrec = (xl_xact_commit *) XLogRecGetData(r);
 
-				subxacts = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
-				invals = (SharedInvalidationMessage *) &(subxacts[xlrec->nsubxacts]);
-
-				DecodeCommit(ctx, buf, XLogRecGetXid(r), xlrec->dbId,
-							 xlrec->xact_time,
-							 xlrec->nsubxacts, subxacts,
-							 xlrec->nmsgs, invals);
+				DecodeCommit(ctx, buf, xlrec, XLogRecGetXid(r));
 
 				break;
 			}
@@ -218,63 +209,35 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 			{
 				xl_xact_commit_prepared *prec;
 				xl_xact_commit *xlrec;
-				TransactionId *subxacts;
-				SharedInvalidationMessage *invals = NULL;
 
 				/* Prepared commits contain a normal commit record... */
 				prec = (xl_xact_commit_prepared *) XLogRecGetData(r);
 				xlrec = &prec->crec;
 
-				subxacts = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
-				invals = (SharedInvalidationMessage *) &(subxacts[xlrec->nsubxacts]);
-
-				DecodeCommit(ctx, buf, prec->xid, xlrec->dbId,
-							 xlrec->xact_time,
-							 xlrec->nsubxacts, subxacts,
-							 xlrec->nmsgs, invals);
-
-				break;
-			}
-		case XLOG_XACT_COMMIT_COMPACT:
-			{
-				xl_xact_commit_compact *xlrec;
-
-				xlrec = (xl_xact_commit_compact *) XLogRecGetData(r);
+				DecodeCommit(ctx, buf, xlrec, prec->xid);
 
-				DecodeCommit(ctx, buf, XLogRecGetXid(r), InvalidOid,
-							 xlrec->xact_time,
-							 xlrec->nsubxacts, xlrec->subxacts,
-							 0, NULL);
 				break;
 			}
 		case XLOG_XACT_ABORT:
 			{
 				xl_xact_abort *xlrec;
-				TransactionId *sub_xids;
 
 				xlrec = (xl_xact_abort *) XLogRecGetData(r);
 
-				sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
-
-				DecodeAbort(ctx, buf->origptr, XLogRecGetXid(r),
-							sub_xids, xlrec->nsubxacts);
+				DecodeAbort(ctx, buf, xlrec, XLogRecGetXid(r));
 				break;
 			}
 		case XLOG_XACT_ABORT_PREPARED:
 			{
 				xl_xact_abort_prepared *prec;
 				xl_xact_abort *xlrec;
-				TransactionId *sub_xids;
 
 				/* prepared abort contain a normal commit abort... */
 				prec = (xl_xact_abort_prepared *) XLogRecGetData(r);
 				xlrec = &prec->arec;
 
-				sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
-
 				/* r->xl_xid is committed in a separate record */
-				DecodeAbort(ctx, buf->origptr, prec->xid,
-							sub_xids, xlrec->nsubxacts);
+				DecodeAbort(ctx, buf, xlrec, prec->xid);
 				break;
 			}
 
@@ -477,27 +440,27 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
  */
 static void
 DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
-			 TransactionId xid, Oid dboid,
-			 TimestampTz commit_time,
-			 int nsubxacts, TransactionId *sub_xids,
-			 int ninval_msgs, SharedInvalidationMessage *msgs)
+			 xl_xact_commit *xlrec, TransactionId xid)
 {
+	xl_xact_parsed_commit parsed_c;
 	int			i;
 
+	ParseCommitRecord(XLogRecGetInfo(buf->record), xlrec, &parsed_c);
+
 	/*
 	 * Process invalidation messages, even if we're not interested in the
 	 * transaction's contents, since the various caches need to always be
 	 * consistent.
 	 */
-	if (ninval_msgs > 0)
+	if (parsed_c.nmsgs > 0)
 	{
 		ReorderBufferAddInvalidations(ctx->reorder, xid, buf->origptr,
-									  ninval_msgs, msgs);
+									  parsed_c.nmsgs, parsed_c.msgs);
 		ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr);
 	}
 
 	SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid,
-					   nsubxacts, sub_xids);
+					   parsed_c.nsubxacts, parsed_c.subxacts);
 
 	/* ----
 	 * Check whether we are interested in this specific transaction, and tell
@@ -524,12 +487,11 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 	 * ---
 	 */
 	if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
-		(dboid != InvalidOid && dboid != ctx->slot->data.database))
+		(parsed_c.dbId != InvalidOid && parsed_c.dbId != ctx->slot->data.database))
 	{
-		for (i = 0; i < nsubxacts; i++)
+		for (i = 0; i < parsed_c.nsubxacts; i++)
 		{
-			ReorderBufferForget(ctx->reorder, *sub_xids, buf->origptr);
-			sub_xids++;
+			ReorderBufferForget(ctx->reorder, parsed_c.subxacts[i], buf->origptr);
 		}
 		ReorderBufferForget(ctx->reorder, xid, buf->origptr);
 
@@ -537,16 +499,15 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 	}
 
 	/* tell the reorderbuffer about the surviving subtransactions */
-	for (i = 0; i < nsubxacts; i++)
+	for (i = 0; i < parsed_c.nsubxacts; i++)
 	{
-		ReorderBufferCommitChild(ctx->reorder, xid, *sub_xids,
+		ReorderBufferCommitChild(ctx->reorder, xid, parsed_c.subxacts[i],
 								 buf->origptr, buf->endptr);
-		sub_xids++;
 	}
 
 	/* replay actions of all transaction + subtransactions in order */
 	ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
-						commit_time);
+						xlrec->xact_time);
 }
 
 /*
@@ -554,20 +515,24 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
  * snapbuild.c and reorderbuffer.c
  */
 static void
-DecodeAbort(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
-			TransactionId *sub_xids, int nsubxacts)
+DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
+			xl_xact_abort *xlrec, TransactionId xid)
 {
+	xl_xact_parsed_abort parsed_a;
 	int			i;
 
-	SnapBuildAbortTxn(ctx->snapshot_builder, lsn, xid, nsubxacts, sub_xids);
+	ParseAbortRecord(XLogRecGetInfo(buf->record), xlrec, &parsed_a);
+
+	SnapBuildAbortTxn(ctx->snapshot_builder, buf->record->EndRecPtr, xid,
+					  parsed_a.nsubxacts, parsed_a.subxacts);
 
-	for (i = 0; i < nsubxacts; i++)
+	for (i = 0; i < parsed_a.nsubxacts; i++)
 	{
-		ReorderBufferAbort(ctx->reorder, *sub_xids, lsn);
-		sub_xids++;
+		ReorderBufferAbort(ctx->reorder, parsed_a.subxacts[i],
+						   buf->record->EndRecPtr);
 	}
 
-	ReorderBufferAbort(ctx->reorder, xid, lsn);
+	ReorderBufferAbort(ctx->reorder, xid, buf->record->EndRecPtr);
 }
 
 /*
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index d7e5f64..122a383 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -18,6 +18,7 @@
 #include "lib/stringinfo.h"
 #include "nodes/pg_list.h"
 #include "storage/relfilenode.h"
+#include "storage/sinval.h"
 #include "utils/datetime.h"
 
 
@@ -103,8 +104,8 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid,
  */
 
 /*
- * XLOG allows to store some information in high 4 bits of log
- * record xl_info field
+ * XLOG allows storing some information in the high 4 bits of a log record's
+ * xl_info field. We use 3 bits for the opcode and 1 to flag an optional xinfo.
  */
 #define XLOG_XACT_COMMIT			0x00
 #define XLOG_XACT_PREPARE			0x10
@@ -112,7 +113,36 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid,
 #define XLOG_XACT_COMMIT_PREPARED	0x30
 #define XLOG_XACT_ABORT_PREPARED	0x40
 #define XLOG_XACT_ASSIGNMENT		0x50
-#define XLOG_XACT_COMMIT_COMPACT	0x60
+/* free opcode 0x60 */
+/* free opcode 0x70 */
+
+#define XLOG_XACT_OPMASK			0x70
+
+#define XLOG_XACT_HAS_INFO			0x80
+
+/*
+ * The following flags, stored in xinfo, determine which information is
+ * contained in commit/abort records.
+ */
+#define XACT_XINFO_HAS_DBINFO			(1U << 0)
+#define XACT_XINFO_HAS_SUBXACTS			(1U << 1)
+#define XACT_XINFO_HAS_RELFILENODES		(1U << 2)
+#define XACT_XINFO_HAS_INVALS			(1U << 3)
+
+/*
+ * Also stored in xinfo, these indicate a variety of additional actions that
+ * need to occur when emulating transaction effects during recovery.
+ *
+ * They are named XactCompletion... to differentiate them from
+ * EOXact... routines which run at the end of the original transaction
+ * completion.
+ */
+#define XACT_COMPLETION_UPDATE_RELCACHE_FILE	(1U << 30)
+#define XACT_COMPLETION_FORCE_SYNC_COMMIT		(1U << 31)
+
+/* Access macros for above flags */
+#define XactCompletionRelcacheInitFileInval(xinfo)	(xinfo & XACT_COMPLETION_UPDATE_RELCACHE_FILE)
+#define XactCompletionForceSyncCommit(xinfo)		(xinfo & XACT_COMPLETION_FORCE_SYNC_COMMIT)
 
 typedef struct xl_xact_assignment
 {
@@ -123,61 +153,62 @@ typedef struct xl_xact_assignment
 
 #define MinSizeOfXactAssignment offsetof(xl_xact_assignment, xsub)
 
-typedef struct xl_xact_commit_compact
+/* sub-records for commit/abort */
+typedef struct xl_xact_dbinfo
+{
+	Oid			dbId;			/* MyDatabaseId */
+	Oid			tsId;			/* MyDatabaseTableSpace */
+} xl_xact_dbinfo;
+
+typedef struct xl_xact_subxacts
 {
-	TimestampTz xact_time;		/* time of commit */
 	int			nsubxacts;		/* number of subtransaction XIDs */
-	/* ARRAY OF COMMITTED SUBTRANSACTION XIDs FOLLOWS */
 	TransactionId subxacts[FLEXIBLE_ARRAY_MEMBER];
-} xl_xact_commit_compact;
+} xl_xact_subxacts;
 
-#define MinSizeOfXactCommitCompact offsetof(xl_xact_commit_compact, subxacts)
+#define MinSizeOfXactSubxacts offsetof(xl_xact_subxacts, subxacts)
 
-typedef struct xl_xact_commit
+typedef struct xl_xact_relfilenodes
 {
-	TimestampTz xact_time;		/* time of commit */
-	uint32		xinfo;			/* info flags */
-	int			nrels;			/* number of RelFileNodes */
-	int			nsubxacts;		/* number of subtransaction XIDs */
-	int			nmsgs;			/* number of shared inval msgs */
-	Oid			dbId;			/* MyDatabaseId */
-	Oid			tsId;			/* MyDatabaseTableSpace */
-	/* Array of RelFileNode(s) to drop at commit */
+	int			nrels;		/* number of RelFileNodes */
 	RelFileNode xnodes[FLEXIBLE_ARRAY_MEMBER];
-	/* ARRAY OF COMMITTED SUBTRANSACTION XIDs FOLLOWS */
-	/* ARRAY OF SHARED INVALIDATION MESSAGES FOLLOWS */
-} xl_xact_commit;
+} xl_xact_relfilenodes;
 
-#define MinSizeOfXactCommit offsetof(xl_xact_commit, xnodes)
+#define MinSizeOfXactRelfilenodes offsetof(xl_xact_relfilenodes, xnodes)
 
-/*
- * These flags are set in the xinfo fields of WAL commit records,
- * indicating a variety of additional actions that need to occur
- * when emulating transaction effects during recovery.
- * They are named XactCompletion... to differentiate them from
- * EOXact... routines which run at the end of the original
- * transaction completion.
- */
-#define XACT_COMPLETION_UPDATE_RELCACHE_FILE	0x01
-#define XACT_COMPLETION_FORCE_SYNC_COMMIT		0x02
+typedef struct xl_xact_invals
+{
+	int			nmsgs;			/* number of shared inval msgs */
+	SharedInvalidationMessage msgs[FLEXIBLE_ARRAY_MEMBER];
+} xl_xact_invals;
 
-/* Access macros for above flags */
-#define XactCompletionRelcacheInitFileInval(xinfo)	(xinfo & XACT_COMPLETION_UPDATE_RELCACHE_FILE)
-#define XactCompletionForceSyncCommit(xinfo)		(xinfo & XACT_COMPLETION_FORCE_SYNC_COMMIT)
+#define MinSizeOfXactInvals offsetof(xl_xact_invals, msgs)
+
+typedef struct xl_xact_commit
+{
+	TimestampTz xact_time;		/* time of commit */
+
+	/* xinfo flags follow if XLOG_XACT_HAS_INFO */
+	/* xl_xact_dbinfo follows if XACT_XINFO_HAS_DBINFO */
+	/* xl_xact_subxacts follows if XACT_XINFO_HAS_SUBXACTS */
+	/* xl_xact_relfilenodes follows if XACT_XINFO_HAS_RELFILENODES */
+	/* xl_xact_invals follows if XACT_XINFO_HAS_INVALS */
+} xl_xact_commit;
+
+#define MinSizeOfXactCommit (offsetof(xl_xact_commit, xact_time) + sizeof(TimestampTz))
 
 typedef struct xl_xact_abort
 {
 	TimestampTz xact_time;		/* time of abort */
-	int			nrels;			/* number of RelFileNodes */
-	int			nsubxacts;		/* number of subtransaction XIDs */
-	/* Array of RelFileNode(s) to drop at abort */
-	RelFileNode xnodes[FLEXIBLE_ARRAY_MEMBER];
-	/* ARRAY OF ABORTED SUBTRANSACTION XIDs FOLLOWS */
+
+	/* xinfo flags follow if XLOG_XACT_HAS_INFO */
+	/* xl_xact_subxacts follows if XACT_XINFO_HAS_SUBXACTS */
+	/* xl_xact_relfilenodes follows if XACT_XINFO_HAS_RELFILENODES */
 } xl_xact_abort;
 
 /* Note the intentional lack of an invalidation message array c.f. commit */
 
-#define MinSizeOfXactAbort offsetof(xl_xact_abort, xnodes)
+#define MinSizeOfXactAbort sizeof(xl_xact_abort)
 
 /*
  * COMMIT_PREPARED and ABORT_PREPARED are identical to COMMIT/ABORT records
@@ -192,8 +223,6 @@ typedef struct xl_xact_commit_prepared
 	/* MORE DATA FOLLOWS AT END OF STRUCT */
 } xl_xact_commit_prepared;
 
-#define MinSizeOfXactCommitPrepared offsetof(xl_xact_commit_prepared, crec.xnodes)
-
 typedef struct xl_xact_abort_prepared
 {
 	TransactionId xid;			/* XID of prepared xact */
@@ -201,7 +230,39 @@ typedef struct xl_xact_abort_prepared
 	/* MORE DATA FOLLOWS AT END OF STRUCT */
 } xl_xact_abort_prepared;
 
-#define MinSizeOfXactAbortPrepared offsetof(xl_xact_abort_prepared, arec.xnodes)
+/*
+ * Commit/Abort records in the above form are a bit verbose to parse, so
+ * there are deconstructed versions generated by ParseCommit/AbortRecord().
+ */
+typedef struct xl_xact_parsed_commit
+{
+	TimestampTz		xact_time;
+	uint32			xinfo;
+
+	Oid			dbId;			/* MyDatabaseId */
+	Oid			tsId;			/* MyDatabaseTableSpace */
+
+	int				nsubxacts;
+	TransactionId  *subxacts;
+
+	int				nrels;
+	RelFileNode	   *xnodes;
+
+	int				nmsgs;
+	SharedInvalidationMessage *msgs;
+} xl_xact_parsed_commit;
+
+typedef struct xl_xact_parsed_abort
+{
+	TimestampTz		xact_time;
+	uint32			xinfo;
+
+	int				nsubxacts;
+	TransactionId  *subxacts;
+
+	int				nrels;
+	RelFileNode	   *xnodes;
+} xl_xact_parsed_abort;
 
 
 /* ----------------
@@ -256,8 +317,23 @@ extern void UnregisterSubXactCallback(SubXactCallback callback, void *arg);
 
 extern int	xactGetCommittedChildren(TransactionId **ptr);
 
+extern uint8 XactEmitCommitRecord(TimestampTz commit_time,
+								  int nsubxacts, TransactionId *subxacts,
+								  int nrels, RelFileNode *rels,
+								  int nmsgs, SharedInvalidationMessage *msgs,
+								  bool relcacheInval, bool forceSync);
+
+extern uint8 XactEmitAbortRecord(TimestampTz abort_time,
+								 int nsubxacts, TransactionId *subxacts,
+								 int nrels, RelFileNode *rels);
 extern void xact_redo(XLogReaderState *record);
+
+/* xactdesc.c */
 extern void xact_desc(StringInfo buf, XLogReaderState *record);
 extern const char *xact_identify(uint8 info);
 
+/* also in xactdesc.c, so they can be shared between front/backend code */
+extern void ParseCommitRecord(uint8 info, xl_xact_commit *xlrec, xl_xact_parsed_commit *parsed);
+extern void ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_abort *parsed);
+
 #endif   /* XACT_H */
-- 
2.3.0.149.gf3f4077.dirty

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to