From 53d079af84b8ac44497f27eab52c73cfc2bbd298 Mon Sep 17 00:00:00 2001
From: ChangAo Chen <cca5507@qq.com>
Date: Mon, 10 Jun 2024 20:56:37 +0800
Subject: [PATCH v1] Track transactions committed in BUILDING_SNAPSHOT.

Transactions that commit while the snapshot builder is in the
BUILDING_SNAPSHOT state are needed to build a correct historic
snapshot, but they were not tracked previously. As a result, a
transaction could take an incorrect snapshot and logical replication
could be interrupted. Track these transactions to fix that.
---
 src/backend/replication/logical/decode.c    | 27 +++++++++++++--------
 src/backend/replication/logical/snapbuild.c |  7 ++++++
 2 files changed, 24 insertions(+), 10 deletions(-)

diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 8ec5adfd90..6159beca4b 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -206,10 +206,11 @@ xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	uint8		info = XLogRecGetInfo(r) & XLOG_XACT_OPMASK;
 
 	/*
-	 * If the snapshot isn't yet fully built, we cannot decode anything, so
-	 * bail out.
+	 * If the snapshot isn't yet fully built, we cannot decode anything.
+	 * However, we must still track transactions that commit in the
+	 * BUILDING_SNAPSHOT state, or the resulting snapshot will be incorrect.
 	 */
-	if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
+	if (SnapBuildCurrentState(builder) < SNAPBUILD_BUILDING_SNAPSHOT)
 		return;
 
 	switch (info)
@@ -282,9 +283,11 @@ xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 			{
 				TransactionId xid;
 				xl_xact_invals *invals;
+				bool		has_snapshot = false;
 
 				xid = XLogRecGetXid(r);
 				invals = (xl_xact_invals *) XLogRecGetData(r);
+				has_snapshot = SnapBuildCurrentState(builder) >= SNAPBUILD_FULL_SNAPSHOT;
 
 				/*
 				 * Execute the invalidations for xid-less transactions,
@@ -293,7 +296,7 @@ xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 				 */
 				if (TransactionIdIsValid(xid))
 				{
-					if (!ctx->fast_forward)
+					if (!ctx->fast_forward && has_snapshot)
 						ReorderBufferAddInvalidations(reorder, xid,
 													  buf->origptr,
 													  invals->nmsgs,
@@ -301,12 +304,13 @@ xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 					ReorderBufferXidSetCatalogChanges(ctx->reorder, xid,
 													  buf->origptr);
 				}
-				else if ((!ctx->fast_forward))
+				else if (!ctx->fast_forward && has_snapshot)
 					ReorderBufferImmediateInvalidation(ctx->reorder,
 													   invals->nmsgs,
 													   invals->msgs);
+
+				break;
 			}
-			break;
 		case XLOG_XACT_PREPARE:
 			{
 				xl_xact_parsed_prepare parsed;
@@ -411,9 +415,10 @@ heap2_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 
 	/*
 	 * If we don't have snapshot or we are just fast-forwarding, there is no
-	 * point in decoding changes.
+	 * point in decoding changes. If we are building a snapshot, we need to
+	 * handle XLOG_HEAP2_NEW_CID, which marks a transaction as catalog modifying.
 	 */
-	if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
+	if (SnapBuildCurrentState(builder) < SNAPBUILD_BUILDING_SNAPSHOT ||
 		ctx->fast_forward)
 		return;
 
@@ -470,9 +475,10 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 
 	/*
 	 * If we don't have snapshot or we are just fast-forwarding, there is no
-	 * point in decoding data changes.
+	 * point in decoding data changes. If we are building a snapshot, we need to
+	 * handle XLOG_HEAP_INPLACE, which marks a transaction as catalog modifying.
 	 */
-	if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
+	if (SnapBuildCurrentState(builder) < SNAPBUILD_BUILDING_SNAPSHOT ||
 		ctx->fast_forward)
 		return;
 
@@ -1300,6 +1306,7 @@ DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 				  Oid txn_dbid, RepOriginId origin_id)
 {
 	if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
+		SnapBuildCurrentState(ctx->snapshot_builder) < SNAPBUILD_CONSISTENT ||
 		(txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) ||
 		FilterByOrigin(ctx, origin_id))
 		return true;
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index e37e22f441..8eacede275 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -826,6 +826,13 @@ SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
 	 */
 	ReorderBufferXidSetCatalogChanges(builder->reorder, xid, lsn);
 
+	/*
+	 * If we don't have a full snapshot yet, the transaction won't be
+	 * replayed, so just return here.
+	 */
+	if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
+		return;
+
 	ReorderBufferAddNewTupleCids(builder->reorder, xlrec->top_xid, lsn,
 								 xlrec->target_locator, xlrec->target_tid,
 								 xlrec->cmin, xlrec->cmax,
-- 
2.34.1

