On Tue, Jul 12, 2022 at 5:52 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
>
> On Tue, Jul 12, 2022 at 1:13 PM Masahiko Sawada <sawada.m...@gmail.com> wrote:
> >
> > On Tue, Jul 12, 2022 at 3:25 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
> > >
> > > On Tue, Jul 12, 2022 at 11:38 AM Masahiko Sawada <sawada.m...@gmail.com> 
> > > wrote:
> > > >
> > > > On Tue, Jul 12, 2022 at 10:28 AM Masahiko Sawada 
> > > > <sawada.m...@gmail.com> wrote:
> > > > >
> > > > >
> > > > > I'm doing benchmark tests and will share the results.
> > > > >
> > > >
> > > > I've done benchmark tests to measure the overhead introduced by doing
> > > > bsearch() every time when decoding a commit record. I've simulated a
> > > > very intensified situation where we decode 1M commit records while
> > > > keeping builder->catchange.xip array but the overhead is negilible:
> > > >
> > > > HEAD: 584 ms
> > > > Patched: 614 ms
> > > >
> > > > I've attached the benchmark script I used. With increasing
> > > > LOG_SNAPSHOT_INTERVAL_MS to 90000, the last decoding by
> > > > pg_logicla_slot_get_changes() decodes 1M commit records while keeping
> > > > catalog modifying transactions.
> > > >
> > >
> > > Thanks for the test. We should also see how it performs when (a) we
> > > don't change LOG_SNAPSHOT_INTERVAL_MS,
> >
> > What point do you want to see in this test? I think the performance
> > overhead depends on how many times we do bsearch() and how many
> > transactions are in the list.
> >
>
> Right, I am not expecting any visible performance difference in this
> case. This is to ensure that we are not incurring any overhead in the
> more usual scenarios (or default cases). As per my understanding, the
> purpose of increasing the value of LOG_SNAPSHOT_INTERVAL_MS is to
> simulate a stress case for the changes made by the patch, and keeping
> its value default will test the more usual scenarios.

Agreed.

I've done simple benchmark tests to decode 100k pgbench transactions:

HEAD: 10.34 s
Patched: 10.29 s

I've attached an updated patch that incorporated comments from Amit and Shi.

Regards,

--
Masahiko Sawada
EDB:  https://www.enterprisedb.com/
From 28ca92c9d95cd05a26a7db6e54704f92b1846943 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Wed, 6 Jul 2022 12:53:36 +0900
Subject: [PATCH v4] Add catalog modifying transactions to logical decoding
 serialized snapshot.

Previously, we relied on HEAP2_NEW_CID records and XACT_INVALIDATION
records to know if the transaction has modified the catalog, and that
information is not serialized to snapshot. Therefore, if the logical
decoding decodes only the commit record of the transaction that
actually has modified a catalog, we missed adding its XID to the
snapshot. We ended up looking at catalogs with the wrong snapshot.

To fix this problem, this change adds the list of transaction IDs and
sub-transaction IDs, that have modified catalogs and are running when
snapshot serialization, to the serialized snapshot. When decoding a
COMMIT record, we check both the list and the ReorderBuffer to see if
if the transaction has modified catalogs.

Since this adds additional information to the serialized snapshot, we
cannot backpatch it. For back branches, we take another approach;
remember the last-running-xacts list of the first decoded
RUNNING_XACTS record and check if the transaction whose commit record
has XACT_XINFO_HAS_INVALS and whose XID is in the list. This doesn't
require any file format changes but the transaction will end up being
added to the snapshot even if it has only relcache invalidations.

This commit bumps SNAPBUILD_VERSION because of change in SnapBuild.
---
 contrib/test_decoding/Makefile                |   2 +-
 .../expected/catalog_change_snapshot.out      |  44 ++++
 .../specs/catalog_change_snapshot.spec        |  39 +++
 .../replication/logical/reorderbuffer.c       |  69 ++++-
 src/backend/replication/logical/snapbuild.c   | 235 ++++++++++++------
 src/include/replication/reorderbuffer.h       |  12 +
 6 files changed, 317 insertions(+), 84 deletions(-)
 create mode 100644 contrib/test_decoding/expected/catalog_change_snapshot.out
 create mode 100644 contrib/test_decoding/specs/catalog_change_snapshot.spec

diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index b220906479..c7ce603706 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -8,7 +8,7 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
 	spill slot truncate stream stats twophase twophase_stream
 ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
 	oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \
-	twophase_snapshot slot_creation_error
+	twophase_snapshot slot_creation_error catalog_change_snapshot
 
 REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
 ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
diff --git a/contrib/test_decoding/expected/catalog_change_snapshot.out b/contrib/test_decoding/expected/catalog_change_snapshot.out
new file mode 100644
index 0000000000..dc4f9b7018
--- /dev/null
+++ b/contrib/test_decoding/expected/catalog_change_snapshot.out
@@ -0,0 +1,44 @@
+Parsed test spec with 2 sessions
+
+starting permutation: s0_init s0_begin s0_savepoint s0_truncate s1_checkpoint s1_get_changes s0_commit s0_begin s0_insert s1_checkpoint s1_get_changes s0_commit s1_get_changes
+step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding');
+?column?
+--------
+init    
+(1 row)
+
+step s0_begin: BEGIN;
+step s0_savepoint: SAVEPOINT sp1;
+step s0_truncate: TRUNCATE tbl1;
+step s1_checkpoint: CHECKPOINT;
+step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data
+----
+(0 rows)
+
+step s0_commit: COMMIT;
+step s0_begin: BEGIN;
+step s0_insert: INSERT INTO tbl1 VALUES (1);
+step s1_checkpoint: CHECKPOINT;
+step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data                                   
+---------------------------------------
+BEGIN                                  
+table public.tbl1: TRUNCATE: (no-flags)
+COMMIT                                 
+(3 rows)
+
+step s0_commit: COMMIT;
+step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data                                                         
+-------------------------------------------------------------
+BEGIN                                                        
+table public.tbl1: INSERT: val1[integer]:1 val2[integer]:null
+COMMIT                                                       
+(3 rows)
+
+?column?
+--------
+stop    
+(1 row)
+
diff --git a/contrib/test_decoding/specs/catalog_change_snapshot.spec b/contrib/test_decoding/specs/catalog_change_snapshot.spec
new file mode 100644
index 0000000000..42bad9a45b
--- /dev/null
+++ b/contrib/test_decoding/specs/catalog_change_snapshot.spec
@@ -0,0 +1,39 @@
+# Test that decoding only the commit record of the transaction that have
+# catalog-changed.
+setup
+{
+    DROP TABLE IF EXISTS tbl1;
+    CREATE TABLE tbl1 (val1 integer, val2 integer);
+}
+
+teardown
+{
+    DROP TABLE tbl1;
+    SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
+}
+
+session "s0"
+setup { SET synchronous_commit=on; }
+step "s0_init" { SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); }
+step "s0_begin" { BEGIN; }
+step "s0_savepoint" { SAVEPOINT sp1; }
+step "s0_truncate" { TRUNCATE tbl1; }
+step "s0_insert" { INSERT INTO tbl1 VALUES (1); }
+step "s0_commit" { COMMIT; }
+
+session "s1"
+setup { SET synchronous_commit=on; }
+step "s1_checkpoint" { CHECKPOINT; }
+step "s1_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); }
+
+# For the transaction that TRUNCATEd the table tbl1, the last decoding decodes
+# only its COMMIT record, because it starts from the RUNNING_XACTS record emitted
+# during the first checkpoint execution.  This transaction must be marked as
+# containing catalog changes while decoding the COMMIT record and the decoding
+# of the INSERT record must read the pg_class with the correct historic snapshot.
+#
+# Note that in a case where bgwriter wrote the RUNNING_XACTS record between "s0_commit"
+# and "s0_begin", this doesn't happen as the decoding starts from the RUNNING_XACTS
+# record written by bgwriter.  One might think we can either stop the bgwriter or
+# increase LOG_SNAPSHOT_INTERVAL_MS but it's not practical via tests.
+permutation "s0_init" "s0_begin" "s0_savepoint" "s0_truncate" "s1_checkpoint" "s1_get_changes" "s0_commit" "s0_begin" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_commit" "s1_get_changes"
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 88a37fde72..d7f430623d 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -349,6 +349,8 @@ ReorderBufferAllocate(void)
 	buffer->by_txn_last_xid = InvalidTransactionId;
 	buffer->by_txn_last_txn = NULL;
 
+	buffer->catchange_ntxns = 0;
+
 	buffer->outbuf = NULL;
 	buffer->outbufsize = 0;
 	buffer->size = 0;
@@ -366,6 +368,7 @@ ReorderBufferAllocate(void)
 
 	dlist_init(&buffer->toplevel_by_lsn);
 	dlist_init(&buffer->txns_by_base_snapshot_lsn);
+	dlist_init(&buffer->catchange_txns);
 
 	/*
 	 * Ensure there's no stale data from prior uses of this slot, in case some
@@ -1526,7 +1529,7 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 	}
 
 	/*
-	 * Remove TXN from its containing list.
+	 * Remove TXN from its containing lists.
 	 *
 	 * Note: if txn is known as subxact, we are deleting the TXN from its
 	 * parent's list of known subxacts; this leaves the parent's nsubxacts
@@ -1535,6 +1538,14 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 	 */
 	dlist_delete(&txn->node);
 
+	if (rbtxn_has_catalog_changes(txn))
+	{
+		dlist_delete(&txn->catchange_node);
+		rb->catchange_ntxns--;
+
+		Assert(rb->catchange_ntxns >= 0);
+	}
+
 	/* now remove reference from buffer */
 	hash_search(rb->by_txn,
 				(void *) &txn->xid,
@@ -3275,10 +3286,16 @@ ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid,
 								  XLogRecPtr lsn)
 {
 	ReorderBufferTXN *txn;
+	ReorderBufferTXN *toptxn;
 
 	txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
 
-	txn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
+	if (!rbtxn_has_catalog_changes(txn))
+	{
+		txn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
+		dlist_push_tail(&rb->catchange_txns, &txn->catchange_node);
+		rb->catchange_ntxns++;
+	}
 
 	/*
 	 * Mark top-level transaction as having catalog changes too if one of its
@@ -3286,8 +3303,52 @@ ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid,
 	 * conveniently check just top-level transaction and decide whether to
 	 * build the hash table or not.
 	 */
-	if (txn->toptxn != NULL)
-		txn->toptxn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
+	toptxn = txn->toptxn;
+	if (toptxn != NULL && !rbtxn_has_catalog_changes(toptxn))
+	{
+		toptxn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
+		dlist_push_tail(&rb->catchange_txns, &toptxn->catchange_node);
+		rb->catchange_ntxns++;
+	}
+}
+
+/*
+ * Return palloc'ed array of the transactions that have changed catalogs.
+ * The returned array is sorted in xidComparator order.
+ *
+ * The caller must free the returned array when done with it.
+ */
+TransactionId *
+ReorderBufferGetCatalogChangesXacts(ReorderBuffer *rb)
+{
+	dlist_iter iter;
+	TransactionId *xids = NULL;
+	size_t	xcnt = 0;
+
+	/* Quick return if the list is empty */
+	if (dlist_is_empty(&rb->catchange_txns))
+	{
+		Assert(rb->catchange_ntxns == 0);
+		return NULL;
+	}
+
+	/* Initialize XID array */
+	xids = (TransactionId *) palloc(sizeof(TransactionId) * rb->catchange_ntxns);
+	dlist_foreach(iter, &rb->catchange_txns)
+	{
+		ReorderBufferTXN *txn = dlist_container(ReorderBufferTXN,
+												catchange_node,
+												iter.cur);
+
+		Assert(rbtxn_has_catalog_changes(txn));
+
+		xids[xcnt++] = txn->xid;
+	}
+
+	qsort(xids, xcnt, sizeof(TransactionId), xidComparator);
+
+	Assert((xcnt > 0) && (xcnt == rb->catchange_ntxns));
+	return xids;
 }
 
 /*
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 73c0f15214..c482e906b0 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -241,6 +241,30 @@ struct SnapBuild
 		 */
 		TransactionId *xip;
 	}			committed;
+
+	/*
+	 * Array of transactions and subtransactions that had modified catalogs
+	 * and were running when the snapshot was serialized.
+	 *
+	 * We normally rely on HEAP2_NEW_CID and XLOG_XACT_INVALIDATIONS records to
+	 * know if the transaction has changed the catalog. But it could happen that
+	 * the logical decoding decodes only the commit record of the transaction.
+	 * This array keeps track of the transactions that have modified catalogs
+	 * and were running when serializing a snapshot, and this array is used to
+	 * add such transactions to the snapshot.
+	 *
+	 * This array is set once when restoring the snapshot, xids are removed
+	 * from the array when decoding xl_running_xacts record, and then eventually
+	 * becomes empty.
+	 */
+	struct
+	{
+		/* number of transactions */
+		size_t		xcnt;
+
+		/* This array must be sorted in xidComparator order */
+		TransactionId *xip;
+	}			catchange;
 };
 
 /*
@@ -250,8 +274,8 @@ struct SnapBuild
 static ResourceOwner SavedResourceOwnerDuringExport = NULL;
 static bool ExportInProgress = false;
 
-/* ->committed manipulation */
-static void SnapBuildPurgeCommittedTxn(SnapBuild *builder);
+/* ->committed and ->catchange manipulation */
+static void SnapBuildPurgeOlderTxn(SnapBuild *builder);
 
 /* snapshot building/manipulation/distribution functions */
 static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder);
@@ -262,6 +286,8 @@ static void SnapBuildSnapIncRefcount(Snapshot snap);
 
 static void SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn);
 
+static inline bool SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid);
+
 /* xlog reading helper functions for SnapBuildProcessRunningXacts */
 static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running);
 static void SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff);
@@ -269,6 +295,7 @@ static void SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutof
 /* serialization functions */
 static void SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn);
 static bool SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn);
+static void SnapBuildRestoreContents(int fd, char *dest, Size size, const char *path);
 
 /*
  * Allocate a new snapshot builder.
@@ -306,6 +333,9 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder,
 		palloc0(builder->committed.xcnt_space * sizeof(TransactionId));
 	builder->committed.includes_all_transactions = true;
 
+	builder->catchange.xcnt = 0;
+	builder->catchange.xip = NULL;
+
 	builder->initial_xmin_horizon = xmin_horizon;
 	builder->start_decoding_at = start_lsn;
 	builder->building_full_snapshot = need_full_snapshot;
@@ -888,12 +918,15 @@ SnapBuildAddCommittedTxn(SnapBuild *builder, TransactionId xid)
 }
 
 /*
- * Remove knowledge about transactions we treat as committed that are smaller
- * than ->xmin. Those won't ever get checked via the ->committed array but via
- * the clog machinery, so we don't need to waste memory on them.
+ * Remove knowledge about transactions we treat as committed or containing catalog
+ * changes that are smaller than ->xmin. Those won't ever get checked via
+ * the ->committed or ->catchange array, respectively. The committed xids will
+ * get checked via the clog machinery. We can ideally remove the transaction
+ * from catchange array once it is finished (committed/aborted) but that could
+ * be costly as we need to maintain the xids order in the array.
  */
 static void
-SnapBuildPurgeCommittedTxn(SnapBuild *builder)
+SnapBuildPurgeOlderTxn(SnapBuild *builder)
 {
 	int			off;
 	TransactionId *workspace;
@@ -928,6 +961,36 @@ SnapBuildPurgeCommittedTxn(SnapBuild *builder)
 	builder->committed.xcnt = surviving_xids;
 
 	pfree(workspace);
+
+	/* purge xids in ->catchange as well */
+	if (builder->catchange.xcnt > 0)
+	{
+		/*
+		 * Since catchange.xip is sorted, we find the lower bound of
+		 * xids that still are interesting.
+		 */
+		for (off = 0; off < builder->catchange.xcnt; off++)
+		{
+			if (TransactionIdFollowsOrEquals(builder->catchange.xip[off],
+											 builder->xmin))
+				break;
+		}
+
+		surviving_xids = builder->catchange.xcnt - off;
+		if (surviving_xids > 0)
+			memmove(builder->catchange.xip, &(builder->catchange.xip[off]),
+					surviving_xids * sizeof(TransactionId));
+		else
+		{
+			/* catchange list becomes empty */
+			pfree(builder->catchange.xip);
+			builder->catchange.xip = NULL;
+		}
+
+		elog(DEBUG3, "purged catalog modifying transactions from %d to %d",
+			 (uint32) builder->catchange.xcnt, surviving_xids);
+		builder->catchange.xcnt = surviving_xids;
+	}
 }
 
 /*
@@ -983,7 +1046,7 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
 		 * Add subtransaction to base snapshot if catalog modifying, we don't
 		 * distinguish to toplevel transactions there.
 		 */
-		if (ReorderBufferXidHasCatalogChanges(builder->reorder, subxid))
+		if (SnapBuildXidHasCatalogChanges(builder, subxid))
 		{
 			sub_needs_timetravel = true;
 			needs_snapshot = true;
@@ -1012,7 +1075,7 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
 	}
 
 	/* if top-level modified catalog, it'll need a snapshot */
-	if (ReorderBufferXidHasCatalogChanges(builder->reorder, xid))
+	if (SnapBuildXidHasCatalogChanges(builder, xid))
 	{
 		elog(DEBUG2, "found top level transaction %u, with catalog changes",
 			 xid);
@@ -1089,6 +1152,21 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
 	}
 }
 
+/*
+ * Check both the reorder buffer and the snapshot to see if the given
+ * transaction has modified catalogs.
+ */
+static inline bool
+SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid)
+{
+	if (ReorderBufferXidHasCatalogChanges(builder->reorder, xid))
+		return true;
+
+	/* Check the catchange XID array */
+	return ((builder->catchange.xcnt > 0) &&
+			(bsearch(&xid, builder->catchange.xip, builder->catchange.xcnt,
+					 sizeof(TransactionId), xidComparator) != NULL));
+}
 
 /* -----------------------------------
  * Snapshot building functions dealing with xlog records
@@ -1135,7 +1213,7 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact
 	builder->xmin = running->oldestRunningXid;
 
 	/* Remove transactions we don't need to keep track off anymore */
-	SnapBuildPurgeCommittedTxn(builder);
+	SnapBuildPurgeOlderTxn(builder);
 
 	/*
 	 * Advance the xmin limit for the current replication slot, to allow
@@ -1438,6 +1516,7 @@ SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff)
  *
  * struct SnapBuildOnDisk;
  * TransactionId * committed.xcnt; (*not xcnt_space*)
+ * TransactionId * catchange.xcnt;
  *
  */
 typedef struct SnapBuildOnDisk
@@ -1467,7 +1546,7 @@ typedef struct SnapBuildOnDisk
 	offsetof(SnapBuildOnDisk, version)
 
 #define SNAPBUILD_MAGIC 0x51A1E001
-#define SNAPBUILD_VERSION 4
+#define SNAPBUILD_VERSION 5
 
 /*
  * Store/Load a snapshot from disk, depending on the snapshot builder's state.
@@ -1493,6 +1572,8 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
 {
 	Size		needed_length;
 	SnapBuildOnDisk *ondisk = NULL;
+	TransactionId	*catchange_xip = NULL;
+	size_t		catchange_xcnt;
 	char	   *ondisk_c;
 	int			fd;
 	char		tmppath[MAXPGPATH];
@@ -1578,8 +1659,12 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
 				(errcode_for_file_access(),
 				 errmsg("could not remove file \"%s\": %m", tmppath)));
 
+	/* Get the catalog modifying transactions that are yet not committed */
+	catchange_xip = ReorderBufferGetCatalogChangesXacts(builder->reorder);
+	catchange_xcnt = builder->reorder->catchange_ntxns;
+
 	needed_length = sizeof(SnapBuildOnDisk) +
-		sizeof(TransactionId) * builder->committed.xcnt;
+		sizeof(TransactionId) * (builder->committed.xcnt + catchange_xcnt);
 
 	ondisk_c = MemoryContextAllocZero(builder->context, needed_length);
 	ondisk = (SnapBuildOnDisk *) ondisk_c;
@@ -1598,6 +1683,9 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
 	ondisk->builder.snapshot = NULL;
 	ondisk->builder.reorder = NULL;
 	ondisk->builder.committed.xip = NULL;
+	ondisk->builder.catchange.xip = NULL;
+	/* update catchange only on disk data */
+	ondisk->builder.catchange.xcnt = catchange_xcnt;
 
 	COMP_CRC32C(ondisk->checksum,
 				&ondisk->builder,
@@ -1609,6 +1697,12 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
 	COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
 	ondisk_c += sz;
 
+	/* copy catalog modifying xacts */
+	sz = sizeof(TransactionId) * catchange_xcnt;
+	memcpy(ondisk_c, catchange_xip, sz);
+	COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
+	ondisk_c += sz;
+
 	FIN_CRC32C(ondisk->checksum);
 
 	/* we have valid data now, open tempfile and write it there */
@@ -1694,6 +1788,8 @@ out:
 	/* be tidy */
 	if (ondisk)
 		pfree(ondisk);
+	if (catchange_xip)
+		pfree(catchange_xip);
 }
 
 /*
@@ -1707,7 +1803,6 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
 	int			fd;
 	char		path[MAXPGPATH];
 	Size		sz;
-	int			readBytes;
 	pg_crc32c	checksum;
 
 	/* no point in loading a snapshot if we're already there */
@@ -1739,29 +1834,7 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
 
 
 	/* read statically sized portion of snapshot */
-	pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
-	readBytes = read(fd, &ondisk, SnapBuildOnDiskConstantSize);
-	pgstat_report_wait_end();
-	if (readBytes != SnapBuildOnDiskConstantSize)
-	{
-		int			save_errno = errno;
-
-		CloseTransientFile(fd);
-
-		if (readBytes < 0)
-		{
-			errno = save_errno;
-			ereport(ERROR,
-					(errcode_for_file_access(),
-					 errmsg("could not read file \"%s\": %m", path)));
-		}
-		else
-			ereport(ERROR,
-					(errcode(ERRCODE_DATA_CORRUPTED),
-					 errmsg("could not read file \"%s\": read %d of %zu",
-							path, readBytes,
-							(Size) SnapBuildOnDiskConstantSize)));
-	}
+	SnapBuildRestoreContents(fd, (char *) &ondisk, SnapBuildOnDiskConstantSize, path);
 
 	if (ondisk.magic != SNAPBUILD_MAGIC)
 		ereport(ERROR,
@@ -1781,57 +1854,21 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
 				SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
 
 	/* read SnapBuild */
-	pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
-	readBytes = read(fd, &ondisk.builder, sizeof(SnapBuild));
-	pgstat_report_wait_end();
-	if (readBytes != sizeof(SnapBuild))
-	{
-		int			save_errno = errno;
-
-		CloseTransientFile(fd);
-
-		if (readBytes < 0)
-		{
-			errno = save_errno;
-			ereport(ERROR,
-					(errcode_for_file_access(),
-					 errmsg("could not read file \"%s\": %m", path)));
-		}
-		else
-			ereport(ERROR,
-					(errcode(ERRCODE_DATA_CORRUPTED),
-					 errmsg("could not read file \"%s\": read %d of %zu",
-							path, readBytes, sizeof(SnapBuild))));
-	}
+	SnapBuildRestoreContents(fd, (char *) &ondisk.builder, sizeof(SnapBuild), path);
 	COMP_CRC32C(checksum, &ondisk.builder, sizeof(SnapBuild));
 
 	/* restore committed xacts information */
 	sz = sizeof(TransactionId) * ondisk.builder.committed.xcnt;
 	ondisk.builder.committed.xip = MemoryContextAllocZero(builder->context, sz);
-	pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
-	readBytes = read(fd, ondisk.builder.committed.xip, sz);
-	pgstat_report_wait_end();
-	if (readBytes != sz)
-	{
-		int			save_errno = errno;
-
-		CloseTransientFile(fd);
-
-		if (readBytes < 0)
-		{
-			errno = save_errno;
-			ereport(ERROR,
-					(errcode_for_file_access(),
-					 errmsg("could not read file \"%s\": %m", path)));
-		}
-		else
-			ereport(ERROR,
-					(errcode(ERRCODE_DATA_CORRUPTED),
-					 errmsg("could not read file \"%s\": read %d of %zu",
-							path, readBytes, sz)));
-	}
+	SnapBuildRestoreContents(fd, (char *) ondisk.builder.committed.xip, sz, path);
 	COMP_CRC32C(checksum, ondisk.builder.committed.xip, sz);
 
+	/* restore catalog modifying xacts information */
+	sz = sizeof(TransactionId) * ondisk.builder.catchange.xcnt;
+	ondisk.builder.catchange.xip = MemoryContextAllocZero(builder->context, sz);
+	SnapBuildRestoreContents(fd, (char *) ondisk.builder.catchange.xip, sz, path);
+	COMP_CRC32C(checksum, ondisk.builder.catchange.xip, sz);
+
 	if (CloseTransientFile(fd) != 0)
 		ereport(ERROR,
 				(errcode_for_file_access(),
@@ -1885,6 +1922,14 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
 	}
 	ondisk.builder.committed.xip = NULL;
 
+	/* set catalog modifying transactions */
+	if (builder->catchange.xip)
+		pfree(builder->catchange.xip);
+	builder->catchange.xcnt = ondisk.builder.catchange.xcnt;
+	builder->catchange.xip = ondisk.builder.catchange.xip;
+
+	ondisk.builder.catchange.xip = NULL;
+
 	/* our snapshot is not interesting anymore, build a new one */
 	if (builder->snapshot != NULL)
 	{
@@ -1909,6 +1954,38 @@ snapshot_not_interesting:
 	return false;
 }
 
+/*
+ * Read the contents of the serialized snapshot to the dest.
+ */
+static void
+SnapBuildRestoreContents(int fd, char *dest, Size size, const char *path)
+{
+	int			readBytes;
+
+	pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
+	readBytes = read(fd, dest, size);
+	pgstat_report_wait_end();
+	if (readBytes != size)
+	{
+		int			save_errno = errno;
+
+		CloseTransientFile(fd);
+
+		if (readBytes < 0)
+		{
+			errno = save_errno;
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not read file \"%s\": %m", path)));
+		}
+		else
+			ereport(ERROR,
+					(errcode(ERRCODE_DATA_CORRUPTED),
+					 errmsg("could not read file \"%s\": read %d of %zu",
+							path, readBytes, sizeof(SnapBuild))));
+	}
+}
+
 /*
  * Remove all serialized snapshots that are not required anymore because no
  * slot can need them. This doesn't actually have to run during a checkpoint,
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index d109d0baed..fd84f175c0 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -380,6 +380,11 @@ typedef struct ReorderBufferTXN
 	 */
 	dlist_node	node;
 
+	/*
+	 * A node in the list of catalog modifying transactions
+	 */
+	dlist_node	catchange_node;
+
 	/*
 	 * Size of this transaction (changes currently in memory, in bytes).
 	 */
@@ -526,6 +531,12 @@ struct ReorderBuffer
 	 */
 	dlist_head	txns_by_base_snapshot_lsn;
 
+	/*
+	 * Transactions and subtransactions that have modified system catalogs.
+	 */
+	dlist_head	catchange_txns;
+	int			catchange_ntxns;
+
 	/*
 	 * one-entry sized cache for by_txn. Very frequently the same txn gets
 	 * looked up over and over again.
@@ -677,6 +688,7 @@ extern void ReorderBufferSkipPrepare(ReorderBuffer *rb, TransactionId xid);
 extern void ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, char *gid);
 extern ReorderBufferTXN *ReorderBufferGetOldestTXN(ReorderBuffer *);
 extern TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb);
+extern TransactionId *ReorderBufferGetCatalogChangesXacts(ReorderBuffer *rb);
 
 extern void ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr);
 
-- 
2.24.3 (Apple Git-128)

Reply via email to