From c70daa35f2308ea195e177e30b54e2a613f78811 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Mon, 3 Jul 2023 10:28:00 +0900
Subject: [PATCH v2] Skip decoding already-aborted transactions.

Previously, we had the mechanism for detecting concurrent aborts for
streaming transactions. This commit enables us to check if the
large-transaction already aborted so that we can ignore further
changes. This is helpful for a case where the transaction is quite
large and already rolled back since we can avoid disk or network I/O.

We do the check for only large-transactions when eviction since
checking CLOG is costly and could cause a slowdown with lots of small
transactions, where most transactions commit.

For testing purpose, we disable this check when
logical_replication_mode is set to "immediate".

Author:
Reviewed-by:
Discussion: https://postgr.es/m/
---
 .../replication/logical/reorderbuffer.c       | 94 ++++++++++++++++---
 src/include/replication/reorderbuffer.h       | 13 ++-
 2 files changed, 90 insertions(+), 17 deletions(-)

diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 26d252bd87..387d2e9131 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -100,6 +100,7 @@
 #include "replication/snapbuild.h"	/* just for SnapBuildSnapDecRefcount */
 #include "storage/bufmgr.h"
 #include "storage/fd.h"
+#include "storage/procarray.h"
 #include "storage/sinval.h"
 #include "utils/builtins.h"
 #include "utils/combocid.h"
@@ -256,7 +257,7 @@ static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 									   char *data);
 static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
 static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
-									 bool txn_prepared);
+									 bool txn_prepared, bool streaming);
 static void ReorderBufferCleanupSerializedTXNs(const char *slotname);
 static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot,
 										TransactionId xid, XLogSegNo segno);
@@ -780,11 +781,11 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
 	txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
 
 	/*
-	 * While streaming the previous changes we have detected that the
-	 * transaction is aborted.  So there is no point in collecting further
-	 * changes for it.
+	 * If we have detected that the transaction is aborted while streaming
+	 * the previous changes or by checking its CLOG, there is no point in
+	 * collecting further changes for it.
 	 */
-	if (txn->concurrent_abort)
+	if (txn->aborted)
 	{
 		/*
 		 * We don't need to update memory accounting for this change as we
@@ -1603,9 +1604,12 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
  *
  * 'txn_prepared' indicates that we have decoded the transaction at prepare
  * time.
+ * 'streaming' indicates that this function is called while streaming the transaction
+ * and we can mark the transaction as streamed.
  */
 static void
-ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)
+ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared,
+						 bool streaming)
 {
 	dlist_mutable_iter iter;
 
@@ -1624,7 +1628,7 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prep
 		Assert(rbtxn_is_known_subxact(subtxn));
 		Assert(subtxn->nsubtxns == 0);
 
-		ReorderBufferTruncateTXN(rb, subtxn, txn_prepared);
+		ReorderBufferTruncateTXN(rb, subtxn, txn_prepared, streaming);
 	}
 
 	/* cleanup changes in the txn */
@@ -1658,7 +1662,8 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prep
 	 * about the toplevel xact (we send the XID in all messages), but we never
 	 * stream XIDs of empty subxacts.
 	 */
-	if ((!txn_prepared) && (rbtxn_is_toptxn(txn) || (txn->nentries_mem != 0)))
+	if (streaming &&
+		(!txn_prepared) && (rbtxn_is_toptxn(txn) || (txn->nentries_mem != 0)))
 		txn->txn_flags |= RBTXN_IS_STREAMED;
 
 	if (txn_prepared)
@@ -1887,7 +1892,7 @@ ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn)
 		 * full cleanup will happen as part of the COMMIT PREPAREDs, so now
 		 * just truncate txn by removing changes and tuplecids.
 		 */
-		ReorderBufferTruncateTXN(rb, txn, true);
+		ReorderBufferTruncateTXN(rb, txn, true, true);
 		/* Reset the CheckXidAlive */
 		CheckXidAlive = InvalidTransactionId;
 	}
@@ -2030,7 +2035,7 @@ ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 					  ReorderBufferChange *specinsert)
 {
 	/* Discard the changes that we just streamed */
-	ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn));
+	ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn), true);
 
 	/* Free all resources allocated for toast reconstruction */
 	ReorderBufferToastReset(rb, txn);
@@ -2555,7 +2560,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		 */
 		if (streaming || rbtxn_prepared(txn))
 		{
-			ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn));
+			ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn), streaming);
 			/* Reset the CheckXidAlive */
 			CheckXidAlive = InvalidTransactionId;
 		}
@@ -2608,7 +2613,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 			FlushErrorState();
 			FreeErrorData(errdata);
 			errdata = NULL;
-			curtxn->concurrent_abort = true;
+			curtxn->aborted = true;
 
 			/* Reset the TXN so that it is allowed to stream remaining data. */
 			ReorderBufferResetTXN(rb, txn, snapshot_now,
@@ -2792,10 +2797,10 @@ ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
 	 * when rollback prepared is decoded and sent, the downstream should be
 	 * able to rollback such a xact. See comments atop DecodePrepare.
 	 *
-	 * Note, for the concurrent_abort + streaming case a stream_prepare was
-	 * already sent within the ReorderBufferReplay call above.
+	 * Note, for the abort + streaming case a stream_prepare was already sent
+	 * within the ReorderBufferReplay call above.
 	 */
-	if (txn->concurrent_abort && !rbtxn_is_streamed(txn))
+	if (txn->aborted && !rbtxn_is_streamed(txn))
 		rb->prepare(rb, txn, txn->final_lsn);
 }
 
@@ -3561,6 +3566,59 @@ ReorderBufferLargestStreamableTopTXN(ReorderBuffer *rb)
 	return largest;
 }
 
+/*
+ * Check the transaction status of the given transaction. If the transaction
+ * already aborted, we discards all changes accumulated so far and ignore
+ * future changes, and return true. Otherwise return false.
+ *
+ * If logical_replication_mode is set to "immediate", we disable this check
+ * for regression tests.
+ */
+static bool
+ReorderBufferCheckTXNAbort(ReorderBuffer *rb, ReorderBufferTXN *txn)
+{
+	/*
+	 * If logical_replication_mode is "immediate", we don't check the
+	 * transaction status so the caller always process this transaction.
+	 */
+	if (unlikely(logical_replication_mode == LOGICAL_REP_MODE_IMMEDIATE))
+		return false;
+
+	if (txn->aborted)
+		return true;
+
+	if (txn->committed)
+		return false;
+
+	if (TransactionIdIsInProgress(txn->xid))
+		return false;
+
+	if (TransactionIdDidCommit(txn->xid))
+	{
+		/*
+		 * Remember the transaction is committed so that we can skip
+		 * CLOG check next time, avoiding the pressure on CLOG lookup.
+		 */
+		txn->committed = true;
+		return false;
+	}
+
+	/*
+	 * The transaction aborted. We discard the changes we've collected
+	 * so far, and free all resources allocated for toast reconstruction.
+	 */
+	ReorderBufferTruncateTXN(rb, txn, rbtxn_prepared(txn), false);
+	ReorderBufferToastReset(rb, txn);
+
+	/*
+	 * Mark the transaction as aborted so we ignore future changes of this
+	 * transaction.
+	 */
+	txn->aborted = true;
+
+	return true;
+}
+
 /*
  * Check whether the logical_decoding_work_mem limit was reached, and if yes
  * pick the largest (sub)transaction at-a-time to evict and spill its changes to
@@ -3613,6 +3671,9 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
 			Assert(txn->total_size > 0);
 			Assert(rb->size >= txn->total_size);
 
+			if (ReorderBufferCheckTXNAbort(rb, txn))
+				continue;
+
 			ReorderBufferStreamTXN(rb, txn);
 		}
 		else
@@ -3628,6 +3689,9 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
 			Assert(txn->size > 0);
 			Assert(rb->size >= txn->size);
 
+			if (ReorderBufferCheckTXNAbort(rb, txn))
+				continue;
+
 			ReorderBufferSerializeTXN(rb, txn);
 		}
 
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 1b9db22acb..fae431ef95 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -428,8 +428,17 @@ typedef struct ReorderBufferTXN
 	/* Size of top-transaction including sub-transactions. */
 	Size		total_size;
 
-	/* If we have detected concurrent abort then ignore future changes. */
-	bool		concurrent_abort;
+	/*
+	 * True if the transaction committed. Then we skip transaction status
+	 * check for this transaction.
+	 */
+	bool		committed;
+
+	/*
+	 * True if the transaction (concurrently) aborted. Then we ignore
+	 * future changes.
+	 */
+	bool		aborted;
 
 	/*
 	 * Private data pointer of the output plugin.
-- 
2.31.1

