From fafdad65f8116f2646cfe51a9cf05408b38a2653 Mon Sep 17 00:00:00 2001
From: "kuroda.hayato%40jp.fujitsu.com" <kuroda.hayato@jp.fujitsu.com>
Date: Thu, 1 Sep 2022 08:20:31 +0000
Subject: [PATCH] mark RBTXN_HAS_CATALOG_CHANGES to the top transaction
 correctly

When a subtransaction is recorded to WAL, only its first record carries the xid
of the top-level transaction. So when decoding restarts from a previously
serialized snapshot, the first WAL record of a subtransaction might not be read,
in which case the top-sub relationship of the transactions is lost.

Normally this is harmless, but if the subtransaction modifies the catalog,
the RBTXN_HAS_CATALOG_CHANGES mark is not propagated to the top transaction.
This inconsistency may cause a core dump.

To fix this problem, this change adds the pair lists of (sub)transaction IDs to
SnapBuild. Lists are filled when the snapshot is serialized, and it is used
to mark top-transaction as containing catalog changes.
---
 .../expected/catalog_change_snapshot.out      |  45 +++++
 .../specs/catalog_change_snapshot.spec        |  11 ++
 src/backend/replication/logical/decode.c      |   6 +-
 .../replication/logical/reorderbuffer.c       |  46 +++++
 src/backend/replication/logical/snapbuild.c   | 167 +++++++++++++++++-
 src/include/replication/reorderbuffer.h       |   9 +
 src/include/replication/snapbuild.h           |   2 +
 7 files changed, 280 insertions(+), 6 deletions(-)

diff --git a/contrib/test_decoding/expected/catalog_change_snapshot.out b/contrib/test_decoding/expected/catalog_change_snapshot.out
index d2a4bdfcc1..e36f417e4a 100644
--- a/contrib/test_decoding/expected/catalog_change_snapshot.out
+++ b/contrib/test_decoding/expected/catalog_change_snapshot.out
@@ -96,3 +96,48 @@ COMMIT
 stop    
 (1 row)
 
+
+starting permutation: s0_init s0_begin s0_savepoint s0_insert s1_checkpoint s1_get_changes s0_truncate s0_commit s0_begin s0_insert s1_checkpoint s1_get_changes s0_commit s1_get_changes
+step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding');
+?column?
+--------
+init    
+(1 row)
+
+step s0_begin: BEGIN;
+step s0_savepoint: SAVEPOINT sp1;
+step s0_insert: INSERT INTO tbl1 VALUES (1);
+step s1_checkpoint: CHECKPOINT;
+step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data
+----
+(0 rows)
+
+step s0_truncate: TRUNCATE tbl1;
+step s0_commit: COMMIT;
+step s0_begin: BEGIN;
+step s0_insert: INSERT INTO tbl1 VALUES (1);
+step s1_checkpoint: CHECKPOINT;
+step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data                                                         
+-------------------------------------------------------------
+BEGIN                                                        
+table public.tbl1: INSERT: val1[integer]:1 val2[integer]:null
+table public.tbl1: TRUNCATE: (no-flags)                      
+COMMIT                                                       
+(4 rows)
+
+step s0_commit: COMMIT;
+step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data                                                         
+-------------------------------------------------------------
+BEGIN                                                        
+table public.tbl1: INSERT: val1[integer]:1 val2[integer]:null
+COMMIT                                                       
+(3 rows)
+
+?column?
+--------
+stop    
+(1 row)
+
diff --git a/contrib/test_decoding/specs/catalog_change_snapshot.spec b/contrib/test_decoding/specs/catalog_change_snapshot.spec
index ff8f68489b..e3f72cc3fe 100644
--- a/contrib/test_decoding/specs/catalog_change_snapshot.spec
+++ b/contrib/test_decoding/specs/catalog_change_snapshot.spec
@@ -57,3 +57,14 @@ permutation "s0_init" "s0_begin" "s0_savepoint" "s0_truncate" "s1_checkpoint" "s
 # checkpoint record it prunes one of the xacts in that list and when decoding the
 # next checkpoint, it will completely prune that list.
 permutation "s0_init" "s0_begin" "s0_truncate" "s2_begin" "s2_truncate" "s1_checkpoint" "s1_get_changes" "s0_commit" "s0_begin" "s0_insert" "s1_checkpoint" "s1_get_changes" "s2_commit" "s1_checkpoint" "s1_get_changes" "s0_commit" "s1_get_changes"
+
+# Test that we can restore the association between xid of the catalog modifying top
+# transaction and that of the subtransaction from the serialized snapshot. This is
+# necessary when we restart decoding from RUNNING_XACT without the wal to associate
+# subtransaction to its top transaction. The last decoding starts from the first
+# checkpoint and NEW_CID of "s0_truncate" doesn't mark the top transaction as catalog
+# modifying transaction. In this scenario, serialized snapshot restores the
+# relationship between transactions and thus the decoding works without a contradiction
+# in which a subtransaction is marked as catalog-changing while its top transaction
+# is not.
+permutation "s0_init" "s0_begin" "s0_savepoint" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_truncate" "s0_commit" "s0_begin" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_commit" "s1_get_changes"
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 1667d720b1..4e5602eaae 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -272,8 +272,8 @@ xact_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 													  buf->origptr,
 													  invals->nmsgs,
 													  invals->msgs);
-					ReorderBufferXidSetCatalogChanges(ctx->reorder, xid,
-													  buf->origptr);
+					SnapBuildXidSetCatalogChanges(ctx->snapshot_builder, xid,
+												  buf->origptr);
 				}
 				else if ((!ctx->fast_forward))
 					ReorderBufferImmediateInvalidation(ctx->reorder,
@@ -499,7 +499,7 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 				break;
 
 			(void) SnapBuildProcessChange(builder, xid, buf->origptr);
-			ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr);
+			SnapBuildXidSetCatalogChanges(ctx->snapshot_builder, xid, buf->origptr);
 			break;
 
 		case XLOG_HEAP_CONFIRM:
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 89cf9f9389..48b00beefa 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -293,6 +293,8 @@ static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
 											ReorderBufferChange *change,
 											bool addition, Size sz);
 
+static int cmp_subxids_cell(const ListCell *a, const ListCell *b);
+
 /*
  * Allocate a new ReorderBuffer and clean out any old serialized state from
  * prior ReorderBuffer instances for the same slot.
@@ -5224,3 +5226,47 @@ restart:
 		*cmax = ent->cmax;
 	return true;
 }
+
+static int
+cmp_subxids_cell(const ListCell *a, const ListCell *b)
+{
+	subxids_cell *lhs = (subxids_cell *) lfirst(a);
+	subxids_cell *rhs = (subxids_cell *) lfirst(b);
+	/* order list cells by their subtransaction xid */
+	return xidComparator(&lhs->subtxn, &rhs->subtxn);
+}
+
+/*
+ * Build a list of (subxid, top-level xid) pairs, one palloc'd subxids_cell per
+ * ReorderBufferTXN whose toplevel_xid is valid, sorted by subtransaction xid.
+ */
+List *
+ReorderBufferGetSubtxns(ReorderBuffer *rb)
+{
+	HASH_SEQ_STATUS hash_seq;
+	ReorderBufferTXNByIdEnt *entry;
+	List *result = NIL;
+
+	hash_seq_init(&hash_seq, rb->by_txn);
+	while ((entry = hash_seq_search(&hash_seq)) != NULL)
+	{
+		subxids_cell *cell;
+
+		/*
+		 * No valid toplevel_xid recorded: not a known subtransaction, skip.
+		 */
+		if (!TransactionIdIsValid(entry->txn->toplevel_xid))
+			continue;
+
+		cell = (subxids_cell *) palloc0(sizeof(subxids_cell));
+
+		cell->subtxn = entry->txn->xid;
+		cell->txn = entry->txn->toplevel_xid;
+
+		result = lappend(result, cell);
+	}
+
+	list_sort(result, cmp_subxids_cell);
+
+	return result;
+}
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index bf72ad45ec..7573f9c9d9 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -268,6 +268,30 @@ struct SnapBuild
 		/* This array must be sorted in xidComparator order */
 		TransactionId *xip;
 	}			catchange;
+
+	/*
+	 * Two arrays that have xids of (sub)transactions that were running
+	 * when the snapshot was serialized.
+	 *
+	 * When a subtransaction is recorded to WAL, only the first record has the xid
+	 * of the top-level transaction. So if a previously serialized snapshot is
+	 * restored, the first WAL record of a subtransaction may not be read, in which
+	 * case the top-sub relationship of transactions will be missed.
+	 *
+	 * Now to avoid the above problem, we serialize the pair of sub-top transactions.
+	 *
+	 * XXX: ReorderBufferGetSubtxns() returns data as List, but we record here as arrays.
+	 * Is there a better way to record them?
+	 */
+	struct
+	{
+		/* number of transactions */
+		size_t		xcnt;
+
+		/* subxip must be sorted in xidComparator order */
+		TransactionId *subxip;
+		TransactionId *xip;
+	}			subxids;
 };
 
 /*
@@ -812,7 +836,7 @@ SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
 	 * we only log new_cid's if a catalog tuple was modified, so mark the
 	 * transaction as containing catalog modifications
 	 */
-	ReorderBufferXidSetCatalogChanges(builder->reorder, xid, lsn);
+	SnapBuildXidSetCatalogChanges(builder, xid, lsn);
 
 	ReorderBufferAddNewTupleCids(builder->reorder, xlrec->top_xid, lsn,
 								 xlrec->target_locator, xlrec->target_tid,
@@ -1003,6 +1027,47 @@ SnapBuildPurgeOlderTxn(SnapBuild *builder)
 			 builder->xmin, builder->xmax);
 		builder->catchange.xcnt = surviving_xids;
 	}
+
+	/*
+	 * Purge xids in ->subxids as well. Both arrays are trimmed in lockstep so
+	 * that pairs stay aligned and subxip stays sorted in xidComparator order.
+	 */
+	if (builder->subxids.xcnt > 0)
+	{
+		/*
+		 * Since subxids.subxip is sorted, we find the lower bound of xids
+		 * that are still interesting.
+		 */
+		for (off = 0; off < builder->subxids.xcnt; off++)
+		{
+			if (TransactionIdFollowsOrEquals(builder->subxids.subxip[off],
+											 builder->xmin))
+				break;
+		}
+
+		surviving_xids = builder->subxids.xcnt - off;
+
+		if (surviving_xids > 0)
+		{
+			memmove(builder->subxids.subxip, &(builder->subxids.subxip[off]),
+					surviving_xids * sizeof(TransactionId));
+			memmove(builder->subxids.xip, &(builder->subxids.xip[off]),
+					surviving_xids * sizeof(TransactionId));
+		}
+		else
+		{
+			pfree(builder->subxids.subxip);
+			builder->subxids.subxip = NULL;
+			pfree(builder->subxids.xip);
+			builder->subxids.xip = NULL;
+		}
+
+		elog(DEBUG3, "purged subtransactions from %u to %u, xmin: %u, xmax: %u",
+			 (uint32) builder->subxids.xcnt, (uint32) surviving_xids,
+			 builder->xmin, builder->xmax);
+
+		builder->subxids.xcnt = surviving_xids;
+	}
 }
 
 /*
@@ -1595,6 +1660,8 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
 	TransactionId *catchange_xip = NULL;
 	MemoryContext old_ctx;
 	size_t		catchange_xcnt;
+	List 		*list = NIL;
+	size_t 		 sub_xcnt;
 	char	   *ondisk_c;
 	int			fd;
 	char		tmppath[MAXPGPATH];
@@ -1686,8 +1753,12 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
 	catchange_xip = ReorderBufferGetCatalogChangesXacts(builder->reorder);
 	catchange_xcnt = builder->reorder->catchange_ntxns;
 
+	/* Get the list of subtransactions */
+	list = ReorderBufferGetSubtxns(builder->reorder);
+	sub_xcnt = list_length(list);
+
 	needed_length = sizeof(SnapBuildOnDisk) +
-		sizeof(TransactionId) * (builder->committed.xcnt + catchange_xcnt);
+		sizeof(TransactionId) * (builder->committed.xcnt + catchange_xcnt + sub_xcnt * 2);
 
 	ondisk_c = palloc0(needed_length);
 	ondisk = (SnapBuildOnDisk *) ondisk_c;
@@ -1707,8 +1778,9 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
 	ondisk->builder.reorder = NULL;
 	ondisk->builder.committed.xip = NULL;
 	ondisk->builder.catchange.xip = NULL;
-	/* update catchange only on disk data */
+	/* update catchange and subxids on disk data */
 	ondisk->builder.catchange.xcnt = catchange_xcnt;
+	ondisk->builder.subxids.xcnt = sub_xcnt;
 
 	COMP_CRC32C(ondisk->checksum,
 				&ondisk->builder,
@@ -1732,6 +1804,37 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
 		ondisk_c += sz;
 	}
 
+	/* copy sub-top transaction arrays */
+	if (sub_xcnt > 0)
+	{
+		TransactionId *subxips = (TransactionId *) palloc(sizeof(TransactionId) * sub_xcnt);
+		TransactionId *topxips = (TransactionId *) palloc(sizeof(TransactionId) * sub_xcnt);
+		ListCell *lc;
+		int i = 0;
+
+		sz = sizeof(TransactionId) * sub_xcnt;
+
+		foreach(lc, list)
+		{
+			subxids_cell *cell = (subxids_cell *) lfirst(lc);
+
+			subxips[i] = cell->subtxn;
+			topxips[i] = cell->txn;
+			i++;
+		}
+
+		memcpy(ondisk_c, subxips, sz);
+		COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
+		ondisk_c += sz;
+
+		memcpy(ondisk_c, topxips, sz);
+		COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
+		ondisk_c += sz;
+
+		pfree(subxips);
+		pfree(topxips);
+	}
+
 	FIN_CRC32C(ondisk->checksum);
 
 	/* we have valid data now, open tempfile and write it there */
@@ -1821,6 +1924,8 @@ out:
 		pfree(ondisk);
 	if (catchange_xip)
 		pfree(catchange_xip);
+	if (list)
+		list_free_deep(list);
 }
 
 /*
@@ -1906,6 +2011,19 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
 		COMP_CRC32C(checksum, ondisk.builder.catchange.xip, sz);
 	}
 
+	/* Pointer values read from disk are meaningless; never adopt or pfree them. */
+	ondisk.builder.subxids.subxip = ondisk.builder.subxids.xip = NULL;
+	if (ondisk.builder.subxids.xcnt > 0)
+	{
+		sz = sizeof(TransactionId) * ondisk.builder.subxids.xcnt;
+		ondisk.builder.subxids.subxip = MemoryContextAllocZero(builder->context, sz);
+		SnapBuildRestoreContents(fd, (char *) ondisk.builder.subxids.subxip, sz, path);
+		COMP_CRC32C(checksum, ondisk.builder.subxids.subxip, sz);
+		ondisk.builder.subxids.xip = MemoryContextAllocZero(builder->context, sz);
+		SnapBuildRestoreContents(fd, (char *) ondisk.builder.subxids.xip, sz, path);
+		COMP_CRC32C(checksum, ondisk.builder.subxids.xip, sz);
+	}
+
 	if (CloseTransientFile(fd) != 0)
 		ereport(ERROR,
 				(errcode_for_file_access(),
@@ -1966,6 +2084,18 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
 	builder->catchange.xip = ondisk.builder.catchange.xip;
 	ondisk.builder.catchange.xip = NULL;
 
+	if (builder->subxids.subxip)
+		pfree(builder->subxids.subxip);
+	if(builder->subxids.xip)
+		pfree(builder->subxids.xip);
+
+	builder->subxids.xcnt = ondisk.builder.subxids.xcnt;
+	builder->subxids.subxip = ondisk.builder.subxids.subxip;
+	builder->subxids.xip = ondisk.builder.subxids.xip;
+
+	ondisk.builder.subxids.subxip = NULL;
+	ondisk.builder.subxids.xip = NULL;
+
 	/* our snapshot is not interesting anymore, build a new one */
 	if (builder->snapshot != NULL)
 	{
@@ -1989,6 +2119,10 @@ snapshot_not_interesting:
 		pfree(ondisk.builder.committed.xip);
 	if (ondisk.builder.catchange.xip != NULL)
 		pfree(ondisk.builder.catchange.xip);
+	if (ondisk.builder.subxids.subxip != NULL)
+		pfree(ondisk.builder.subxids.subxip);
+	if (ondisk.builder.subxids.xip != NULL)
+		pfree(ondisk.builder.subxids.xip);
 	return false;
 }
 
@@ -2115,3 +2249,30 @@ CheckPointSnapBuild(void)
 	}
 	FreeDir(snap_dir);
 }
+
+/*
+ * Mark xid as containing catalog changes; if the restored snapshot lists xid
+ * as a subtransaction, mark its recorded top-level transaction as well.
+ */
+void
+SnapBuildXidSetCatalogChanges(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
+{
+	ReorderBufferXidSetCatalogChanges(builder->reorder, xid, lsn);
+
+	/* subxids.subxip is kept sorted in xidComparator order, so bsearch it */
+	if (builder->subxids.xcnt > 0)
+	{
+		TransactionId *found = bsearch(&xid, builder->subxids.subxip,
+									   builder->subxids.xcnt,
+									   sizeof(TransactionId), xidComparator);
+
+		/* Mark the top-level xact too; found - base is already in elements */
+		if (found)
+		{
+			size_t		index = found - builder->subxids.subxip;
+
+			ReorderBufferXidSetCatalogChanges(builder->reorder,
+											  builder->subxids.xip[index], lsn);
+		}
+	}
+}
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 8695901ba7..c3f4042fd8 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -628,6 +628,13 @@ struct ReorderBuffer
 	int64		totalBytes;		/* total amount of data decoded */
 };
 
+/*
+ * One (subtransaction, top-level transaction) xid pair, as listed by ReorderBufferGetSubtxns()
+ */
+typedef struct subxids_cell {
+	TransactionId subtxn;	/* xid of the subtransaction */
+	TransactionId txn;		/* toplevel_xid recorded for that subtransaction */
+} subxids_cell;
 
 extern ReorderBuffer *ReorderBufferAllocate(void);
 extern void ReorderBufferFree(ReorderBuffer *);
@@ -694,4 +701,6 @@ extern void ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr);
 
 extern void StartupReorderBuffer(void);
 
+extern List *ReorderBufferGetSubtxns(ReorderBuffer *rb);
+
 #endif
diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h
index e6adea24f2..3c1b89f9b4 100644
--- a/src/include/replication/snapbuild.h
+++ b/src/include/replication/snapbuild.h
@@ -91,4 +91,6 @@ extern void SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn,
 										 struct xl_running_xacts *running);
 extern void SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn);
 
+extern void SnapBuildXidSetCatalogChanges(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn);
+
 #endif							/* SNAPBUILD_H */
-- 
2.27.0

