On Wed, May 25, 2022 at 12:11 PM Masahiko Sawada <sawada.m...@gmail.com> wrote:
>
> On Tue, May 24, 2022 at 2:18 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
> >
> > On Tue, May 24, 2022 at 7:58 AM Masahiko Sawada <sawada.m...@gmail.com> wrote:
> > >
> > > On Mon, May 23, 2022 at 2:39 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
> > > >
> > > > On Mon, May 23, 2022 at 10:03 AM Kyotaro Horiguchi
> > > > <horikyota....@gmail.com> wrote:
> > > > >
> > > > > At Sat, 21 May 2022 15:35:58 +0530, Amit Kapila 
> > > > > <amit.kapil...@gmail.com> wrote in
> > > > > > I think if we don't have any better ideas then we should go with
> > > > > > either this or one of the other proposals in this thread. The other
> > > > > > idea that occurred to me is whether we can somehow update the snapshot
> > > > > > we have serialized on disk about this information. On each
> > > > > > running_xact record when we serialize the snapshot, we also try to
> > > > > > purge the committed xacts (via SnapBuildPurgeCommittedTxn). So, during
> > > > > > that we can check if there are committed xacts to be purged and if we
> > > > > > have previously serialized the snapshot for the prior running xact
> > > > > > record, if so, we can update it with the list of xacts that have
> > > > > > catalog changes. If this is feasible then I think we need to somehow
> > > > > > remember the point where we last serialized the snapshot (maybe by
> > > > > > using builder->last_serialized_snapshot). Even, if this is feasible we
> > > > > > may not be able to do this in back-branches because of the disk-format
> > > > > > change required for this.
> > > > > >
> > > > > > Thoughts?
> > >
> > > It seems to work, could you draft the patch?
> > >
> >
> > I can help with the review and discussion.
>
> Okay, I'll draft the patch for this idea.

I've attached three POC patches:

poc_remember_last_running_xacts_v2.patch is a rebased version of my
previous proposal[1], which follows the original idea: we remember the
xids listed in the first decoded RUNNING_XACTS record and, when
decoding a commit record, check whether it has XACT_XINFO_HAS_INVALS
and whether the xid (or one of its subxids) is in the list. This
doesn't require any file format change, but a transaction will end up
being added to the snapshot even if it has only relcache invalidations.
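
To make the first approach concrete, here is a minimal standalone
sketch of the commit-time check (not code from the patch). It assumes
TransactionId is a plain uint32_t, uses a hypothetical flag bit in
place of XACT_XINFO_HAS_INVALS (whose real value is not assumed), and
LastRunningXacts, remember_running_xacts() and needs_catalog_changes()
are illustrative names only. Like the patch, it checks the subxids too,
and it cannot tell catalog invalidations apart from relcache-only ones.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Stand-in for PostgreSQL's TransactionId (assumption for this sketch). */
typedef uint32_t TransactionId;

/* Hypothetical flag bit standing in for XACT_XINFO_HAS_INVALS. */
#define SKETCH_XINFO_HAS_INVALS (1U << 0)

/* Plain numeric ordering, like xidComparator (not wraparound-aware). */
static int
xid_cmp(const void *a, const void *b)
{
	TransactionId x = *(const TransactionId *) a;
	TransactionId y = *(const TransactionId *) b;

	return (x > y) - (x < y);
}

/* Hypothetical holder for the xids of the first decoded RUNNING_XACTS. */
typedef struct LastRunningXacts
{
	TransactionId *xids;		/* sorted with xid_cmp */
	size_t		n;
} LastRunningXacts;

/* Remember the running xids (top-level and subxids), sorted for bsearch. */
static void
remember_running_xacts(LastRunningXacts *lr, const TransactionId *xids,
					   size_t n)
{
	/* allocate at least one slot so an empty list still has real storage */
	lr->xids = malloc(sizeof(TransactionId) * (n > 0 ? n : 1));
	assert(lr->xids != NULL);
	if (n > 0)
		memcpy(lr->xids, xids, sizeof(TransactionId) * n);
	qsort(lr->xids, n, sizeof(TransactionId), xid_cmp);
	lr->n = n;
}

/*
 * Commit-time check: report catalog changes only if the commit record
 * carries invalidations and its xid, or one of its subxids, was in the
 * remembered list.
 */
static bool
needs_catalog_changes(const LastRunningXacts *lr, TransactionId xid,
					  uint32_t xinfo, const TransactionId *subxids,
					  size_t nsubxids)
{
	if (lr->n == 0 || !(xinfo & SKETCH_XINFO_HAS_INVALS))
		return false;

	if (bsearch(&xid, lr->xids, lr->n, sizeof(TransactionId), xid_cmp) != NULL)
		return true;

	for (size_t i = 0; i < nsubxids; i++)
	{
		if (bsearch(&subxids[i], lr->xids, lr->n, sizeof(TransactionId),
					xid_cmp) != NULL)
			return true;
	}
	return false;
}
```

The patch additionally purges remembered xids that precede the
oldestRunningXid of later RUNNING_XACTS records; the sketch leaves that
step out.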

poc_add_running_catchanges_xacts_to_serialized_snapshot.patch
implements the idea Amit Kapila proposed, with some changes. The basic
approach is to remember the list of xids that changed catalogs and
were still running when the snapshot was serialized. The list is kept
in the SnapBuild struct and is serialized to and restored from the
on-disk snapshot. When decoding a commit record, we check whether the
transaction is already marked as having catalog changes or its xid is
in the list; if so, we add it to the snapshot. Unlike the first patch,
it adds only transactions that actually changed catalogs, but as Amit
mentioned before, this idea cannot be back-patched because it changes
the on-disk format of the serialized snapshot.
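
A minimal sketch of the serialize/restore round trip behind the second
approach, under simplifying assumptions: SketchSnapshot,
sketch_serialize(), sketch_restore() and sketch_in_catchanges() are
hypothetical names, TransactionId is a plain uint32_t, and the CRC and
version handling of the real on-disk snapshot are left out. It shows
the catalog-changes array being written after the committed array,
read back in the same order and, because it is kept sorted, looked up
with bsearch() when a commit is decoded.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Stand-in for PostgreSQL's TransactionId (assumption for this sketch). */
typedef uint32_t TransactionId;

static int
xid_cmp(const void *a, const void *b)
{
	TransactionId x = *(const TransactionId *) a;
	TransactionId y = *(const TransactionId *) b;

	return (x > y) - (x < y);
}

/* Hypothetical, simplified builder state; no checksum or version field. */
typedef struct SketchSnapshot
{
	size_t		committed_xcnt;
	TransactionId *committed;
	size_t		catchanges_xcnt;
	TransactionId *catchanges;	/* must be sorted with xid_cmp */
} SketchSnapshot;

/* Copy len bytes (skipping empty arrays, which may be NULL) and advance. */
static unsigned char *
put_bytes(unsigned char *dst, const void *src, size_t len)
{
	if (len > 0)
		memcpy(dst, src, len);
	return dst + len;
}

/* Layout: both counts, then the committed xids, then the catchanges xids. */
static size_t
sketch_serialize(const SketchSnapshot *s, unsigned char **out)
{
	size_t		len = 2 * sizeof(size_t) +
		sizeof(TransactionId) * (s->committed_xcnt + s->catchanges_xcnt);
	unsigned char *p = malloc(len);

	assert(p != NULL);
	*out = p;
	p = put_bytes(p, &s->committed_xcnt, sizeof(size_t));
	p = put_bytes(p, &s->catchanges_xcnt, sizeof(size_t));
	p = put_bytes(p, s->committed, sizeof(TransactionId) * s->committed_xcnt);
	put_bytes(p, s->catchanges, sizeof(TransactionId) * s->catchanges_xcnt);
	return len;
}

/* Read n xids into a fresh allocation and advance *buf past them. */
static TransactionId *
get_xids(const unsigned char **buf, size_t n)
{
	TransactionId *xids = malloc(sizeof(TransactionId) * (n > 0 ? n : 1));

	assert(xids != NULL);
	if (n > 0)
		memcpy(xids, *buf, sizeof(TransactionId) * n);
	*buf += sizeof(TransactionId) * n;
	return xids;
}

/* Restore in exactly the order sketch_serialize() wrote the data. */
static void
sketch_restore(const unsigned char *buf, SketchSnapshot *s)
{
	memcpy(&s->committed_xcnt, buf, sizeof(size_t));
	buf += sizeof(size_t);
	memcpy(&s->catchanges_xcnt, buf, sizeof(size_t));
	buf += sizeof(size_t);
	s->committed = get_xids(&buf, s->committed_xcnt);
	s->catchanges = get_xids(&buf, s->catchanges_xcnt);
}

/* Commit-time lookup: was this xid a running catalog-changing xact? */
static bool
sketch_in_catchanges(const SketchSnapshot *s, TransactionId xid)
{
	return s->catchanges_xcnt > 0 &&
		bsearch(&xid, s->catchanges, s->catchanges_xcnt,
				sizeof(TransactionId), xid_cmp) != NULL;
}
```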

poc_add_regression_tests.patch adds regression tests for this bug. The
tests are needed for both HEAD and the back branches, but I've kept
them in a separate patch so that each of the above two patches can be
tested easily.

Regards,

-- 
Masahiko Sawada
EDB:  https://www.enterprisedb.com/
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 8da5f9089c..19123cbfa3 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -4821,6 +4821,45 @@ ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
 	txn->toast_hash = NULL;
 }
 
+/*
+ * Return palloc'ed array of the transactions that have changed catalogs.
+ * The returned array is sorted in xidComparator order.
+ *
+ * The caller must free the returned array when done with it.
+ */
+TransactionId *
+ReorderBufferGetCatalogChangesXacts(ReorderBuffer *rb, size_t *xcnt_p)
+{
+	HASH_SEQ_STATUS hash_seq;
+	ReorderBufferTXNByIdEnt *ent;
+	TransactionId *xids;
+	size_t	xcnt = 0;
+	size_t	xcnt_space = 64; /* arbitrary number */
+
+	xids = (TransactionId *) palloc(sizeof(TransactionId) * xcnt_space);
+
+	hash_seq_init(&hash_seq, rb->by_txn);
+	while ((ent = hash_seq_search(&hash_seq)) != NULL)
+	{
+		ReorderBufferTXN *txn = ent->txn;
+
+		if (!rbtxn_has_catalog_changes(txn))
+			continue;
+
+		if (xcnt >= xcnt_space)
+		{
+			xcnt_space *= 2;
+			xids = repalloc(xids, sizeof(TransactionId) * xcnt_space);
+		}
+
+		xids[xcnt++] = txn->xid;
+	}
+
+	qsort(xids, xcnt, sizeof(TransactionId), xidComparator);
+
+	*xcnt_p = xcnt;
+	return xids;
+}
 
 /* ---------------------------------------
  * Visibility support for logical decoding
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 1119a12db9..c57d5f91d9 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -241,6 +241,26 @@ struct SnapBuild
 		 */
 		TransactionId *xip;
 	}			committed;
+
+	/*
+	 * Array of transactions that had changed system catalogs and were still
+	 * running (not yet committed) when the snapshot was serialized.
+	 *
+	 * We normally rely on HEAP2_NEW_CID records and XLOG_XACT_INVALIDATIONS to
+	 * know whether a transaction has changed the catalog, but logical decoding
+	 * may decode only the commit record of such a transaction.  This array
+	 * keeps track of the transactions that were running but not committed
+	 * when the snapshot was serialized; it is restored along with the
+	 * snapshot and used to add such transactions to the snapshot.
+	 */
+	struct
+	{
+		/* number of transactions */
+		size_t		xcnt;
+
+		/* This array must be sorted in xidComparator order */
+		TransactionId *xip;
+	}			catchanges;
 };
 
 /*
@@ -306,6 +326,9 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder,
 		palloc0(builder->committed.xcnt_space * sizeof(TransactionId));
 	builder->committed.includes_all_transactions = true;
 
+	builder->catchanges.xcnt = 0;
+	builder->catchanges.xip = NULL;
+
 	builder->initial_xmin_horizon = xmin_horizon;
 	builder->start_decoding_at = start_lsn;
 	builder->building_full_snapshot = need_full_snapshot;
@@ -983,7 +1006,9 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
 		 * Add subtransaction to base snapshot if catalog modifying, we don't
 		 * distinguish to toplevel transactions there.
 		 */
-		if (ReorderBufferXidHasCatalogChanges(builder->reorder, subxid))
+		if (ReorderBufferXidHasCatalogChanges(builder->reorder, subxid) ||
+			bsearch(&subxid, builder->catchanges.xip, builder->catchanges.xcnt,
+					sizeof(TransactionId), xidComparator) != NULL)
 		{
 			sub_needs_timetravel = true;
 			needs_snapshot = true;
@@ -1012,7 +1037,9 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
 	}
 
 	/* if top-level modified catalog, it'll need a snapshot */
-	if (ReorderBufferXidHasCatalogChanges(builder->reorder, xid))
+	if (ReorderBufferXidHasCatalogChanges(builder->reorder, xid) ||
+		bsearch(&xid, builder->catchanges.xip, builder->catchanges.xcnt,
+				sizeof(TransactionId), xidComparator) != NULL)
 	{
 		elog(DEBUG2, "found top level transaction %u, with catalog changes",
 			 xid);
@@ -1438,6 +1465,7 @@ SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff)
  *
  * struct SnapBuildOnDisk;
  * TransactionId * committed.xcnt; (*not xcnt_space*)
+ * TransactionId * catchanges.xcnt;
  *
  */
 typedef struct SnapBuildOnDisk
@@ -1578,8 +1606,17 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
 				(errcode_for_file_access(),
 				 errmsg("could not remove file \"%s\": %m", tmppath)));
 
+	/*
+	 * Refresh the list of transactions that have changed catalogs and are
+	 * still running (not yet committed).
+	 */
+	if (builder->catchanges.xip)
+		pfree(builder->catchanges.xip);
+	builder->catchanges.xip = ReorderBufferGetCatalogChangesXacts(builder->reorder,
+																  &builder->catchanges.xcnt);
+
 	needed_length = sizeof(SnapBuildOnDisk) +
-		sizeof(TransactionId) * builder->committed.xcnt;
+		sizeof(TransactionId) * (builder->committed.xcnt + builder->catchanges.xcnt);
 
 	ondisk_c = MemoryContextAllocZero(builder->context, needed_length);
 	ondisk = (SnapBuildOnDisk *) ondisk_c;
@@ -1598,6 +1635,7 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
 	ondisk->builder.snapshot = NULL;
 	ondisk->builder.reorder = NULL;
 	ondisk->builder.committed.xip = NULL;
+	ondisk->builder.catchanges.xip = NULL;
 
 	COMP_CRC32C(ondisk->checksum,
 				&ondisk->builder,
@@ -1609,6 +1647,12 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
 	COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
 	ondisk_c += sz;
 
+	/* copy catalog-changes xacts */
+	sz = sizeof(TransactionId) * builder->catchanges.xcnt;
+	memcpy(ondisk_c, builder->catchanges.xip, sz);
+	COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
+	ondisk_c += sz;
+
 	FIN_CRC32C(ondisk->checksum);
 
 	/* we have valid data now, open tempfile and write it there */
@@ -1832,6 +1876,33 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
 	}
 	COMP_CRC32C(checksum, ondisk.builder.committed.xip, sz);
 
+	/* restore catalog-changes xacts information */
+	sz = sizeof(TransactionId) * ondisk.builder.catchanges.xcnt;
+	ondisk.builder.catchanges.xip = MemoryContextAllocZero(builder->context, sz);
+	pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
+	readBytes = read(fd, ondisk.builder.catchanges.xip, sz);
+	pgstat_report_wait_end();
+	if (readBytes != sz)
+	{
+		int			save_errno = errno;
+
+		CloseTransientFile(fd);
+
+		if (readBytes < 0)
+		{
+			errno = save_errno;
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not read file \"%s\": %m", path)));
+		}
+		else
+			ereport(ERROR,
+					(errcode(ERRCODE_DATA_CORRUPTED),
+					 errmsg("could not read file \"%s\": read %d of %zu",
+							path, readBytes, sz)));
+	}
+	COMP_CRC32C(checksum, ondisk.builder.catchanges.xip, sz);
+
 	if (CloseTransientFile(fd) != 0)
 		ereport(ERROR,
 				(errcode_for_file_access(),
@@ -1885,6 +1956,13 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
 	}
 	ondisk.builder.committed.xip = NULL;
 
+	/* Take over the restored catalog-changes array, freeing any old one */
+	if (builder->catchanges.xip != NULL)
+		pfree(builder->catchanges.xip);
+	builder->catchanges.xcnt = ondisk.builder.catchanges.xcnt;
+	builder->catchanges.xip = ondisk.builder.catchanges.xip;
+	ondisk.builder.catchanges.xip = NULL;
+
 	/* our snapshot is not interesting anymore, build a new one */
 	if (builder->snapshot != NULL)
 	{
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 4a01f877e5..07e378d3ef 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -677,6 +677,7 @@ extern void ReorderBufferSkipPrepare(ReorderBuffer *rb, TransactionId xid);
 extern void ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, char *gid);
 extern ReorderBufferTXN *ReorderBufferGetOldestTXN(ReorderBuffer *);
 extern TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb);
+extern TransactionId *ReorderBufferGetCatalogChangesXacts(ReorderBuffer *rb, size_t *xcnt_p);
 
 extern void ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr);
 
diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index b220906479..c7ce603706 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -8,7 +8,7 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
 	spill slot truncate stream stats twophase twophase_stream
 ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
 	oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \
-	twophase_snapshot slot_creation_error
+	twophase_snapshot slot_creation_error catalog_change_snapshot
 
 REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
 ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
diff --git a/contrib/test_decoding/specs/catalog_change_snapshot.spec b/contrib/test_decoding/specs/catalog_change_snapshot.spec
new file mode 100644
index 0000000000..fba67c49d6
--- /dev/null
+++ b/contrib/test_decoding/specs/catalog_change_snapshot.spec
@@ -0,0 +1,33 @@
+# Test decoding a transaction that changed catalogs when only its COMMIT record is decoded.
+setup
+{
+    DROP TABLE IF EXISTS tbl1;
+    CREATE TABLE tbl1 (val1 integer, val2 integer);
+}
+
+teardown
+{
+    DROP TABLE tbl1;
+    SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
+}
+
+session "s0"
+setup { SET synchronous_commit=on; }
+step "s0_init" { SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); }
+step "s0_begin" { BEGIN; }
+step "s0_savepoint" { SAVEPOINT sp1; }
+step "s0_truncate" { TRUNCATE tbl1; }
+step "s0_insert" { INSERT INTO tbl1 VALUES (1); }
+step "s0_commit" { COMMIT; }
+
+session "s1"
+setup { SET synchronous_commit=on; }
+step "s1_checkpoint" { CHECKPOINT; }
+step "s1_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0'); }
+
+# For the transaction that TRUNCATEd the table tbl1, the last decoding decodes
+# only its COMMIT record, because it starts from the RUNNING_XACTS record emitted
+# during the first checkpoint execution.  This transaction must be marked as
+# having catalog changes while decoding the COMMIT record, and the decoding of
+# the INSERT record must read pg_class with the correct historic snapshot.
+permutation "s0_init" "s0_begin" "s0_savepoint" "s0_truncate" "s1_checkpoint" "s1_get_changes" "s0_commit" "s0_begin" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_commit" "s1_get_changes"
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index aa2427ba73..13d0c16541 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -343,6 +343,26 @@ standby_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 			{
 				xl_running_xacts *running = (xl_running_xacts *) XLogRecGetData(r);
 
+				/*
+				 * We rely on HEAP2_NEW_CID records and XACT_INVALIDATIONS to know
+				 * whether a transaction has changed the catalog, and that
+				 * information is not serialized with the snapshot builder's
+				 * state.  Therefore, if logical decoding decodes the commit record
+				 * of a transaction that has changed catalogs without having seen
+				 * those records, we fail to add its xid to the snapshot and end up
+				 * building a wrong snapshot.  To avoid this, if the COMMIT record
+				 * of an xid listed in the RUNNING_XACTS record read at the start
+				 * of logical decoding has the XACT_XINFO_HAS_INVALS flag, we mark
+				 * both the top transaction and its subtransactions as containing
+				 * catalog changes (see also
+				 * ReorderBufferSetLastRunningXactsCatalogChanges()).  Since the
+				 * COMMIT record alone does not tell us which of them actually
+				 * changed catalogs, we mark all of them.  We may therefore mark an
+				 * xid that has not changed catalogs, but that is harmless because
+				 * the historic snapshot is used only for reading system catalogs.
+				 */
+				ReorderBufferProcessLastRunningXacts(ctx->reorder, running);
+
 				SnapBuildProcessRunningXacts(builder, buf->origptr, running);
 
 				/*
@@ -627,6 +647,15 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 		commit_time = parsed->origin_timestamp;
 	}
 
+	/*
+	 * Mark this transaction as containing catalog changes if necessary.  This
+	 * must be done before SnapBuildCommitTxn() so that catalog-changing
+	 * transactions are included in the historic snapshot.
+	 */
+	ReorderBufferSetLastRunningXactsCatalogChanges(ctx->reorder, xid, parsed->xinfo,
+												   parsed->nsubxacts, parsed->subxacts,
+												   buf->origptr);
+
 	SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid,
 					   parsed->nsubxacts, parsed->subxacts);
 
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 8da5f9089c..fa5785e679 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -353,6 +353,9 @@ ReorderBufferAllocate(void)
 	buffer->outbufsize = 0;
 	buffer->size = 0;
 
+	buffer->last_running_xacts = NULL;
+	buffer->n_last_running_xacts = -1;
+
 	buffer->spillTxns = 0;
 	buffer->spillCount = 0;
 	buffer->spillBytes = 0;
@@ -5161,3 +5164,103 @@ restart:
 		*cmax = ent->cmax;
 	return true;
 }
+
+void
+ReorderBufferProcessLastRunningXacts(ReorderBuffer *rb, xl_running_xacts *running)
+{
+	/* Quick exit if the last running xacts list has become empty */
+	if (likely(rb->n_last_running_xacts == 0))
+		return;
+
+	/* First call, build the last running xact list */
+	if (rb->n_last_running_xacts == -1)
+	{
+		int nxacts = running->subxcnt + running->xcnt;
+		Size sz = sizeof(TransactionId) * nxacts;
+
+		rb->last_running_xacts = MemoryContextAlloc(rb->context, sz);
+		memcpy(rb->last_running_xacts, running->xids, sz);
+		qsort(rb->last_running_xacts, nxacts, sizeof(TransactionId), xidComparator);
+
+		rb->n_last_running_xacts = nxacts;
+
+		return;
+	}
+
+	/*
+	 * Purge xids in the last running xacts list if we can do that for at least
+	 * one xid.
+	 */
+	if (NormalTransactionIdPrecedes(rb->last_running_xacts[0],
+									running->oldestRunningXid))
+	{
+		TransactionId *workspace;
+		int nxids = 0;
+
+		workspace = MemoryContextAlloc(rb->context, sizeof(TransactionId) * rb->n_last_running_xacts);
+		for (int i = 0; i < rb->n_last_running_xacts; i++)
+		{
+			if (NormalTransactionIdPrecedes(rb->last_running_xacts[i],
+											running->oldestRunningXid))
+				;	/* remove */
+			else
+				workspace[nxids++] = rb->last_running_xacts[i];
+		}
+
+		pfree(rb->last_running_xacts);
+		rb->last_running_xacts = workspace;
+		if (nxids == 0)
+		{
+			pfree(rb->last_running_xacts);
+			rb->last_running_xacts = NULL;
+		}
+
+		rb->n_last_running_xacts = nxids;
+	}
+}
+
+void
+ReorderBufferSetLastRunningXactsCatalogChanges(ReorderBuffer *rb, TransactionId xid,
+											   uint32 xinfo, int subxcnt,
+											   TransactionId *subxacts, XLogRecPtr lsn)
+{
+	void *test;
+
+	/*
+	 * Skip if there is no last running xacts information or the COMMIT record
+	 * has no invalidation messages, which is the common case.
+	 */
+	if (likely(rb->n_last_running_xacts <= 0 || !(xinfo & XACT_XINFO_HAS_INVALS)))
+		return;
+
+	test = bsearch(&xid, rb->last_running_xacts, rb->n_last_running_xacts,
+				   sizeof(TransactionId), xidComparator);
+
+	if (test == NULL)
+	{
+		for (int i = 0; i < subxcnt; i++)
+		{
+			test = bsearch(&subxacts[i], rb->last_running_xacts, rb->n_last_running_xacts,
+						   sizeof(TransactionId), xidComparator);
+
+			if (test != NULL)
+				break;
+		}
+	}
+
+	/*
+	 * If this transaction or one of its subtransactions was running when the
+	 * first RUNNING_XACTS record was decoded, and its commit has invalidations,
+	 * mark the top transaction and its subtransactions as catalog-changes.
+	 */
+	if (test != NULL)
+	{
+		ReorderBufferXidSetCatalogChanges(rb, xid, lsn);
+
+		for (int i = 0; i < subxcnt; i++)
+		{
+			ReorderBufferAssignChild(rb, xid, subxacts[i], lsn);
+			ReorderBufferXidSetCatalogChanges(rb, subxacts[i], lsn);
+		}
+	}
+}
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 4a01f877e5..c06df80e08 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -12,6 +12,7 @@
 #include "access/htup_details.h"
 #include "lib/ilist.h"
 #include "storage/sinval.h"
+#include "storage/standby.h"
 #include "utils/hsearch.h"
 #include "utils/relcache.h"
 #include "utils/snapshot.h"
@@ -593,6 +594,9 @@ struct ReorderBuffer
 	/* memory accounting */
 	Size		size;
 
+	TransactionId *last_running_xacts;
+	int n_last_running_xacts;	/* -1 for initial value */
+
 	/*
 	 * Statistics about transactions spilled to disk.
 	 *
@@ -682,4 +686,9 @@ extern void ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr);
 
 extern void StartupReorderBuffer(void);
 
+extern void ReorderBufferProcessLastRunningXacts(ReorderBuffer *rb, xl_running_xacts *running);
+extern void ReorderBufferSetLastRunningXactsCatalogChanges(ReorderBuffer *rb, TransactionId xid,
+														   uint32 xinfo, int subxcnt,
+														   TransactionId *subxacts, XLogRecPtr lsn);
+
 #endif
