From 159a9f338412a6ecae675597f55b564b74afb134 Mon Sep 17 00:00:00 2001
From: Vaijayanti Bharadwaj <vaijayanti.bharadwaj@enterprisedb.com>
Date: Tue, 3 Dec 2024 18:36:23 +0530
Subject: [PATCH 2/2] cleanup incomplete transactions on startup in
 ReorderBuffer

---
 src/backend/replication/logical/decode.c      | 18 +++++-
 .../replication/logical/reorderbuffer.c       | 61 +++++++++++++++++++
 src/include/replication/reorderbuffer.h       |  1 +
 3 files changed, 79 insertions(+), 1 deletion(-)

diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index e73576ad12..eb95ced41d 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -136,10 +136,26 @@ xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 
 	switch (info)
 	{
-			/* this is also used in END_OF_RECOVERY checkpoints */
+			/*
+			 * this is also used in END_OF_RECOVERY checkpoints
+			 * This is a point where, if there are any transactions with
+			 * with ReorderBuffer, that are incomplete (they do not have a
+			 * commit, abort or prepare record),then they must be aborted.
+			 * They exist due to a prior unclean termination, which did not allow
+			 * logging an abort.
+			 * If not aborted, these transactions may be held by the ReorderBuffer
+			 * for long periods of time, especially if a prepared transaction
+			 * precedes it that does not get committed or aborted and holds down
+			 * the oldestRunningXid
+			 */
 		case XLOG_CHECKPOINT_SHUTDOWN:
 		case XLOG_END_OF_RECOVERY:
 			SnapBuildSerializationPoint(builder, buf->origptr);
+			elog(DEBUG2,
+				"decoding: recovery/shutdown checkpoint at lsn=%X/%X",
+				(uint32)(buf->origptr >> 32),
+				(uint32)(buf->origptr));
+			ReorderBufferAbortOlderThanLSN(ctx->reorder, buf->origptr);
 
 			break;
 		case XLOG_CHECKPOINT_ONLINE:
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index e3a5c7b660..ed123f4169 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -2965,6 +2965,67 @@ ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
 	ReorderBufferCleanupTXN(rb, txn);
 }
 
+/*
+ * Abort all transactions that aren't actually running anymore because the
+ * server restarted.
+ * This is called since there may be prepared transactions holding back
+ * back the oldestRunningXid. This is a version of ReorderBufferAbortOld
+ * that aborts all older transactions, that are not in prepared state.
+ */
+void
+ReorderBufferAbortOlderThanLSN(ReorderBuffer *rb, XLogRecPtr cutoff)
+{
+	dlist_mutable_iter it;
+
+	/*
+	 * Iterate through all (potential) toplevel TXNs and abort all that are
+	 * older than what possibly can be running. Once we've found the first
+	 * that is alive we stop, there might be some that acquired an xid earlier
+	 * but started writing later, but it's unlikely and they will be cleaned
+	 * up in a later call to this function.
+	 */
+	dlist_foreach_modify(it, &rb->toplevel_by_lsn)
+	{
+		ReorderBufferTXN *txn;
+
+		txn = dlist_container(ReorderBufferTXN, node, it.cur);
+
+		if (rbtxn_prepared(txn) || rbtxn_skip_prepared(txn))
+		{
+			elog(DEBUG2, "txn: %u is prepared and cannot be cleaned up", txn->xid);
+		}
+		else if (txn->first_lsn < cutoff)
+		{
+			/*
+			 * We set final_lsn on a transaction when we decode its commit or
+			 * abort record, but we never see those records for crashed
+			 * transactions.  To ensure cleanup of these transactions, set
+			 * final_lsn to that of their last change; this causes
+			 * ReorderBufferRestoreCleanup to do the right thing.
+			 */
+			if (rbtxn_is_serialized(txn) && txn->final_lsn == InvalidXLogRecPtr)
+			{
+				ReorderBufferChange *last =
+				dlist_tail_element(ReorderBufferChange, node, &txn->changes);
+
+				txn->final_lsn = last->lsn;
+			}
+
+			elog(DEBUG2, "aborting an incomplete transaction %u", txn->xid);
+
+			/* remove potential on-disk data, and deallocate this tx */
+			ReorderBufferCleanupTXN(rb, txn);
+		}
+		else if (txn->first_lsn >= cutoff)
+		{
+			/* Should not reach here */
+			elog(FATAL,
+				"ReorderBufferAbortOlderThanLSN: found transaction in the future: %u",
+				txn->xid);
+		}
+	}
+}
+
 /*
  * Abort all transactions that aren't actually running anymore because the
  * server restarted.
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 3bc365a7b0..4762ef14f7 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -698,6 +698,7 @@ extern void ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid,
 extern void ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
 							   TimestampTz abort_time);
 extern void ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunningXid);
+extern void ReorderBufferAbortOlderThanLSN(ReorderBuffer *rb, XLogRecPtr cutoff);
 extern void ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn);
 extern void ReorderBufferInvalidate(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn);
 
-- 
2.46.0

