From 9739d48a979868b912f6e1f90d4702808e061a35 Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilip.kumar@enterprisedb.com>
Date: Thu, 3 Oct 2019 09:00:49 +0530
Subject: [PATCH 06/14] Gracefully handle concurrent aborts of uncommitted
 transactions that are being decoded alongside.

When a transaction aborts, it's changes are considered unnecessary for
other transactions. That means the changes may be either cleaned up by
vacuum or removed from HOT chains (thus made inaccessible through
indexes), and there may be other such consequences.

When decoding committed transactions this is not an issue, and we
never decode transactions that abort before the decoding starts.

But for in-progress transactions - for example when decoding prepared
transactions on PREPARE (and not COMMIT PREPARED as before), this
may cause failures when the output plugin consults catalogs (both
system and user-defined).

We handle such failures by returning ERRCODE_TRANSACTION_ROLLBACK
sqlerrcode from system table scan APIs to the backend decoding a
specific uncommitted transaction. The decoding logic on the receipt
of such an sqlerrcode aborts the ongoing decoding and returns
gracefully.
---
 doc/src/sgml/logicaldecoding.sgml               |  5 ++-
 src/backend/access/heap/heapam.c                | 51 +++++++++++++++++++++++++
 src/backend/access/index/genam.c                | 34 +++++++++++++++++
 src/backend/replication/logical/reorderbuffer.c |  8 ++--
 src/backend/utils/time/snapmgr.c                | 25 +++++++++++-
 src/include/utils/snapmgr.h                     |  4 +-
 6 files changed, 119 insertions(+), 8 deletions(-)

diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index fc4ad65..da6a6f3 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -432,7 +432,10 @@ typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
 ALTER TABLE user_catalog_table SET (user_catalog_table = true);
 CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true);
 </programlisting>
-     Any actions leading to transaction ID assignment are prohibited. That, among others,
+     Note that access to user catalog tables or regular system catalog tables
+     in the output plugins has to be done via the <literal>systable_*</literal> scan APIs only.
+     Access via the <literal>heap_*</literal> scan APIs will error out.
+     Additionally, any actions leading to transaction ID assignment are prohibited. That, among others,
      includes writing to tables, performing DDL changes, and
      calling <literal>txid_current()</literal>.
     </para>
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index e954482..6ce7878 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -1303,6 +1303,17 @@ heap_getnext(TableScanDesc sscan, ScanDirection direction)
 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 				 errmsg_internal("only heap AM is supported")));
 
+	/*
+	 * We don't expect direct calls to heap_getnext with valid
+	 * CheckXidAlive for regular tables. Track that below.
+	 */
+	if (unlikely(TransactionIdIsValid(CheckXidAlive) &&
+		!(IsCatalogRelation(scan->rs_base.rs_rd) ||
+		  RelationIsUsedAsCatalogTable(scan->rs_base.rs_rd))))
+		ereport(ERROR,
+			(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+			 errmsg("improper heap_getnext call")));
+
 	/* Note: no locking manipulations needed */
 
 	HEAPDEBUG_1;				/* heap_getnext( info ) */
@@ -1422,6 +1433,16 @@ heap_fetch(Relation relation,
 	bool		valid;
 
 	/*
+	 * We don't expect direct calls to heap_fetch with valid
+	 * CheckXidAlive for regular tables. Track that below.
+	 */
+	if (unlikely(TransactionIdIsValid(CheckXidAlive) &&
+		!(IsCatalogRelation(relation) || RelationIsUsedAsCatalogTable(relation))))
+		ereport(ERROR,
+			(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+			 errmsg("improper heap_fetch call")));
+
+	/*
 	 * Fetch and pin the appropriate page of the relation.
 	 */
 	buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid));
@@ -1535,6 +1556,16 @@ heap_hot_search_buffer(ItemPointer tid, Relation relation, Buffer buffer,
 	bool		valid;
 	bool		skip;
 
+	/*
+	 * We don't expect direct calls to heap_hot_search_buffer with
+	 * valid CheckXidAlive for regular tables. Track that below.
+	 */
+	if (unlikely(TransactionIdIsValid(CheckXidAlive) &&
+		!(IsCatalogRelation(relation) || RelationIsUsedAsCatalogTable(relation))))
+		ereport(ERROR,
+			(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+			 errmsg("improper heap_hot_search_buffer call")));
+
 	/* If this is not the first call, previous call returned a (live!) tuple */
 	if (all_dead)
 		*all_dead = first_call;
@@ -1683,6 +1714,16 @@ heap_get_latest_tid(TableScanDesc sscan,
 	Assert(ItemPointerIsValid(tid));
 
 	/*
+	 * We don't expect direct calls to heap_get_latest_tid with valid
+	 * CheckXidAlive for regular tables. Track that below.
+	 */
+	if (unlikely(TransactionIdIsValid(CheckXidAlive) &&
+		!(IsCatalogRelation(relation) || RelationIsUsedAsCatalogTable(relation))))
+		ereport(ERROR,
+			(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+			 errmsg("improper heap_get_latest_tid call")));
+
+	/*
 	 * Loop to chase down t_ctid links.  At top of loop, ctid is the tuple we
 	 * need to examine, and *tid is the TID we will return if ctid turns out
 	 * to be bogus.
@@ -5481,6 +5522,16 @@ heap_finish_speculative(Relation relation, ItemPointer tid)
 	ItemId		lp = NULL;
 	HeapTupleHeader htup;
 
+	/*
+	 * We don't expect direct calls to heap_hot_search with
+	 * valid CheckXidAlive for regular tables. Track that below.
+	 */
+	if (unlikely(TransactionIdIsValid(CheckXidAlive) &&
+		!(IsCatalogRelation(relation) || RelationIsUsedAsCatalogTable(relation))))
+		ereport(ERROR,
+			(errcode(ERRCODE_INVALID_TRANSACTION_STATE),
+			 errmsg("improper heap_hot_search call")));
+
 	buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid));
 	LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
 	page = (Page) BufferGetPage(buffer);
diff --git a/src/backend/access/index/genam.c b/src/backend/access/index/genam.c
index 2599b5d..201acfb 100644
--- a/src/backend/access/index/genam.c
+++ b/src/backend/access/index/genam.c
@@ -28,6 +28,7 @@
 #include "lib/stringinfo.h"
 #include "miscadmin.h"
 #include "storage/bufmgr.h"
+#include "storage/procarray.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
@@ -477,6 +478,17 @@ systable_getnext(SysScanDesc sysscan)
 		}
 	}
 
+	/*
+	 * If CheckXidAlive is valid, then we check if it aborted. If it did, we
+	 * error out
+	 */
+	if (TransactionIdIsValid(CheckXidAlive) &&
+			!TransactionIdIsInProgress(CheckXidAlive) &&
+			!TransactionIdDidCommit(CheckXidAlive))
+			ereport(ERROR,
+				(errcode(ERRCODE_TRANSACTION_ROLLBACK),
+				 errmsg("transaction aborted during system catalog scan")));
+
 	return htup;
 }
 
@@ -513,6 +525,17 @@ systable_recheck_tuple(SysScanDesc sysscan, HeapTuple tup)
 											sysscan->slot,
 											freshsnap);
 
+	/*
+	 * If CheckXidAlive is valid, then we check if it aborted. If it did, we
+	 * error out
+	 */
+	if (TransactionIdIsValid(CheckXidAlive) &&
+			!TransactionIdIsInProgress(CheckXidAlive) &&
+			!TransactionIdDidCommit(CheckXidAlive))
+			ereport(ERROR,
+				(errcode(ERRCODE_TRANSACTION_ROLLBACK),
+				 errmsg("transaction aborted during system catalog scan")));
+
 	return result;
 }
 
@@ -639,6 +662,17 @@ systable_getnext_ordered(SysScanDesc sysscan, ScanDirection direction)
 	if (htup && sysscan->iscan->xs_recheck)
 		elog(ERROR, "system catalog scans with lossy index conditions are not implemented");
 
+	/*
+	 * If CheckXidAlive is valid, then we check if it aborted. If it did, we
+	 * error out
+	 */
+	if (TransactionIdIsValid(CheckXidAlive) &&
+			!TransactionIdIsInProgress(CheckXidAlive) &&
+			!TransactionIdDidCommit(CheckXidAlive))
+			ereport(ERROR,
+				(errcode(ERRCODE_TRANSACTION_ROLLBACK),
+				 errmsg("transaction aborted during system catalog scan")));
+
 	return htup;
 }
 
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index ca4b904..3143479 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -679,7 +679,7 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
 			txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
 
 		/* setup snapshot to allow catalog access */
-		SetupHistoricSnapshot(snapshot_now, NULL);
+		SetupHistoricSnapshot(snapshot_now, NULL, xid);
 		PG_TRY();
 		{
 			rb->message(rb, txn, lsn, false, prefix, message_size, message);
@@ -1529,7 +1529,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 	ReorderBufferBuildTupleCidHash(rb, txn);
 
 	/* setup the initial snapshot */
-	SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
+	SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash, xid);
 
 	/*
 	 * Decoding needs access to syscaches et al., which in turn use
@@ -1780,7 +1780,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 
 
 					/* and continue with the new one */
-					SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
+					SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash, xid);
 					break;
 
 				case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
@@ -1800,7 +1800,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 						snapshot_now->curcid = command_id;
 
 						TeardownHistoricSnapshot(false);
-						SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
+						SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash, xid);
 
 						/*
 						 * Every time the CommandId is incremented, we could
diff --git a/src/backend/utils/time/snapmgr.c b/src/backend/utils/time/snapmgr.c
index 47b0517..9fa1e43 100644
--- a/src/backend/utils/time/snapmgr.c
+++ b/src/backend/utils/time/snapmgr.c
@@ -154,6 +154,13 @@ static Snapshot CatalogSnapshot = NULL;
 static Snapshot HistoricSnapshot = NULL;
 
 /*
+ * An xid value pointing to a possibly ongoing or a prepared transaction.
+ * Currently used in logical decoding.  It's possible that such transactions
+ * can get aborted while the decoding is ongoing.
+ */
+TransactionId CheckXidAlive = InvalidTransactionId;
+
+/*
  * These are updated by GetSnapshotData.  We initialize them this way
  * for the convenience of TransactionIdIsInProgress: even in bootstrap
  * mode, we don't want it to say that BootstrapTransactionId is in progress.
@@ -2029,10 +2036,14 @@ MaintainOldSnapshotTimeMapping(TimestampTz whenTaken, TransactionId xmin)
  * Setup a snapshot that replaces normal catalog snapshots that allows catalog
  * access to behave just like it did at a certain point in the past.
  *
+ * If a valid xid is passed in, we check if it is uncommitted and track it in
+ * CheckXidAlive.  This is to re-check XID status while accessing catalog.
+ *
  * Needed for logical decoding.
  */
 void
-SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
+SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids,
+					  TransactionId snapshot_xid)
 {
 	Assert(historic_snapshot != NULL);
 
@@ -2041,8 +2052,17 @@ SetupHistoricSnapshot(Snapshot historic_snapshot, HTAB *tuplecids)
 
 	/* setup (cmin, cmax) lookup hash */
 	tuplecid_data = tuplecids;
-}
 
+	/*
+	 * setup CheckXidAlive if it's not committed yet. We don't check
+	 * if the xid aborted. That will happen during catalog access.
+	 */
+	if (TransactionIdIsValid(snapshot_xid) &&
+		!TransactionIdDidCommit(snapshot_xid))
+		CheckXidAlive = snapshot_xid;
+	else
+		CheckXidAlive = InvalidTransactionId;
+}
 
 /*
  * Make catalog snapshots behave normally again.
@@ -2052,6 +2072,7 @@ TeardownHistoricSnapshot(bool is_error)
 {
 	HistoricSnapshot = NULL;
 	tuplecid_data = NULL;
+	CheckXidAlive = InvalidTransactionId;
 }
 
 bool
diff --git a/src/include/utils/snapmgr.h b/src/include/utils/snapmgr.h
index 67b07df..9a8f9ce 100644
--- a/src/include/utils/snapmgr.h
+++ b/src/include/utils/snapmgr.h
@@ -145,8 +145,10 @@ extern bool XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot);
 
 /* Support for catalog timetravel for logical decoding */
 struct HTAB;
+extern TransactionId CheckXidAlive;
 extern struct HTAB *HistoricSnapshotGetTupleCids(void);
-extern void SetupHistoricSnapshot(Snapshot snapshot_now, struct HTAB *tuplecids);
+extern void SetupHistoricSnapshot(Snapshot snapshot_now, struct HTAB *tuplecids,
+								  TransactionId snapshot_xid);
 extern void TeardownHistoricSnapshot(bool is_error);
 extern bool HistoricSnapshotActive(void);
 
-- 
1.8.3.1

