From bcfa4182f128a54db15117aa79483905e9b2f69a Mon Sep 17 00:00:00 2001
From: Amit Kapila <akapila@postgresql.org>
Date: Sat, 6 Jun 2020 09:54:21 +0530
Subject: [PATCH v31 2/4] WAL Log invalidations at command end with
 wal_level=logical.

When wal_level=logical, write invalidations at command end into WAL so
that decoding can use this information.

This patch is required to allow the streaming of in-progress transactions
in logical decoding.  The actual work to allow streaming will be committed
as a separate patch.

We still add the invalidations to the cache and write them to WAL at
commit time in RecordTransactionCommit(). This uses the existing
XLOG_INVALIDATIONS xlog record type, from the RM_STANDBY_ID resource
manager (see LogStandbyInvalidations for details).

So existing code relying on those invalidations (e.g. redo) does not need
to be changed.

The invalidations written at command end uses a new xlog record type
XLOG_XACT_INVALIDATIONS, from RM_XACT_ID resource manager. See
LogLogicalInvalidations for details.

These new xlog records are ignored by existing redo procedures, which
still rely on the invalidations written to commit records.

The invalidations are decoded and accumulated in top-transaction, and then
executed during replay.  This obviates the need to decode the
invalidations as part of a commit record.

LogStandbyInvalidations was accumulating all the invalidations in memory,
and then only wrote them once at commit time, which may reduce the
performance impact by amortizing the overhead and deduplicating the
invalidations.

Author: Dilip Kumar, Tomas Vondra, Amit Kapila
Reviewed-by: Amit Kapila
Tested-by: Neha Sharma and Mahendra Singh Thalor
Discussion: https://postgr.es/m/688b0b7f-2f6c-d827-c27b-216a8e3ea700@2ndquadrant.com
---
 src/backend/access/rmgrdesc/xactdesc.c          | 10 +++++
 src/backend/access/transam/xact.c               |  7 +++
 src/backend/replication/logical/decode.c        | 58 +++++++++++++++----------
 src/backend/replication/logical/reorderbuffer.c | 52 ++++++++++++++++++----
 src/backend/utils/cache/inval.c                 | 56 ++++++++++++++++++++++++
 src/include/access/xact.h                       | 13 +++++-
 src/include/replication/reorderbuffer.h         |  3 ++
 7 files changed, 166 insertions(+), 33 deletions(-)

diff --git a/src/backend/access/rmgrdesc/xactdesc.c b/src/backend/access/rmgrdesc/xactdesc.c
index 9fce755..68aa994 100644
--- a/src/backend/access/rmgrdesc/xactdesc.c
+++ b/src/backend/access/rmgrdesc/xactdesc.c
@@ -396,6 +396,13 @@ xact_desc(StringInfo buf, XLogReaderState *record)
 		appendStringInfo(buf, "xtop %u: ", xlrec->xtop);
 		xact_desc_assignment(buf, xlrec);
 	}
+	else if (info == XLOG_XACT_INVALIDATIONS)
+	{
+		xl_xact_invalidations *xlrec = (xl_xact_invalidations *) rec;
+
+		standby_desc_invalidations(buf, xlrec->nmsgs, xlrec->msgs, InvalidOid,
+								   InvalidOid, false);
+	}
 }
 
 const char *
@@ -423,6 +430,9 @@ xact_identify(uint8 info)
 		case XLOG_XACT_ASSIGNMENT:
 			id = "ASSIGNMENT";
 			break;
+		case XLOG_XACT_INVALIDATIONS:
+			id = "INVALIDATION";
+			break;
 	}
 
 	return id;
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index a93fb8a..d93b40f 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -6022,6 +6022,13 @@ xact_redo(XLogReaderState *record)
 			ProcArrayApplyXidAssignment(xlrec->xtop,
 										xlrec->nsubxacts, xlrec->xsub);
 	}
+	else if (info == XLOG_XACT_INVALIDATIONS)
+	{
+		/*
+		 * XXX we do ignore this for now, what matters are invalidations
+		 * written into the commit record.
+		 */
+	}
 	else
 		elog(PANIC, "xact_redo: unknown op code %u", info);
 }
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 0c0c371..7153eba 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -278,10 +278,39 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 
 			/*
 			 * We assign subxact to the toplevel xact while processing each
-			 * record if required.  So, we don't need to do anything here.
-			 * See LogicalDecodingProcessRecord.
+			 * record if required.  So, we don't need to do anything here. See
+			 * LogicalDecodingProcessRecord.
 			 */
 			break;
+		case XLOG_XACT_INVALIDATIONS:
+			{
+				TransactionId xid;
+				xl_xact_invalidations *invals;
+
+				xid = XLogRecGetXid(r);
+				invals = (xl_xact_invalidations *) XLogRecGetData(r);
+
+				/*
+				 * Execute the invalidations for xid-less transactions,
+				 * otherwise, accumulate them so that they can be processed at
+				 * the commit time.
+				 */
+				if (!ctx->fast_forward)
+				{
+					if (TransactionIdIsValid(xid))
+					{
+						ReorderBufferAddInvalidations(reorder, xid, buf->origptr,
+													  invals->nmsgs, invals->msgs);
+						ReorderBufferXidSetCatalogChanges(ctx->reorder, xid,
+														  buf->origptr);
+					}
+					else
+						ReorderBufferImmediateInvalidation(ctx->reorder,
+														   invals->nmsgs,
+														   invals->msgs);
+				}
+			}
+			break;
 		case XLOG_XACT_PREPARE:
 
 			/*
@@ -334,15 +363,11 @@ DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 		case XLOG_STANDBY_LOCK:
 			break;
 		case XLOG_INVALIDATIONS:
-			{
-				xl_invalidations *invalidations =
-				(xl_invalidations *) XLogRecGetData(r);
 
-				if (!ctx->fast_forward)
-					ReorderBufferImmediateInvalidation(ctx->reorder,
-													   invalidations->nmsgs,
-													   invalidations->msgs);
-			}
+			/*
+			 * We are processing the invalidations at the command level via
+			 * XLOG_XACT_INVALIDATIONS.  So we don't need to do anything here.
+			 */
 			break;
 		default:
 			elog(ERROR, "unexpected RM_STANDBY_ID record type: %u", info);
@@ -573,19 +598,6 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 		commit_time = parsed->origin_timestamp;
 	}
 
-	/*
-	 * Process invalidation messages, even if we're not interested in the
-	 * transaction's contents, since the various caches need to always be
-	 * consistent.
-	 */
-	if (parsed->nmsgs > 0)
-	{
-		if (!ctx->fast_forward)
-			ReorderBufferAddInvalidations(ctx->reorder, xid, buf->origptr,
-										  parsed->nmsgs, parsed->msgs);
-		ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr);
-	}
-
 	SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid,
 					   parsed->nsubxacts, parsed->subxacts);
 
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 642a1c7..4b277fe 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -860,6 +860,9 @@ ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid,
 	subtxn->toplevel_xid = xid;
 	Assert(subtxn->nsubtxns == 0);
 
+	/* set the reference to top-level transaction */
+	subtxn->toptxn = txn;
+
 	/* add to subtransaction list */
 	dlist_push_tail(&txn->subtxns, &subtxn->node);
 	txn->nsubtxns++;
@@ -2205,7 +2208,11 @@ ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid,
 /*
  * Setup the invalidation of the toplevel transaction.
  *
- * This needs to be done before ReorderBufferCommit is called!
+ * This needs to be called for each XLOG_XACT_INVALIDATIONS message and
+ * accumulates all the invalidation messages in the toplevel transaction.
+ * This is required because in some cases where we skip processing the
+ * transaction (see ReorderBufferForget), we need to execute all the
+ * invalidations together.
  */
 void
 ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
@@ -2216,17 +2223,35 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
 
 	txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
 
-	if (txn->ninvalidations != 0)
-		elog(ERROR, "only ever add one set of invalidations");
+	/*
+	 * We collect all the invalidations under the top transaction so that we
+	 * can execute them all together.
+	 */
+	if (txn->toptxn)
+		txn = txn->toptxn;
 
 	Assert(nmsgs > 0);
 
-	txn->ninvalidations = nmsgs;
-	txn->invalidations = (SharedInvalidationMessage *)
-		MemoryContextAlloc(rb->context,
-						   sizeof(SharedInvalidationMessage) * nmsgs);
-	memcpy(txn->invalidations, msgs,
-		   sizeof(SharedInvalidationMessage) * nmsgs);
+	/* Accumulate invalidations. */
+	if (txn->ninvalidations == 0)
+	{
+		txn->ninvalidations = nmsgs;
+		txn->invalidations = (SharedInvalidationMessage *)
+			MemoryContextAlloc(rb->context,
+							   sizeof(SharedInvalidationMessage) * nmsgs);
+		memcpy(txn->invalidations, msgs,
+			   sizeof(SharedInvalidationMessage) * nmsgs);
+	}
+	else
+	{
+		txn->invalidations = (SharedInvalidationMessage *)
+			repalloc(txn->invalidations, sizeof(SharedInvalidationMessage) *
+					 (txn->ninvalidations + nmsgs));
+
+		memcpy(txn->invalidations + txn->ninvalidations, msgs,
+			   nmsgs * sizeof(SharedInvalidationMessage));
+		txn->ninvalidations += nmsgs;
+	}
 }
 
 /*
@@ -2254,6 +2279,15 @@ ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid,
 	txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
 
 	txn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
+
+	/*
+	 * Mark top-level transaction as having catalog changes too if one of its
+	 * children has so that the ReorderBufferBuildTupleCidHash can
+	 * conveniently check just top-level transaction and decide whether to
+	 * build the hash table or not.
+	 */
+	if (txn->toptxn != NULL)
+		txn->toptxn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
 }
 
 /*
diff --git a/src/backend/utils/cache/inval.c b/src/backend/utils/cache/inval.c
index 591dd33..7d4fd9f 100644
--- a/src/backend/utils/cache/inval.c
+++ b/src/backend/utils/cache/inval.c
@@ -85,6 +85,9 @@
  *	worth trying to avoid sending such inval traffic in the future, if those
  *	problems can be overcome cheaply.
  *
+ *	When wal_level=logical, write invalidations into WAL at each command end to
+ *	support the decoding of the in-progress transactions.  See
+ *      CommandEndInvalidationMessages.
  *
  * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
@@ -104,6 +107,7 @@
 #include "catalog/pg_constraint.h"
 #include "miscadmin.h"
 #include "storage/sinval.h"
+#include "storage/standby.h"
 #include "storage/smgr.h"
 #include "utils/catcache.h"
 #include "utils/inval.h"
@@ -210,6 +214,8 @@ static struct RELCACHECALLBACK
 
 static int	relcache_callback_count = 0;
 
+static void LogLogicalInvalidations(void);
+
 /* ----------------------------------------------------------------
  *				Invalidation list support functions
  *
@@ -1094,6 +1100,11 @@ CommandEndInvalidationMessages(void)
 
 	ProcessInvalidationMessages(&transInvalInfo->CurrentCmdInvalidMsgs,
 								LocalExecuteInvalidationMessage);
+
+	/* WAL Log per-command invalidation messages for wal_level=logical */
+	if (XLogLogicalInfoActive())
+		LogLogicalInvalidations();
+
 	AppendInvalidationMessages(&transInvalInfo->PriorCmdInvalidMsgs,
 							   &transInvalInfo->CurrentCmdInvalidMsgs);
 }
@@ -1501,3 +1512,48 @@ CallSyscacheCallbacks(int cacheid, uint32 hashvalue)
 		i = ccitem->link - 1;
 	}
 }
+
+/*
+ * LogLogicalInvalidations
+ *
+ * Emit WAL for invalidations.  This is currently only used for logging
+ * invalidations at the command end.
+ */
+static void
+LogLogicalInvalidations()
+{
+	xl_xact_invalidations xlrec;
+	SharedInvalidationMessage *invalMessages;
+	int			nmsgs = 0;
+
+	/* Quick exit if we haven't done anything with invalidation messages. */
+	if (transInvalInfo == NULL)
+		return;
+
+	ProcessInvalidationMessagesMulti(&transInvalInfo->CurrentCmdInvalidMsgs,
+									 MakeSharedInvalidMessagesArray);
+
+	Assert(!(numSharedInvalidMessagesArray > 0 &&
+			 SharedInvalidMessagesArray == NULL));
+
+	invalMessages = SharedInvalidMessagesArray;
+	nmsgs = numSharedInvalidMessagesArray;
+	SharedInvalidMessagesArray = NULL;
+	numSharedInvalidMessagesArray = 0;
+
+	if (nmsgs > 0)
+	{
+		/* prepare record */
+		memset(&xlrec, 0, MinSizeOfXactInvalidations);
+		xlrec.nmsgs = nmsgs;
+
+		/* perform insertion */
+		XLogBeginInsert();
+		XLogRegisterData((char *) (&xlrec), MinSizeOfXactInvalidations);
+		XLogRegisterData((char *) invalMessages,
+						 nmsgs * sizeof(SharedInvalidationMessage));
+		XLogInsert(RM_XACT_ID, XLOG_XACT_INVALIDATIONS);
+
+		pfree(invalMessages);
+	}
+}
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index aef8555..ac3f5e3 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -146,7 +146,7 @@ typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid,
 #define XLOG_XACT_COMMIT_PREPARED	0x30
 #define XLOG_XACT_ABORT_PREPARED	0x40
 #define XLOG_XACT_ASSIGNMENT		0x50
-/* free opcode 0x60 */
+#define XLOG_XACT_INVALIDATIONS		0x60
 /* free opcode 0x70 */
 
 /* mask for filtering opcodes out of xl_info */
@@ -198,6 +198,17 @@ typedef struct xl_xact_assignment
 #define MinSizeOfXactAssignment offsetof(xl_xact_assignment, xsub)
 
 /*
+ * Invalidations logged with wal_level=logical.
+ */
+typedef struct xl_xact_invalidations
+{
+	int			nmsgs;			/* number of shared inval msgs */
+	SharedInvalidationMessage msgs[FLEXIBLE_ARRAY_MEMBER];
+}			xl_xact_invalidations;
+
+#define MinSizeOfXactInvalidations offsetof(xl_xact_invalidations, msgs)
+
+/*
  * Commit and abort records can contain a lot of information. But a large
  * portion of the records won't need all possible pieces of information. So we
  * only include what's needed.
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 626ecf4..74ffe78 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -220,6 +220,9 @@ typedef struct ReorderBufferTXN
 	 */
 	XLogRecPtr	end_lsn;
 
+	/* Toplevel transaction for this subxact (NULL for top-level). */
+	struct ReorderBufferTXN *toptxn;
+
 	/*
 	 * LSN of the last lsn at which snapshot information reside, so we can
 	 * restart decoding from there and fully recover this transaction from
-- 
1.8.3.1

