From 180b6076ecb00cdc2c9b9a7ca1bdd1c7e21f7ee2 Mon Sep 17 00:00:00 2001
From: Matthias van de Meent <boekewurm+postgres@gmail.com>
Date: Thu, 30 Mar 2023 20:02:16 +0200
Subject: [PATCH v11 1/2] Create a path for separate xlog record construction
 and insertion

The current record insertion routines are likely to be called inside a
critical section, which means they shouldn't throw errors unless there is
something catastrophically wrong, as it'd cause the server to restart.
In some cases that is the right thing to do; however, not all WAL records
are created equal, and in some cases failing to build the expected WAL
record could easily be resolved by rolling back the transaction.

This prepares the code for XLog record size validation, where the
XLogRecordAssemble() code may throw errors if WAL record size
constraints are violated.
---
 src/backend/access/transam/multixact.c  |   5 +
 src/backend/access/transam/twophase.c   |  58 +++--
 src/backend/access/transam/xact.c       | 298 +++++++++++++-----------
 src/backend/access/transam/xlog.c       |   2 +-
 src/backend/access/transam/xloginsert.c | 122 ++++++++++
 src/backend/commands/sequence.c         |  18 ++
 src/include/access/xact.h               |  64 +++--
 src/include/access/xlog.h               |   1 +
 src/include/access/xloginsert.h         |   3 +
 src/include/access/xlogrecord.h         |  11 +
 10 files changed, 408 insertions(+), 174 deletions(-)

diff --git a/src/backend/access/transam/multixact.c b/src/backend/access/transam/multixact.c
index fe6698d5ff..78d6563df8 100644
--- a/src/backend/access/transam/multixact.c
+++ b/src/backend/access/transam/multixact.c
@@ -832,6 +832,11 @@ MultiXactIdCreateFromMembers(int nmembers, MultiXactMember *members)
 	 * find a more compact representation of this Xlog record -- perhaps all
 	 * the status flags in one XLogRecData, then all the xids in another one?
 	 * Not clear that it's worth the trouble though.
+	 *
+	 * XXX: large values for nmembers may mean the data would not fit in the
+	 * max xlog record size, in which case this would panic. However, that is
+	 * quite unlikely, as such a record would log more than 250M members -
+	 * much more than we can currently process efficiently.
 	 */
 	XLogBeginInsert();
 	XLogRegisterData((char *) (&xlrec), SizeOfMultiXactCreate);
diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c
index 068e59bec0..f98d79aa29 100644
--- a/src/backend/access/transam/twophase.c
+++ b/src/backend/access/transam/twophase.c
@@ -1181,18 +1181,24 @@ EndPrepare(GlobalTransaction gxact)
 	 */
 	XLogEnsureRecordSpace(0, records.num_chunks);
 
-	START_CRIT_SECTION();
-
-	Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) == 0);
-	MyProc->delayChkptFlags |= DELAY_CHKPT_START;
-
+	/*
+	 * Start preparing the record for insertion. Do so outside the critical
+	 * section, so that an ERROR due to oversized registered data is raised
+	 * here instead of turning into a PANIC inside the critical section.
+	 */
 	XLogBeginInsert();
 	for (record = records.head; record != NULL; record = record->next)
 		XLogRegisterData(record->data, record->len);
 
 	XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
+	XLogPrepareInsert(RM_XACT_ID, XLOG_XACT_PREPARE);
+
+	START_CRIT_SECTION();
+
+	Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) == 0);
+	MyProc->delayChkptFlags |= DELAY_CHKPT_START;
 
-	gxact->prepare_end_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE);
+	gxact->prepare_end_lsn = XLogInsertPrepared();
 
 	if (replorigin)
 	{
@@ -2294,6 +2300,7 @@ RecordTransactionCommitPrepared(TransactionId xid,
 	XLogRecPtr	recptr;
 	TimestampTz committs = GetCurrentTimestamp();
 	bool		replorigin;
+	xl_xact_commit_fields recdata;
 
 	/*
 	 * Are we using the replication origins feature?  Or, in other words, are
@@ -2302,6 +2309,22 @@ RecordTransactionCommitPrepared(TransactionId xid,
 	replorigin = (replorigin_session_origin != InvalidRepOriginId &&
 				  replorigin_session_origin != DoNotReplicateId);
 
+	/*
+	 * We can't assume that the data we're trying to fit in the WAL record is
+	 * going to fit in our XLog record format, due to the arbitrary data we're
+	 * registering. Because xlog record creation may throw an ERROR, we should
+	 * create the record outside the critical section: we don't want to crash
+	 * the backend because of oversized xlog records - we "just" want to error
+	 * out and revert the transaction.
+	 */
+	XactPrepareCommitRecord(committs,
+							nchildren, children, nrels, rels,
+							nstats, stats,
+							ninvalmsgs, invalmsgs,
+							initfileinval,
+							MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
+							xid, gid, &recdata);
+
 	START_CRIT_SECTION();
 
 	/* See notes in RecordTransactionCommit */
@@ -2313,13 +2336,7 @@ RecordTransactionCommitPrepared(TransactionId xid,
 	 * potentially having AccessExclusiveLocks since we don't know whether or
 	 * not they do.
 	 */
-	recptr = XactLogCommitRecord(committs,
-								 nchildren, children, nrels, rels,
-								 nstats, stats,
-								 ninvalmsgs, invalmsgs,
-								 initfileinval,
-								 MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
-								 xid, gid);
+	recptr = XactLogCommitRecord();
 
 
 	if (replorigin)
@@ -2388,6 +2405,7 @@ RecordTransactionAbortPrepared(TransactionId xid,
 {
 	XLogRecPtr	recptr;
 	bool		replorigin;
+	xl_xact_abort_fields recdata;
 
 	/*
 	 * Are we using the replication origins feature?  Or, in other words, are
@@ -2404,6 +2422,13 @@ RecordTransactionAbortPrepared(TransactionId xid,
 		elog(PANIC, "cannot abort transaction %u, it was already committed",
 			 xid);
 
+	XactPrepareAbortRecord(GetCurrentTimestamp(),
+						   nchildren, children,
+						   nrels, rels,
+						   nstats, stats,
+						   MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
+						   xid, gid, &recdata);
+
 	START_CRIT_SECTION();
 
 	/*
@@ -2411,12 +2436,7 @@ RecordTransactionAbortPrepared(TransactionId xid,
 	 * potentially having AccessExclusiveLocks since we don't know whether or
 	 * not they do.
 	 */
-	recptr = XactLogAbortRecord(GetCurrentTimestamp(),
-								nchildren, children,
-								nrels, rels,
-								nstats, stats,
-								MyXactFlags | XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK,
-								xid, gid);
+	recptr = XactLogAbortRecord();
 
 	if (replorigin)
 		/* Move LSNs forward for this replication origin */
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 01b1e0fb8c..79c0448889 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -247,6 +247,10 @@ static TransactionStateData TopTransactionStateData = {
 /*
  * unreportedXids holds XIDs of all subtransactions that have not yet been
  * reported in an XLOG_XACT_ASSIGNMENT record.
+ *
+ * Increasing the size of the unreportedXids cache should be limited to
+ * a reasonable degree, as these xids may need to fit into a WAL record,
+ * which has a limited size.
  */
 static int	nUnreportedXids;
 static TransactionId unreportedXids[PGPROC_MAX_CACHED_SUBXIDS];
@@ -1365,6 +1369,7 @@ RecordTransactionCommit(void)
 	else
 	{
 		bool		replorigin;
+		xl_xact_commit_fields recdata;
 
 		/*
 		 * Are we using the replication origins feature?  Or, in other words,
@@ -1376,7 +1381,14 @@ RecordTransactionCommit(void)
 		/*
 		 * Begin commit critical section and insert the commit XLOG record.
 		 */
-
+		XactPrepareCommitRecord(GetCurrentTransactionStopTimestamp(),
+								nchildren, children, nrels, rels,
+								ndroppedstats, droppedstats,
+								nmsgs, invalMessages,
+								RelcacheInitFileInval,
+								MyXactFlags,
+								InvalidTransactionId, NULL /* plain commit */,
+								&recdata);
 		/*
 		 * Mark ourselves as within our "commit critical section".  This
 		 * forces any concurrent checkpoint to wait until we've updated
@@ -1398,13 +1410,7 @@ RecordTransactionCommit(void)
 		START_CRIT_SECTION();
 		MyProc->delayChkptFlags |= DELAY_CHKPT_START;
 
-		XactLogCommitRecord(GetCurrentTransactionStopTimestamp(),
-							nchildren, children, nrels, rels,
-							ndroppedstats, droppedstats,
-							nmsgs, invalMessages,
-							RelcacheInitFileInval,
-							MyXactFlags,
-							InvalidTransactionId, NULL /* plain commit */ );
+		XactLogCommitRecord();
 
 		if (replorigin)
 			/* Move LSNs forward for this replication origin */
@@ -1712,6 +1718,7 @@ RecordTransactionAbort(bool isSubXact)
 	TransactionId *children;
 	TimestampTz xact_time;
 	bool		replorigin;
+	xl_xact_abort_fields recdata;
 
 	/*
 	 * If we haven't been assigned an XID, nobody will care whether we aborted
@@ -1754,10 +1761,6 @@ RecordTransactionAbort(bool isSubXact)
 	nchildren = xactGetCommittedChildren(&children);
 	ndroppedstats = pgstat_get_transactional_drops(false, &droppedstats);
 
-	/* XXX do we really need a critical section here? */
-	START_CRIT_SECTION();
-
-	/* Write the ABORT record */
 	if (isSubXact)
 		xact_time = GetCurrentTimestamp();
 	else
@@ -1765,12 +1768,18 @@ RecordTransactionAbort(bool isSubXact)
 		xact_time = GetCurrentTransactionStopTimestamp();
 	}
 
-	XactLogAbortRecord(xact_time,
-					   nchildren, children,
-					   nrels, rels,
-					   ndroppedstats, droppedstats,
-					   MyXactFlags, InvalidTransactionId,
-					   NULL);
+	XactPrepareAbortRecord(xact_time,
+						   nchildren, children,
+						   nrels, rels,
+						   ndroppedstats, droppedstats,
+						   MyXactFlags, InvalidTransactionId,
+						   NULL, &recdata);
+
+	/* XXX do we really need a critical section here? */
+	START_CRIT_SECTION();
+
+	/* Write the ABORT record */
+	XactLogAbortRecord();
 
 	if (replorigin)
 		/* Move LSNs forward for this replication origin */
@@ -5628,37 +5637,34 @@ xactGetCommittedChildren(TransactionId **ptr)
  *	XLOG support routines
  */
 
-
 /*
- * Log the commit record for a plain or twophase transaction commit.
+ * Prepare the commit record for a plain or twophase transaction commit.
  *
  * A 2pc commit will be emitted when twophase_xid is valid, a plain one
  * otherwise.
+ *
+ * This is separated from writing the record to WAL due to the arbitrary
+ * size of the record potentially resulting in an ERROR whilst constructing
+ * the record, which (when called from critical sections) would panic.
+ * This separation allows us to prepare the insert outside critical sections,
+ * preventing potential crashes.
  */
-XLogRecPtr
-XactLogCommitRecord(TimestampTz commit_time,
-					int nsubxacts, TransactionId *subxacts,
-					int nrels, RelFileLocator *rels,
-					int ndroppedstats, xl_xact_stats_item *droppedstats,
-					int nmsgs, SharedInvalidationMessage *msgs,
-					bool relcacheInval,
-					int xactflags, TransactionId twophase_xid,
-					const char *twophase_gid)
+void
+XactPrepareCommitRecord(TimestampTz commit_time,
+						int nsubxacts, TransactionId *subxacts,
+						int nrels, RelFileLocator *rels,
+						int ndroppedstats, xl_xact_stats_item *droppedstats,
+						int nmsgs, SharedInvalidationMessage *msgs,
+						bool relcacheInval,
+						int xactflags, TransactionId twophase_xid,
+						const char *twophase_gid,
+						xl_xact_commit_fields *recdata)
 {
-	xl_xact_commit xlrec;
-	xl_xact_xinfo xl_xinfo;
-	xl_xact_dbinfo xl_dbinfo;
-	xl_xact_subxacts xl_subxacts;
-	xl_xact_relfilelocators xl_relfilelocators;
-	xl_xact_stats_items xl_dropped_stats;
-	xl_xact_invals xl_invals;
-	xl_xact_twophase xl_twophase;
-	xl_xact_origin xl_origin;
 	uint8		info;
 
-	Assert(CritSectionCount > 0);
+	Assert(CritSectionCount == 0);
 
-	xl_xinfo.xinfo = 0;
+	recdata->xl_xinfo.xinfo = 0;
 
 	/* decide between a plain and 2pc commit */
 	if (!TransactionIdIsValid(twophase_xid))
@@ -5668,21 +5674,21 @@ XactLogCommitRecord(TimestampTz commit_time,
 
 	/* First figure out and collect all the information needed */
 
-	xlrec.xact_time = commit_time;
+	recdata->xlrec.xact_time = commit_time;
 
 	if (relcacheInval)
-		xl_xinfo.xinfo |= XACT_COMPLETION_UPDATE_RELCACHE_FILE;
+		recdata->xl_xinfo.xinfo |= XACT_COMPLETION_UPDATE_RELCACHE_FILE;
 	if (forceSyncCommit)
-		xl_xinfo.xinfo |= XACT_COMPLETION_FORCE_SYNC_COMMIT;
+		recdata->xl_xinfo.xinfo |= XACT_COMPLETION_FORCE_SYNC_COMMIT;
 	if ((xactflags & XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK))
-		xl_xinfo.xinfo |= XACT_XINFO_HAS_AE_LOCKS;
+		recdata->xl_xinfo.xinfo |= XACT_XINFO_HAS_AE_LOCKS;
 
 	/*
 	 * Check if the caller would like to ask standbys for immediate feedback
 	 * once this commit is applied.
 	 */
 	if (synchronous_commit >= SYNCHRONOUS_COMMIT_REMOTE_APPLY)
-		xl_xinfo.xinfo |= XACT_COMPLETION_APPLY_FEEDBACK;
+		recdata->xl_xinfo.xinfo |= XACT_COMPLETION_APPLY_FEEDBACK;
 
 	/*
 	 * Relcache invalidations requires information about the current database
@@ -5690,145 +5696,154 @@ XactLogCommitRecord(TimestampTz commit_time,
 	 */
 	if (nmsgs > 0 || XLogLogicalInfoActive())
 	{
-		xl_xinfo.xinfo |= XACT_XINFO_HAS_DBINFO;
-		xl_dbinfo.dbId = MyDatabaseId;
-		xl_dbinfo.tsId = MyDatabaseTableSpace;
+		recdata->xl_xinfo.xinfo |= XACT_XINFO_HAS_DBINFO;
+		recdata->xl_dbinfo.dbId = MyDatabaseId;
+		recdata->xl_dbinfo.tsId = MyDatabaseTableSpace;
 	}
 
 	if (nsubxacts > 0)
 	{
-		xl_xinfo.xinfo |= XACT_XINFO_HAS_SUBXACTS;
-		xl_subxacts.nsubxacts = nsubxacts;
+		recdata->xl_xinfo.xinfo |= XACT_XINFO_HAS_SUBXACTS;
+		recdata->xl_subxacts.nsubxacts = nsubxacts;
 	}
 
 	if (nrels > 0)
 	{
-		xl_xinfo.xinfo |= XACT_XINFO_HAS_RELFILELOCATORS;
-		xl_relfilelocators.nrels = nrels;
+		recdata->xl_xinfo.xinfo |= XACT_XINFO_HAS_RELFILELOCATORS;
+		recdata->xl_relfilelocators.nrels = nrels;
 		info |= XLR_SPECIAL_REL_UPDATE;
 	}
 
 	if (ndroppedstats > 0)
 	{
-		xl_xinfo.xinfo |= XACT_XINFO_HAS_DROPPED_STATS;
-		xl_dropped_stats.nitems = ndroppedstats;
+		recdata->xl_xinfo.xinfo |= XACT_XINFO_HAS_DROPPED_STATS;
+		recdata->xl_dropped_stats.nitems = ndroppedstats;
 	}
 
 	if (nmsgs > 0)
 	{
-		xl_xinfo.xinfo |= XACT_XINFO_HAS_INVALS;
-		xl_invals.nmsgs = nmsgs;
+		recdata->xl_xinfo.xinfo |= XACT_XINFO_HAS_INVALS;
+		recdata->xl_invals.nmsgs = nmsgs;
 	}
 
 	if (TransactionIdIsValid(twophase_xid))
 	{
-		xl_xinfo.xinfo |= XACT_XINFO_HAS_TWOPHASE;
-		xl_twophase.xid = twophase_xid;
+		recdata->xl_xinfo.xinfo |= XACT_XINFO_HAS_TWOPHASE;
+		recdata->xl_twophase.xid = twophase_xid;
 		Assert(twophase_gid != NULL);
 
 		if (XLogLogicalInfoActive())
-			xl_xinfo.xinfo |= XACT_XINFO_HAS_GID;
+			recdata->xl_xinfo.xinfo |= XACT_XINFO_HAS_GID;
 	}
 
 	/* dump transaction origin information */
 	if (replorigin_session_origin != InvalidRepOriginId)
 	{
-		xl_xinfo.xinfo |= XACT_XINFO_HAS_ORIGIN;
+		recdata->xl_xinfo.xinfo |= XACT_XINFO_HAS_ORIGIN;
 
-		xl_origin.origin_lsn = replorigin_session_origin_lsn;
-		xl_origin.origin_timestamp = replorigin_session_origin_timestamp;
+		recdata->xl_origin.origin_lsn = replorigin_session_origin_lsn;
+		recdata->xl_origin.origin_timestamp = replorigin_session_origin_timestamp;
 	}
 
-	if (xl_xinfo.xinfo != 0)
+	if (recdata->xl_xinfo.xinfo != 0)
 		info |= XLOG_XACT_HAS_INFO;
 
 	/* Then include all the collected data into the commit record. */
 
 	XLogBeginInsert();
 
-	XLogRegisterData((char *) (&xlrec), sizeof(xl_xact_commit));
+	XLogRegisterData((char *) (&recdata->xlrec), sizeof(xl_xact_commit));
 
-	if (xl_xinfo.xinfo != 0)
-		XLogRegisterData((char *) (&xl_xinfo.xinfo), sizeof(xl_xinfo.xinfo));
+	if (recdata->xl_xinfo.xinfo != 0)
+		XLogRegisterData((char *) (&recdata->xl_xinfo.xinfo), sizeof(recdata->xl_xinfo.xinfo));
 
-	if (xl_xinfo.xinfo & XACT_XINFO_HAS_DBINFO)
-		XLogRegisterData((char *) (&xl_dbinfo), sizeof(xl_dbinfo));
+	if (recdata->xl_xinfo.xinfo & XACT_XINFO_HAS_DBINFO)
+		XLogRegisterData((char *) (&recdata->xl_dbinfo), sizeof(recdata->xl_dbinfo));
 
-	if (xl_xinfo.xinfo & XACT_XINFO_HAS_SUBXACTS)
+	if (recdata->xl_xinfo.xinfo & XACT_XINFO_HAS_SUBXACTS)
 	{
-		XLogRegisterData((char *) (&xl_subxacts),
+		XLogRegisterData((char *) (&recdata->xl_subxacts),
 						 MinSizeOfXactSubxacts);
 		XLogRegisterData((char *) subxacts,
 						 nsubxacts * sizeof(TransactionId));
 	}
 
-	if (xl_xinfo.xinfo & XACT_XINFO_HAS_RELFILELOCATORS)
+	if (recdata->xl_xinfo.xinfo & XACT_XINFO_HAS_RELFILELOCATORS)
 	{
-		XLogRegisterData((char *) (&xl_relfilelocators),
+		XLogRegisterData((char *) (&recdata->xl_relfilelocators),
 						 MinSizeOfXactRelfileLocators);
 		XLogRegisterData((char *) rels,
 						 nrels * sizeof(RelFileLocator));
 	}
 
-	if (xl_xinfo.xinfo & XACT_XINFO_HAS_DROPPED_STATS)
+	if (recdata->xl_xinfo.xinfo & XACT_XINFO_HAS_DROPPED_STATS)
 	{
-		XLogRegisterData((char *) (&xl_dropped_stats),
+		XLogRegisterData((char *) (&recdata->xl_dropped_stats),
 						 MinSizeOfXactStatsItems);
 		XLogRegisterData((char *) droppedstats,
 						 ndroppedstats * sizeof(xl_xact_stats_item));
 	}
 
-	if (xl_xinfo.xinfo & XACT_XINFO_HAS_INVALS)
+	if (recdata->xl_xinfo.xinfo & XACT_XINFO_HAS_INVALS)
 	{
-		XLogRegisterData((char *) (&xl_invals), MinSizeOfXactInvals);
+		XLogRegisterData((char *) (&recdata->xl_invals), MinSizeOfXactInvals);
 		XLogRegisterData((char *) msgs,
 						 nmsgs * sizeof(SharedInvalidationMessage));
 	}
 
-	if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE)
+	if (recdata->xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE)
 	{
-		XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase));
-		if (xl_xinfo.xinfo & XACT_XINFO_HAS_GID)
+		XLogRegisterData((char *) (&recdata->xl_twophase), sizeof(xl_xact_twophase));
+		if (recdata->xl_xinfo.xinfo & XACT_XINFO_HAS_GID)
 			XLogRegisterData(unconstify(char *, twophase_gid), strlen(twophase_gid) + 1);
 	}
 
-	if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN)
-		XLogRegisterData((char *) (&xl_origin), sizeof(xl_xact_origin));
+	if (recdata->xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN)
+		XLogRegisterData((char *) (&recdata->xl_origin), sizeof(xl_xact_origin));
 
 	/* we allow filtering by xacts */
 	XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
 
-	return XLogInsert(RM_XACT_ID, info);
+	XLogPrepareInsert(RM_XACT_ID, info);
 }
 
 /*
- * Log the commit record for a plain or twophase transaction abort.
+ * Insert the commit record prepared by XactPrepareCommitRecord() into WAL.
+ * Must be called inside a critical section; see XactPrepareCommitRecord.
+ */
+XLogRecPtr
+XactLogCommitRecord(void)
+{
+	Assert(CritSectionCount > 0);
+	return XLogInsertPrepared();
+}
+
+/*
+ * Prepare the abort record for a plain or twophase transaction abort.
  *
  * A 2pc abort will be emitted when twophase_xid is valid, a plain one
  * otherwise.
+ *
+ * This is separated from writing the record to WAL due to the arbitrary
+ * size of the record potentially resulting in an ERROR whilst constructing
+ * the record, which (when called from critical sections) would panic.
+ * This separation allows us to prepare the insert outside critical sections,
+ * preventing potential crashes.
  */
-XLogRecPtr
-XactLogAbortRecord(TimestampTz abort_time,
-				   int nsubxacts, TransactionId *subxacts,
-				   int nrels, RelFileLocator *rels,
-				   int ndroppedstats, xl_xact_stats_item *droppedstats,
-				   int xactflags, TransactionId twophase_xid,
-				   const char *twophase_gid)
+void
+XactPrepareAbortRecord(TimestampTz abort_time,
+					   int nsubxacts, TransactionId *subxacts,
+					   int nrels, RelFileLocator *rels,
+					   int ndroppedstats, xl_xact_stats_item *droppedstats,
+					   int xactflags, TransactionId twophase_xid,
+					   const char *twophase_gid,
+					   xl_xact_abort_fields *recdata)
 {
-	xl_xact_abort xlrec;
-	xl_xact_xinfo xl_xinfo;
-	xl_xact_subxacts xl_subxacts;
-	xl_xact_relfilelocators xl_relfilelocators;
-	xl_xact_stats_items xl_dropped_stats;
-	xl_xact_twophase xl_twophase;
-	xl_xact_dbinfo xl_dbinfo;
-	xl_xact_origin xl_origin;
-
 	uint8		info;
 
-	Assert(CritSectionCount > 0);
+	Assert(CritSectionCount == 0);
 
-	xl_xinfo.xinfo = 0;
+	recdata->xl_xinfo.xinfo = 0;
 
 	/* decide between a plain and 2pc abort */
 	if (!TransactionIdIsValid(twophase_xid))
@@ -5839,45 +5854,45 @@ XactLogAbortRecord(TimestampTz abort_time,
 
 	/* First figure out and collect all the information needed */
 
-	xlrec.xact_time = abort_time;
+	recdata->xlrec.xact_time = abort_time;
 
 	if ((xactflags & XACT_FLAGS_ACQUIREDACCESSEXCLUSIVELOCK))
-		xl_xinfo.xinfo |= XACT_XINFO_HAS_AE_LOCKS;
+		recdata->xl_xinfo.xinfo |= XACT_XINFO_HAS_AE_LOCKS;
 
 	if (nsubxacts > 0)
 	{
-		xl_xinfo.xinfo |= XACT_XINFO_HAS_SUBXACTS;
-		xl_subxacts.nsubxacts = nsubxacts;
+		recdata->xl_xinfo.xinfo |= XACT_XINFO_HAS_SUBXACTS;
+		recdata->xl_subxacts.nsubxacts = nsubxacts;
 	}
 
 	if (nrels > 0)
 	{
-		xl_xinfo.xinfo |= XACT_XINFO_HAS_RELFILELOCATORS;
-		xl_relfilelocators.nrels = nrels;
+		recdata->xl_xinfo.xinfo |= XACT_XINFO_HAS_RELFILELOCATORS;
+		recdata->xl_relfilelocators.nrels = nrels;
 		info |= XLR_SPECIAL_REL_UPDATE;
 	}
 
 	if (ndroppedstats > 0)
 	{
-		xl_xinfo.xinfo |= XACT_XINFO_HAS_DROPPED_STATS;
-		xl_dropped_stats.nitems = ndroppedstats;
+		recdata->xl_xinfo.xinfo |= XACT_XINFO_HAS_DROPPED_STATS;
+		recdata->xl_dropped_stats.nitems = ndroppedstats;
 	}
 
 	if (TransactionIdIsValid(twophase_xid))
 	{
-		xl_xinfo.xinfo |= XACT_XINFO_HAS_TWOPHASE;
-		xl_twophase.xid = twophase_xid;
+		recdata->xl_xinfo.xinfo |= XACT_XINFO_HAS_TWOPHASE;
+		recdata->xl_twophase.xid = twophase_xid;
 		Assert(twophase_gid != NULL);
 
 		if (XLogLogicalInfoActive())
-			xl_xinfo.xinfo |= XACT_XINFO_HAS_GID;
+			recdata->xl_xinfo.xinfo |= XACT_XINFO_HAS_GID;
 	}
 
 	if (TransactionIdIsValid(twophase_xid) && XLogLogicalInfoActive())
 	{
-		xl_xinfo.xinfo |= XACT_XINFO_HAS_DBINFO;
-		xl_dbinfo.dbId = MyDatabaseId;
-		xl_dbinfo.tsId = MyDatabaseTableSpace;
+		recdata->xl_xinfo.xinfo |= XACT_XINFO_HAS_DBINFO;
+		recdata->xl_dbinfo.dbId = MyDatabaseId;
+		recdata->xl_dbinfo.tsId = MyDatabaseTableSpace;
 	}
 
 	/*
@@ -5886,65 +5901,76 @@ XactLogAbortRecord(TimestampTz abort_time,
 	 */
 	if (replorigin_session_origin != InvalidRepOriginId)
 	{
-		xl_xinfo.xinfo |= XACT_XINFO_HAS_ORIGIN;
+		recdata->xl_xinfo.xinfo |= XACT_XINFO_HAS_ORIGIN;
 
-		xl_origin.origin_lsn = replorigin_session_origin_lsn;
-		xl_origin.origin_timestamp = replorigin_session_origin_timestamp;
+		recdata->xl_origin.origin_lsn = replorigin_session_origin_lsn;
+		recdata->xl_origin.origin_timestamp = replorigin_session_origin_timestamp;
 	}
 
-	if (xl_xinfo.xinfo != 0)
+	if (recdata->xl_xinfo.xinfo != 0)
 		info |= XLOG_XACT_HAS_INFO;
 
 	/* Then include all the collected data into the abort record. */
 
 	XLogBeginInsert();
 
-	XLogRegisterData((char *) (&xlrec), MinSizeOfXactAbort);
+	XLogRegisterData((char *) (&recdata->xlrec), MinSizeOfXactAbort);
 
-	if (xl_xinfo.xinfo != 0)
-		XLogRegisterData((char *) (&xl_xinfo), sizeof(xl_xinfo));
+	if (recdata->xl_xinfo.xinfo != 0)
+		XLogRegisterData((char *) (&recdata->xl_xinfo), sizeof(recdata->xl_xinfo));
 
-	if (xl_xinfo.xinfo & XACT_XINFO_HAS_DBINFO)
-		XLogRegisterData((char *) (&xl_dbinfo), sizeof(xl_dbinfo));
+	if (recdata->xl_xinfo.xinfo & XACT_XINFO_HAS_DBINFO)
+		XLogRegisterData((char *) (&recdata->xl_dbinfo), sizeof(recdata->xl_dbinfo));
 
-	if (xl_xinfo.xinfo & XACT_XINFO_HAS_SUBXACTS)
+	if (recdata->xl_xinfo.xinfo & XACT_XINFO_HAS_SUBXACTS)
 	{
-		XLogRegisterData((char *) (&xl_subxacts),
+		XLogRegisterData((char *) (&recdata->xl_subxacts),
 						 MinSizeOfXactSubxacts);
 		XLogRegisterData((char *) subxacts,
 						 nsubxacts * sizeof(TransactionId));
 	}
 
-	if (xl_xinfo.xinfo & XACT_XINFO_HAS_RELFILELOCATORS)
+	if (recdata->xl_xinfo.xinfo & XACT_XINFO_HAS_RELFILELOCATORS)
 	{
-		XLogRegisterData((char *) (&xl_relfilelocators),
+		XLogRegisterData((char *) (&recdata->xl_relfilelocators),
 						 MinSizeOfXactRelfileLocators);
 		XLogRegisterData((char *) rels,
 						 nrels * sizeof(RelFileLocator));
 	}
 
-	if (xl_xinfo.xinfo & XACT_XINFO_HAS_DROPPED_STATS)
+	if (recdata->xl_xinfo.xinfo & XACT_XINFO_HAS_DROPPED_STATS)
 	{
-		XLogRegisterData((char *) (&xl_dropped_stats),
+		XLogRegisterData((char *) (&recdata->xl_dropped_stats),
 						 MinSizeOfXactStatsItems);
 		XLogRegisterData((char *) droppedstats,
 						 ndroppedstats * sizeof(xl_xact_stats_item));
 	}
 
-	if (xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE)
+	if (recdata->xl_xinfo.xinfo & XACT_XINFO_HAS_TWOPHASE)
 	{
-		XLogRegisterData((char *) (&xl_twophase), sizeof(xl_xact_twophase));
-		if (xl_xinfo.xinfo & XACT_XINFO_HAS_GID)
+		XLogRegisterData((char *) (&recdata->xl_twophase), sizeof(xl_xact_twophase));
+		if (recdata->xl_xinfo.xinfo & XACT_XINFO_HAS_GID)
 			XLogRegisterData(unconstify(char *, twophase_gid), strlen(twophase_gid) + 1);
 	}
 
-	if (xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN)
-		XLogRegisterData((char *) (&xl_origin), sizeof(xl_xact_origin));
+	if (recdata->xl_xinfo.xinfo & XACT_XINFO_HAS_ORIGIN)
+		XLogRegisterData((char *) (&recdata->xl_origin), sizeof(xl_xact_origin));
 
 	/* Include the replication origin */
 	XLogSetRecordFlags(XLOG_INCLUDE_ORIGIN);
 
-	return XLogInsert(RM_XACT_ID, info);
+	XLogPrepareInsert(RM_XACT_ID, info);
+}
+
+/*
+ * Insert the abort record prepared by XactPrepareAbortRecord() into WAL.
+ * Must be called inside a critical section; see XactPrepareAbortRecord.
+ */
+XLogRecPtr
+XactLogAbortRecord(void)
+{
+	Assert(CritSectionCount > 0);
+	return XLogInsertPrepared();
 }
 
 /*
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index f9f0f6db8d..b283563f6e 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -821,7 +821,7 @@ XLogInsertRecord(XLogRecData *rdata,
 	doPageWrites = (Insert->fullPageWrites || Insert->runningBackups > 0);
 
 	if (doPageWrites &&
-		(!prevDoPageWrites ||
+		(((!prevDoPageWrites) && ((flags & XLOG_NOPAGEWRITE_USED) != 0)) ||
 		 (fpw_lsn != InvalidXLogRecPtr && fpw_lsn <= RedoRecPtr)))
 	{
 		/*
diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c
index 008612e032..134cf32bb4 100644
--- a/src/backend/access/transam/xloginsert.c
+++ b/src/backend/access/transam/xloginsert.c
@@ -132,6 +132,17 @@ static int	max_rdatas;			/* allocated size */
 
 static bool begininsert_called = false;
 
+/*
+ * State for the split prepare / insert path: used when the caller wants
+ * oversized-record errors to be raised while preparing the record, before
+ * it enters the critical section in which the record is inserted.
+ */
+static bool prepareinsert_called = false;
+static XLogRecData *prepared_record = NULL;
+static XLogRecPtr preprec_fpw_lsn = InvalidXLogRecPtr;
+static bool	preprec_topxid_included;
+static int	preprec_num_fpw;
+
 /* Memory context to hold the registered buffer and data references. */
 static MemoryContext xloginsert_cxt;
 
@@ -233,6 +244,8 @@ XLogResetInsertion(void)
 	mainrdata_last = (XLogRecData *) &mainrdata_head;
 	curinsert_flags = 0;
 	begininsert_called = false;
+	prepareinsert_called = false;
+	prepared_record = NULL;
 }
 
 /*
@@ -433,6 +446,8 @@ void
 XLogSetRecordFlags(uint8 flags)
 {
 	Assert(begininsert_called);
+	/* don't modify the flags after preparing the record for insertion */
+	Assert(!prepareinsert_called);
 	curinsert_flags |= flags;
 }
 
@@ -506,6 +521,107 @@ XLogInsert(RmgrId rmid, uint8 info)
 	return EndPos;
 }
 
+/*
+ * Prepare to insert an XLOG record having the specified RMID and info bytes,
+ * with the body of the record being the data and buffer references registered
+ * earlier with XLogRegister* calls.
+ *
+ * The caller must later call XLogInsertPrepared() to do the actual insertion,
+ * but once this function returns the caller can be assured that the xlog
+ * record has been assembled and can be written as prepared.
+ */
+void
+XLogPrepareInsert(RmgrId rmid, uint8 info)
+{
+	XLogRecPtr	RedoRecPtr;
+	bool		doPageWrites;
+
+	/* XLogBeginInsert() must have been called. */
+	if (!begininsert_called)
+		elog(ERROR, "XLogBeginInsert was not called");
+	/* ... and you can't prepare the record twice */
+	if (prepareinsert_called)
+		elog(ERROR, "Cannot call XLogPrepareInsert twice");
+
+	prepareinsert_called = true;
+
+	Assert(CritSectionCount == 0);
+	/*
+	 * The caller can set rmgr bits, XLR_SPECIAL_REL_UPDATE and
+	 * XLR_CHECK_CONSISTENCY; the rest are reserved for use by me.
+	 */
+	if ((info & ~(XLR_RMGR_INFO_MASK |
+				  XLR_SPECIAL_REL_UPDATE |
+				  XLR_CHECK_CONSISTENCY)) != 0)
+		elog(PANIC, "invalid xlog info mask %02X", info);
+
+	TRACE_POSTGRESQL_WAL_INSERT(rmid, info);
+
+	/*
+	 * In bootstrap mode, we don't actually log anything but XLOG resources;
+	 * leave prepared_record NULL so XLogInsertPrepared() returns a phony LSN.
+	 */
+	if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID)
+	{
+		prepared_record = NULL;
+		return;
+	}
+
+	preprec_topxid_included = false;
+
+	/*
+	 * Get values needed to decide whether to do full-page writes. Since
+	 * we don't yet have an insertion lock, these could change under us,
+	 * but XLogInsertRecord will recheck them once it has a lock.
+	 */
+	GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites);
+
+	prepared_record = XLogRecordAssemble(rmid, info, RedoRecPtr,
+										 doPageWrites, &preprec_fpw_lsn,
+										 &preprec_num_fpw, &preprec_topxid_included);
+}
+
+/*
+ * XLogInsertPrepared - write the previously prepared record into WAL.
+ *
+ * Unlike XLogInsert(), this does not retry insertions. Failing to insert the
+ * record immediately (e.g. because FPW settings changed between prepare and
+ * insert) trips the assertion below; else InvalidXLogRecPtr is returned.
+ */
+XLogRecPtr
+XLogInsertPrepared(void)
+{
+	XLogRecPtr	EndPos;
+
+	/*
+	 * XLogBeginInsert() must have been called, and a record must have been
+	 * prepared.
+	 */
+	if (!begininsert_called)
+		elog(ERROR, "XLogBeginInsert was not called");
+	if (!prepareinsert_called)
+		elog(ERROR, "XLogPrepareInsert was not called");
+
+	/*
+	 * In bootstrap mode, we don't actually log anything but XLOG resources;
+	 * return a phony record pointer.
+	 */
+	if (IsBootstrapProcessingMode() && prepared_record == NULL)
+	{
+		XLogResetInsertion();
+		EndPos = SizeOfXLogLongPHD; /* start of 1st chkpt record */
+		return EndPos;
+	}
+
+	EndPos = XLogInsertRecord(prepared_record, preprec_fpw_lsn, curinsert_flags,
+							  preprec_num_fpw, preprec_topxid_included);
+
+	Assert(!XLogRecPtrIsInvalid(EndPos));
+	XLogResetInsertion();
+
+	return EndPos;
+}
+
 /*
  * Assemble a WAL record from the registered data and buffers into an
  * XLogRecData chain, ready for insertion with XLogInsertRecord().
@@ -548,6 +664,9 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 	rdt_datas_last = &hdr_rdt;
 	hdr_rdt.data = hdr_scratch;
 
+	/* We're rebuilding the record, reset the internal flag */
+	curinsert_flags &= ~XLOG_NOPAGEWRITE_USED;
+
 	/*
 	 * Enforce consistency checks for this record if user is looking for it.
 	 * Do this before at the beginning of this routine to give the possibility
@@ -584,7 +703,10 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 		else if (regbuf->flags & REGBUF_NO_IMAGE)
 			needs_backup = false;
 		else if (!doPageWrites)
+		{
+			curinsert_flags |= XLOG_NOPAGEWRITE_USED;
 			needs_backup = false;
+		}
 		else
 		{
 			/*
diff --git a/src/backend/commands/sequence.c b/src/backend/commands/sequence.c
index bfe279cddf..0cc6a25cca 100644
--- a/src/backend/commands/sequence.c
+++ b/src/backend/commands/sequence.c
@@ -431,6 +431,15 @@ fill_seq_fork_with_data(Relation rel, HeapTuple tuple, ForkNumber forkNum)
 		XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec));
 		XLogRegisterData((char *) tuple->t_data, tuple->t_len);
 
+		/*
+		 * The data in the record can currently not exceed the maximum size of
+		 * a heap tuple by much more than a constant offset, which in turn is
+		 * limited to about BLCKSZ.
+		 * If we ever change this log record's format, be sure to check that
+		 * we can't exceed the maximum size of xlog record while inside this
+		 * critical section, or we'd run the chance of PANICing during this
+		 * insertion.
+		 */
 		recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG);
 
 		PageSetLSN(page, recptr);
@@ -849,6 +858,15 @@ nextval_internal(Oid relid, bool check_permissions)
 		XLogRegisterData((char *) &xlrec, sizeof(xl_seq_rec));
 		XLogRegisterData((char *) seqdatatuple.t_data, seqdatatuple.t_len);
 
+		/*
+		 * The data in this record is currently at most a heap tuple plus a
+		 * small constant-size header, and a heap tuple is itself limited to
+		 * about BLCKSZ.
+		 * If we ever change this log record's format, be sure to check that
+		 * it still can't exceed the maximum size of an xlog record while
+		 * inside this critical section, or we would risk a PANIC during this
+		 * insertion.
+		 */
 		recptr = XLogInsert(RM_SEQ_ID, XLOG_SEQ_LOG);
 
 		PageSetLSN(page, recptr);
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index 7d3b9446e6..7d6a3de0a7 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -425,6 +425,30 @@ typedef struct xl_xact_parsed_abort
 	TimestampTz origin_timestamp;
 } xl_xact_parsed_abort;
 
+/* Holds the data of a commit record for XactPrepareCommitRecord() (recdata) */
+typedef struct xl_xact_commit_fields {
+	xl_xact_commit xlrec;
+	xl_xact_xinfo xl_xinfo;
+	xl_xact_dbinfo xl_dbinfo;
+	xl_xact_subxacts xl_subxacts;
+	xl_xact_relfilelocators xl_relfilelocators;
+	xl_xact_stats_items xl_dropped_stats;
+	xl_xact_invals xl_invals;
+	xl_xact_twophase xl_twophase;
+	xl_xact_origin xl_origin;
+} xl_xact_commit_fields;
+
+/* Holds the data of an abort record for XactPrepareAbortRecord() (recdata) */
+typedef struct xl_xact_abort_fields {
+	xl_xact_abort xlrec;
+	xl_xact_xinfo xl_xinfo;
+	xl_xact_subxacts xl_subxacts;
+	xl_xact_relfilelocators xl_relfilelocators;
+	xl_xact_stats_items xl_dropped_stats;
+	xl_xact_twophase xl_twophase;
+	xl_xact_dbinfo xl_dbinfo;
+	xl_xact_origin xl_origin;
+} xl_xact_abort_fields;
 
 /* ----------------
  *		extern definitions
@@ -494,24 +518,28 @@ extern void MarkSubxactTopXidLogged(void);
 
 extern int	xactGetCommittedChildren(TransactionId **ptr);
 
-extern XLogRecPtr XactLogCommitRecord(TimestampTz commit_time,
-									  int nsubxacts, TransactionId *subxacts,
-									  int nrels, RelFileLocator *rels,
-									  int ndroppedstats,
-									  xl_xact_stats_item *droppedstats,
-									  int nmsgs, SharedInvalidationMessage *msgs,
-									  bool relcacheInval,
-									  int xactflags,
-									  TransactionId twophase_xid,
-									  const char *twophase_gid);
-
-extern XLogRecPtr XactLogAbortRecord(TimestampTz abort_time,
-									 int nsubxacts, TransactionId *subxacts,
-									 int nrels, RelFileLocator *rels,
-									 int ndroppedstats,
-									 xl_xact_stats_item *droppedstats,
-									 int xactflags, TransactionId twophase_xid,
-									 const char *twophase_gid);
+extern void XactPrepareCommitRecord(TimestampTz commit_time,
+									int nsubxacts, TransactionId *subxacts,
+									int nrels, RelFileLocator *rels,
+									int ndroppedstats,
+									xl_xact_stats_item *droppedstats,
+									int nmsgs, SharedInvalidationMessage *msgs,
+									bool relcacheInval,
+									int xactflags,
+									TransactionId twophase_xid,
+									const char *twophase_gid,
+									xl_xact_commit_fields *recdata);
+extern XLogRecPtr XactLogCommitRecord(void);
+
+extern void XactPrepareAbortRecord(TimestampTz abort_time,
+								   int nsubxacts, TransactionId *subxacts,
+								   int nrels, RelFileLocator *rels,
+								   int ndroppedstats,
+								   xl_xact_stats_item *droppedstats,
+								   int xactflags, TransactionId twophase_xid,
+								   const char *twophase_gid,
+								   xl_xact_abort_fields *recdata);
+extern XLogRecPtr XactLogAbortRecord(void);
 extern void xact_redo(XLogReaderState *record);
 
 /* xactdesc.c */
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index cfe5409738..c10a3c3679 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -150,6 +150,7 @@ extern PGDLLIMPORT bool XLOG_DEBUG;
  */
 #define XLOG_INCLUDE_ORIGIN		0x01	/* include the replication origin */
 #define XLOG_MARK_UNIMPORTANT	0x02	/* record not important for durability */
+#define XLOG_NOPAGEWRITE_USED	0x04	/* internal: FPI skipped as page writes are off */
 
 
 /* Checkpoint statistics */
diff --git a/src/include/access/xloginsert.h b/src/include/access/xloginsert.h
index 31785dc578..9d2c8a0893 100644
--- a/src/include/access/xloginsert.h
+++ b/src/include/access/xloginsert.h
@@ -42,6 +42,9 @@
 extern void XLogBeginInsert(void);
 extern void XLogSetRecordFlags(uint8 flags);
 extern XLogRecPtr XLogInsert(RmgrId rmid, uint8 info);
+extern void XLogPrepareInsert(RmgrId rmid, uint8 info);
+extern XLogRecPtr XLogInsertPrepared(void);
+
 extern void XLogEnsureRecordSpace(int max_block_id, int ndatas);
 extern void XLogRegisterData(char *data, uint32 len);
 extern void XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags);
diff --git a/src/include/access/xlogrecord.h b/src/include/access/xlogrecord.h
index 0d576f7883..72bee48c5b 100644
--- a/src/include/access/xlogrecord.h
+++ b/src/include/access/xlogrecord.h
@@ -54,6 +54,17 @@ typedef struct XLogRecord
 
 #define SizeOfXLogRecord	(offsetof(XLogRecord, xl_crc) + sizeof(pg_crc32c))
 
+/*
+ * XLogReader needs to allocate all the data of an xlog record in a single
+ * chunk.  This means that a single XLogRecord cannot exceed MaxAllocSize
+ * in length, even before counting any allocation overhead of the XLogReader.
+ *
+ * To leave room for that overhead, this limit is 4MiB short of 1GiB, which
+ * should be plenty for the extra space that
+ * DecodeXLogRecordRequiredSpace() expects.
+ */
+#define XLogRecordMaxSize	(1020 * 1024 * 1024)
+
 /*
  * The high 4 bits in xl_info may be used freely by rmgr. The
  * XLR_SPECIAL_REL_UPDATE and XLR_CHECK_CONSISTENCY bits can be passed by
-- 
2.39.0

