On Wed, Jul 20, 2022 at 5:50 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
>
> On Wed, Jul 20, 2022 at 1:28 PM Masahiko Sawada <sawada.m...@gmail.com> wrote:
> >
> > On Wed, Jul 20, 2022 at 2:19 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
> > >
> > > On Wed, Jul 20, 2022 at 9:01 AM Masahiko Sawada <sawada.m...@gmail.com>
> > > wrote:
> > > >
> > > > Another idea would be to have functions, say
> > > > SnapBuildCommitTxnWithXInfo() and SnapBuildCommitTxn_ext(). The latter
> > > > does actual work of handling transaction commits and both
> > > > SnapBuildCommitTxn() and SnapBuildCommit() call
> > > > SnapBuildCommitTxnWithXInfo() with different arguments.
> > > >
> > >
> > > Do you want to say DecodeCommit() instead of SnapBuildCommit() in
> > > above para?
> >
> > I meant that we will call like DecodeCommit() ->
> > SnapBuildCommitTxnWithXInfo() -> SnapBuildCommitTxn_ext(has_invals =
> > true) -> SnapBuildXidHasCatalogChanges(has_invals = true) -> ... If
> > SnapBuildCommitTxn() gets called, it calls SnapBuildCommitTxn_ext()
> > with has_invals = false and behaves the same as before.
> >
>
> Okay, understood. This will work.
>
> > > Yet another idea could be to have another flag
> > > RBTXN_HAS_INVALS which will be set by DecodeCommit for top-level TXN.
> > > Then, we can retrieve it even for each of the subtxn's if and when
> > > required.
> >
> > Do you mean that when checking if the subtransaction has catalog
> > changes, we check if its top-level XID has this new flag?
> >
>
> Yes.
>
> > Why do we
> > need the new flag?
> >
>
> This is required if we don't want to introduce a new set of functions
> as you proposed above. I am not sure which one is better w.r.t back
> patching effort later but it seems to me using flag stuff would make
> future back patches easier if we make any changes in
> SnapBuildCommitTxn.
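The flag idea quoted above can be illustrated with a small, self-contained model. This is a sketch under stated assumptions, not PostgreSQL code: TopTxn, TXN_COMMIT_HAS_INVALS, xid_in() and collect_snapshot_xids() are hypothetical names, and a plain array of xids stands in for ReorderBufferXidHasCatalogChanges(). It shows the point of the flag: it lives only on the top-level transaction, and the commit-time check reads it once and then admits every subtransaction without a per-subxact lookup.

```c
#include <stdbool.h>
#include <stdint.h>

typedef uint32_t TransactionId;

/* Hypothetical flag bit, modelled on the RBTXN_* bitmask style. */
#define TXN_COMMIT_HAS_INVALS 0x0100

/* Hypothetical stand-in for a top-level ReorderBufferTXN entry. */
typedef struct TopTxn
{
    TransactionId xid;
    uint32_t      flags;
} TopTxn;

/* Linear membership test over xids known (hypothetically) to have catalog changes. */
static bool
xid_in(const TransactionId *xids, int n, TransactionId xid)
{
    for (int i = 0; i < n; i++)
        if (xids[i] == xid)
            return true;
    return false;
}

/*
 * Collect the xids of a committing transaction that must be added to the
 * historic snapshot: subtransactions first, then the top-level xid.  The
 * top-level flag is read once; when it is set, every subtransaction
 * qualifies without consulting per-subxact information, so subxacts need
 * no entries of their own.  'out' must have room for nsubxids + 1 xids.
 * Returns the number of xids written to 'out'.
 */
static int
collect_snapshot_xids(const TopTxn *top,
                      const TransactionId *subxids, int nsubxids,
                      const TransactionId *catchange, int ncatchange,
                      TransactionId *out)
{
    bool top_has_invals = (top->flags & TXN_COMMIT_HAS_INVALS) != 0;
    int  n = 0;

    for (int i = 0; i < nsubxids; i++)
        if (top_has_invals || xid_in(catchange, ncatchange, subxids[i]))
            out[n++] = subxids[i];

    if (top_has_invals || xid_in(catchange, ncatchange, top->xid))
        out[n++] = top->xid;

    return n;
}
```

For example, a top-level xid 100 carrying the flag with subtransactions 101 and 102 yields {101, 102, 100}, while without the flag only xids that are individually known to have catalog changes are collected. The cost of this design is one extra flag bit; the benefit is that no per-subtransaction entries have to be created.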
Understood. I've implemented this idea as well, for discussion.

Both patches share a common change: they remember the initial running transactions and purge them when decoding xl_running_xacts records. The difference is how they mark transactions as needing to be added to the snapshot.

In v7-0001-Fix-catalog-lookup-with-the-wrong-snapshot-during.patch, when a transaction is in the initial running xacts list and its commit record has the XACT_XINFO_HAS_INVALS flag, we mark both the top-level transaction and all of its subtransactions as containing catalog changes (which also means creating ReorderBufferTXN entries for them). These transactions are then added to the snapshot in SnapBuildCommitTxn(), since ReorderBufferXidHasCatalogChanges() returns true for them.

In poc_mark_top_txn_has_inval.patch, when a transaction is in the initial running xacts list and its commit record has the XACT_XINFO_HAS_INVALS flag, we set a new flag, RBTXN_COMMIT_HAS_INVALS, on the top-level transaction only. In SnapBuildCommitTxn(), if the top-level transaction has the RBTXN_COMMIT_HAS_INVALS flag, we add all of its subtransactions to the snapshot without checking ReorderBufferXidHasCatalogChanges() for them.

One difference between the two ideas is the scope of the changes: the former changes only snapbuild.c, whereas the latter changes both snapbuild.c and reorderbuffer.c. Moreover, the former reuses the existing flag, while the latter adds a new flag to the reorder buffer just to handle this case. In that respect, I think the former is simpler. On the other hand, an advantage of the latter is that it avoids creating ReorderBufferTXN entries for subtransactions.

Overall I prefer the former for now, but I'd like to hear what others think. FWIW, I didn't try the idea of adding wrapper functions, since it would make future back-patching more costly.

Regards,

-- 
Masahiko Sawada
EDB:  https://www.enterprisedb.com/
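Both patches rely on the same bookkeeping for the initial running transactions. The following is a minimal, self-contained sketch of that bookkeeping, assuming plain 32-bit xids and malloc() instead of PostgreSQL's TransactionId, MemoryContextAlloc() and xidComparator(); InitialXacts and the initial_xacts_*() functions are hypothetical names. It copies and sorts the xids from the first xl_running_xacts record, answers membership with bsearch(), and purges xids older than xmin using a wraparound-aware comparison in the style of NormalTransactionIdPrecedes(). Because the array is sorted by plain unsigned value, its first element is not necessarily the logically oldest xid near wraparound, so this sketch checks every element when purging.

```c
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

typedef uint32_t TransactionId;

/* Hypothetical container for the remembered xids. */
typedef struct InitialXacts
{
    TransactionId *xids;    /* sorted by plain unsigned value */
    int            n;
} InitialXacts;

/* Plain unsigned ordering, in the style of xidComparator(). */
static int
xid_cmp(const void *a, const void *b)
{
    TransactionId x = *(const TransactionId *) a;
    TransactionId y = *(const TransactionId *) b;

    return (x > y) - (x < y);
}

/* Wraparound-aware "a is older than b" for normal xids. */
static bool
xid_precedes(TransactionId a, TransactionId b)
{
    return (int32_t) (a - b) < 0;
}

/* Remember the xids listed in the first decoded xl_running_xacts record. */
static void
initial_xacts_remember(InitialXacts *ix, const TransactionId *running, int n)
{
    /* allocation-failure handling omitted in this sketch */
    ix->xids = malloc(sizeof(TransactionId) * (n > 0 ? n : 1));
    if (n > 0)
        memcpy(ix->xids, running, sizeof(TransactionId) * n);
    qsort(ix->xids, n, sizeof(TransactionId), xid_cmp);
    ix->n = n;
}

/* Was xid running when the first xl_running_xacts record was written? */
static bool
initial_xacts_contains(const InitialXacts *ix, TransactionId xid)
{
    return ix->n > 0 &&
        bsearch(&xid, ix->xids, ix->n, sizeof(TransactionId), xid_cmp) != NULL;
}

/*
 * Drop xids older than xmin.  Compacting in place keeps the survivors in
 * the same sorted order, so bsearch() remains valid afterwards.
 */
static void
initial_xacts_purge(InitialXacts *ix, TransactionId xmin)
{
    int kept = 0;

    for (int i = 0; i < ix->n; i++)
        if (!xid_precedes(ix->xids[i], xmin))
            ix->xids[kept++] = ix->xids[i];
    ix->n = kept;
}
```

For example, after remembering {742, 735, 740, 738}, a commit of xid 738 is found in the list; once a later xl_running_xacts record advances xmin to 740, only 740 and 742 remain and 738 is no longer reported.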
diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile index 9a31e0b879..4553252d75 100644 --- a/contrib/test_decoding/Makefile +++ b/contrib/test_decoding/Makefile @@ -8,7 +8,7 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \ spill slot truncate stream stats twophase twophase_stream ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \ oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \ - twophase_snapshot + twophase_snapshot catalog_change_snapshot REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf diff --git a/contrib/test_decoding/expected/catalog_change_snapshot.out b/contrib/test_decoding/expected/catalog_change_snapshot.out new file mode 100644 index 0000000000..dc4f9b7018 --- /dev/null +++ b/contrib/test_decoding/expected/catalog_change_snapshot.out @@ -0,0 +1,44 @@ +Parsed test spec with 2 sessions + +starting permutation: s0_init s0_begin s0_savepoint s0_truncate s1_checkpoint s1_get_changes s0_commit s0_begin s0_insert s1_checkpoint s1_get_changes s0_commit s1_get_changes +step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); +?column? 
+-------- +init +(1 row) + +step s0_begin: BEGIN; +step s0_savepoint: SAVEPOINT sp1; +step s0_truncate: TRUNCATE tbl1; +step s1_checkpoint: CHECKPOINT; +step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +---- +(0 rows) + +step s0_commit: COMMIT; +step s0_begin: BEGIN; +step s0_insert: INSERT INTO tbl1 VALUES (1); +step s1_checkpoint: CHECKPOINT; +step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +--------------------------------------- +BEGIN +table public.tbl1: TRUNCATE: (no-flags) +COMMIT +(3 rows) + +step s0_commit: COMMIT; +step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +------------------------------------------------------------- +BEGIN +table public.tbl1: INSERT: val1[integer]:1 val2[integer]:null +COMMIT +(3 rows) + +?column? +-------- +stop +(1 row) + diff --git a/contrib/test_decoding/specs/catalog_change_snapshot.spec b/contrib/test_decoding/specs/catalog_change_snapshot.spec new file mode 100644 index 0000000000..662760fbcf --- /dev/null +++ b/contrib/test_decoding/specs/catalog_change_snapshot.spec @@ -0,0 +1,39 @@ +# Test decoding only the commit record of the transaction that have +# modified catalogs. 
+setup +{ + DROP TABLE IF EXISTS tbl1; + CREATE TABLE tbl1 (val1 integer, val2 integer); +} + +teardown +{ + DROP TABLE tbl1; + SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot'); +} + +session "s0" +setup { SET synchronous_commit=on; } +step "s0_init" { SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); } +step "s0_begin" { BEGIN; } +step "s0_savepoint" { SAVEPOINT sp1; } +step "s0_truncate" { TRUNCATE tbl1; } +step "s0_insert" { INSERT INTO tbl1 VALUES (1); } +step "s0_commit" { COMMIT; } + +session "s1" +setup { SET synchronous_commit=on; } +step "s1_checkpoint" { CHECKPOINT; } +step "s1_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); } + +# For the transaction that TRUNCATEd the table tbl1, the last decoding decodes +# only its COMMIT record, because it starts from the RUNNING_XACT record emitted +# during the first checkpoint execution. This transaction must be marked as +# containing catalog changes while decoding the COMMIT record and the decoding +# of the INSERT record must read the pg_class with the correct historic snapshot. +# +# Note that in a case where bgwriter wrote the XACT_RUNNING record between "s0_commit" +# and "s0_begin", this doesn't happen as the decoding starts from the RUNNING_XACT +# record written by bgwriter. One might think we can either stop the bgwriter or +# increase LOG_SNAPSHOT_INTERVAL_MS but it's not practical via tests. 
+permutation "s0_init" "s0_begin" "s0_savepoint" "s0_truncate" "s1_checkpoint" "s1_get_changes" "s0_commit" "s0_begin" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_commit" "s1_get_changes" diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 92dfafc632..364d832e6d 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -691,6 +691,17 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, commit_time = parsed->origin_timestamp; } + /* + * If the COMMIT record has invalidation messages, it could have catalog + * changes. We check if it's in the list of the initial running transactions + * and then mark it as containing catalog changes. + * + * This must be done before SnapBuildCommitTxn() so that we can include + * catalog-modifying transactions in the historic snapshot. + */ + if (parsed->xinfo & XACT_XINFO_HAS_INVALS) + ReorderBufferXidSetCommitHasInvals(ctx->reorder, xid, buf->origptr); + SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid, parsed->nsubxacts, parsed->subxacts); diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index e59d1396b5..1a081ca966 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -3300,6 +3300,38 @@ ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid) return rbtxn_has_catalog_changes(txn); } +/* + * Mark that the commit record of the transaction has invalidation messages. + */ +void +ReorderBufferXidSetCommitHasInvals(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr lsn) +{ + ReorderBufferTXN *txn; + + txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); + + txn->txn_flags |= RBTXN_COMMIT_HAS_INVALS; + + /* Note that the flag is set only on the top-level transaction */ +} + +/* + * Query whether the commit of the transaction has invalidation messages.
+ */ +bool +ReorderBufferXidCommitHasInvals(ReorderBuffer *rb, TransactionId xid) +{ + ReorderBufferTXN *txn; + + txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, + false); + if (txn == NULL) + return false; + + return rbtxn_commit_has_invals(txn); +} + /* * ReorderBufferXidHasBaseSnapshot * Have we already set the base snapshot for the given txn/subtxn? diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 6df602485b..5af8b959c8 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -250,8 +250,38 @@ struct SnapBuild static ResourceOwner SavedResourceOwnerDuringExport = NULL; static bool ExportInProgress = false; -/* ->committed manipulation */ -static void SnapBuildPurgeCommittedTxn(SnapBuild *builder); +/* + * Array of transactions and subtransactions that were running when + * the xl_running_xacts record that we decoded first was written. + * The array is sorted in xidComparator order. Xids are only removed + * from the array when decoding xl_running_xacts records, and then + * the array eventually becomes empty. This array is allocated in + * builder->context so its lifetime is the same as the snapshot builder. + * + * We rely on HEAP2_NEW_CID records and XACT_INVALIDATIONS to know + * if the transaction has changed the catalog, and that information + * is not serialized by the snapshot builder. Therefore, if logical + * decoding decodes the commit record of a transaction that actually + * made catalog changes without these records, we fail to add + * the xid to the snapshot, and end up looking at catalogs with the + * wrong snapshot. To avoid this problem, if the COMMIT record of + * an xid listed in InitialRunningXacts has the XACT_XINFO_HAS_INVALS + * flag, we mark both the top transaction and its subtransactions + * as containing catalog changes.
+ * + * We could end up adding a transaction that didn't change the catalog + * to the snapshot, since we cannot tell whether the transaction + * has catalog changes by checking only the COMMIT record. The record doesn't + * say which (sub)transaction has catalog changes, + * and XACT_XINFO_HAS_INVALS doesn't necessarily indicate that the + * transaction changed the catalog. This is not a problem, though, since + * we use the historic snapshot only for reading system catalogs. + */ +static TransactionId *InitialRunningXacts = NULL; +static int NInitialRunningXacts = 0; + +/* ->committed and InitialRunningXacts manipulation */ +static void SnapBuildPurgeOlderTxn(SnapBuild *builder); /* snapshot building/manipulation/distribution functions */ static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder); @@ -879,12 +909,17 @@ SnapBuildAddCommittedTxn(SnapBuild *builder, TransactionId xid) } /* - * Remove knowledge about transactions we treat as committed that are smaller - * than ->xmin. Those won't ever get checked via the ->committed array but via - * the clog machinery, so we don't need to waste memory on them. + * Remove knowledge about transactions we treat as committed and the initial + * running transactions that are smaller than ->xmin. Those won't ever get + * checked via the ->committed or InitialRunningXacts array, respectively. + * The committed xids will get checked via the clog machinery. + * + * We can ideally remove the transaction from InitialRunningXacts array + * once it is finished (committed/aborted) but that could be costly as we need + * to maintain the xids order in the array.
*/ static void -SnapBuildPurgeCommittedTxn(SnapBuild *builder) +SnapBuildPurgeOlderTxn(SnapBuild *builder) { int off; TransactionId *workspace; @@ -919,6 +954,49 @@ SnapBuildPurgeCommittedTxn(SnapBuild *builder) builder->committed.xcnt = surviving_xids; pfree(workspace); + + /* Quick exit if there is no initial running transactions */ + if (likely(NInitialRunningXacts == 0)) + return; + + /* bound check if there is at least one transaction to remove */ + if (!NormalTransactionIdPrecedes(InitialRunningXacts[0], + builder->xmin)) + return; + + /* + * purge xids in InitialRunningXacts as well. The purged array must also be + * sorted in xidComparator order. + */ + workspace = + MemoryContextAlloc(builder->context, + NInitialRunningXacts * sizeof(TransactionId)); + surviving_xids = 0; + for (off = 0; off < NInitialRunningXacts; off++) + { + if (NormalTransactionIdPrecedes(InitialRunningXacts[off], + builder->xmin)) + ; /* remove */ + else + workspace[surviving_xids++] = InitialRunningXacts[off]; + } + + if (surviving_xids > 0) + memcpy(InitialRunningXacts, workspace, + sizeof(TransactionId) * surviving_xids); + else + { + pfree(InitialRunningXacts); + InitialRunningXacts = NULL; + } + + elog(DEBUG3, "purged initial running transactions from %u to %u, oldest running xid %u", + (uint32) NInitialRunningXacts, + (uint32) surviving_xids, + builder->xmin); + + NInitialRunningXacts = surviving_xids; + pfree(workspace); } /* @@ -934,6 +1012,8 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, bool needs_timetravel = false; bool sub_needs_timetravel = false; + bool top_has_invals = false; + TransactionId xmax = xid; /* @@ -966,6 +1046,7 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, } } + top_has_invals = ReorderBufferXidCommitHasInvals(builder->reorder, xid); for (nxact = 0; nxact < nsubxacts; nxact++) { TransactionId subxid = subxacts[nxact]; @@ -974,7 +1055,8 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, 
TransactionId xid, * Add subtransaction to base snapshot if catalog modifying, we don't * distinguish to toplevel transactions there. */ - if (ReorderBufferXidHasCatalogChanges(builder->reorder, subxid)) + if (top_has_invals || + ReorderBufferXidHasCatalogChanges(builder->reorder, subxid)) { sub_needs_timetravel = true; needs_snapshot = true; @@ -1003,7 +1085,8 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, } /* if top-level modified catalog, it'll need a snapshot */ - if (ReorderBufferXidHasCatalogChanges(builder->reorder, xid)) + if (top_has_invals || + ReorderBufferXidHasCatalogChanges(builder->reorder, xid)) { elog(DEBUG2, "found top level transaction %u, with catalog changes", xid); @@ -1104,6 +1187,21 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact */ if (builder->state < SNAPBUILD_CONSISTENT) { + /* + * Remember the transactions and subtransactions that were running + * when xl_running_xacts record that we decoded first was written. 
+ */ + if (builder->state == SNAPBUILD_START) + { + int nxacts = running->subxcnt + running->xcnt; + Size sz = sizeof(TransactionId) * nxacts; + + NInitialRunningXacts = nxacts; + InitialRunningXacts = MemoryContextAlloc(builder->context, sz); + memcpy(InitialRunningXacts, running->xids, sz); + qsort(InitialRunningXacts, nxacts, sizeof(TransactionId), xidComparator); + } + /* returns false if there's no point in performing cleanup just yet */ if (!SnapBuildFindSnapshot(builder, lsn, running)) return; @@ -1126,7 +1224,7 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact builder->xmin = running->oldestRunningXid; /* Remove transactions we don't need to keep track off anymore */ - SnapBuildPurgeCommittedTxn(builder); + SnapBuildPurgeOlderTxn(builder); /* * Advance the xmin limit for the current replication slot, to allow diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index ba257d81b5..ac00575db1 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -176,6 +176,7 @@ typedef struct ReorderBufferChange #define RBTXN_HAS_PARTIAL_CHANGE 0x0020 #define RBTXN_PREPARE 0x0040 #define RBTXN_SKIPPED_PREPARE 0x0080 +#define RBTXN_COMMIT_HAS_INVALS 0x0100 /* Does the transaction have catalog changes? */ #define rbtxn_has_catalog_changes(txn) \ @@ -207,6 +208,12 @@ typedef struct ReorderBufferChange ((txn)->txn_flags & RBTXN_HAS_PARTIAL_CHANGE) != 0 \ ) +/* Does the commit record of the transaction has invalidation messages? */ +#define rbtxn_commit_has_invals(txn) \ +( \ + ((txn)->txn_flags & RBTXN_COMMIT_HAS_INVALS) != 0 \ +) + /* * Has this transaction been streamed to downstream? 
* @@ -665,6 +672,10 @@ void ReorderBufferXidSetCatalogChanges(ReorderBuffer *, TransactionId xid, XLog bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid); bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *, TransactionId xid); +void ReorderBufferXidSetCommitHasInvals(ReorderBuffer *rb, TransactionId xid, + XLogRecPtr lsn); +bool ReorderBufferXidCommitHasInvals(ReorderBuffer *rb, TransactionId xid); + bool ReorderBufferRememberPrepareInfo(ReorderBuffer *rb, TransactionId xid, XLogRecPtr prepare_lsn, XLogRecPtr end_lsn, TimestampTz prepare_time,
From 725249d8ec56a7ea3219df801dc7b93f93eea145 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <sawada.mshk@gmail.com>
Date: Mon, 11 Jul 2022 21:49:06 +0900
Subject: [PATCH v7] Fix catalog lookup with the wrong snapshot during logical decoding.

Previously, we relied on HEAP2_NEW_CID records and XACT_INVALIDATION
records to know whether a transaction had modified the catalog, and that
information is not serialized to the snapshot. Therefore, if logical
decoding decoded only the commit record of a transaction that had
actually modified a catalog, we missed adding its XID to the snapshot
and ended up looking at catalogs with the wrong snapshot.

To fix this problem, this changes the snapshot builder so that it
remembers the initial running transactions listed in the first
xl_running_xacts record that we decode, and marks a transaction as
containing catalog changes if it is in that list and its commit record
has XACT_XINFO_HAS_INVALS.

This can produce false positives: we could end up adding a transaction
that didn't change the catalog to the snapshot, since we cannot tell
from the COMMIT record alone whether the transaction has catalog
changes. The record doesn't say which (sub)transaction has catalog
changes, and XACT_XINFO_HAS_INVALS doesn't necessarily indicate that the
transaction changed the catalog. This is harmless because we use the
historic snapshot only for reading system catalogs.

On the master branch, we took a more future-proof approach: writing
catalog-modifying transactions to the serialized snapshot. But we cannot
backpatch it because it changes SnapBuild. Also, to avoid ABI breakage,
we store the array of initial running transactions in the static
variables InitialRunningXacts and NInitialRunningXacts rather than in
ReorderBuffer.

Back-patch to all supported releases.
Reported-by: Mike Oh <minsoo@amazon.com> Author: Masahiko Sawada <sawada.mshk@gmail.com> Reviewed-by: Amit Kapila <amit.kapila16@gmail.com> Reviewed-by: Takamichi Osumi <osumi.takamichi@fujitsu.com> Reviewed-by: Kyotaro Horiguchi <horikyota.ntt@gmail.com> Reviewed-by: Bertrand Drouvot <bdrouvot@amazon.com> Reviewed-by: Shi yu <shiy.fnst@fujitsu.com> Reviewed-by: Ahsan Hadi <ahsan.hadi@gmail.com> Discussion: https://postgr.es/m/81D0D8B0-E7C4-4999-B616-1E5004DBDCD2%40amazon.com Backpatch-through: 10 --- contrib/test_decoding/Makefile | 2 +- .../expected/catalog_change_snapshot.out | 44 ++++++ .../specs/catalog_change_snapshot.spec | 39 +++++ src/backend/replication/logical/decode.c | 15 ++ src/backend/replication/logical/snapbuild.c | 143 +++++++++++++++++- src/include/replication/snapbuild.h | 5 + 6 files changed, 240 insertions(+), 8 deletions(-) create mode 100644 contrib/test_decoding/expected/catalog_change_snapshot.out create mode 100644 contrib/test_decoding/specs/catalog_change_snapshot.spec diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile index 9a31e0b879..4553252d75 100644 --- a/contrib/test_decoding/Makefile +++ b/contrib/test_decoding/Makefile @@ -8,7 +8,7 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \ spill slot truncate stream stats twophase twophase_stream ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \ oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \ - twophase_snapshot + twophase_snapshot catalog_change_snapshot REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf diff --git a/contrib/test_decoding/expected/catalog_change_snapshot.out b/contrib/test_decoding/expected/catalog_change_snapshot.out new file mode 100644 index 0000000000..dc4f9b7018 --- /dev/null +++ b/contrib/test_decoding/expected/catalog_change_snapshot.out @@ -0,0 +1,44 @@ +Parsed test spec 
with 2 sessions + +starting permutation: s0_init s0_begin s0_savepoint s0_truncate s1_checkpoint s1_get_changes s0_commit s0_begin s0_insert s1_checkpoint s1_get_changes s0_commit s1_get_changes +step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); +?column? +-------- +init +(1 row) + +step s0_begin: BEGIN; +step s0_savepoint: SAVEPOINT sp1; +step s0_truncate: TRUNCATE tbl1; +step s1_checkpoint: CHECKPOINT; +step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +---- +(0 rows) + +step s0_commit: COMMIT; +step s0_begin: BEGIN; +step s0_insert: INSERT INTO tbl1 VALUES (1); +step s1_checkpoint: CHECKPOINT; +step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +--------------------------------------- +BEGIN +table public.tbl1: TRUNCATE: (no-flags) +COMMIT +(3 rows) + +step s0_commit: COMMIT; +step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); +data +------------------------------------------------------------- +BEGIN +table public.tbl1: INSERT: val1[integer]:1 val2[integer]:null +COMMIT +(3 rows) + +?column? +-------- +stop +(1 row) + diff --git a/contrib/test_decoding/specs/catalog_change_snapshot.spec b/contrib/test_decoding/specs/catalog_change_snapshot.spec new file mode 100644 index 0000000000..662760fbcf --- /dev/null +++ b/contrib/test_decoding/specs/catalog_change_snapshot.spec @@ -0,0 +1,39 @@ +# Test decoding only the commit record of the transaction that have +# modified catalogs. 
+setup +{ + DROP TABLE IF EXISTS tbl1; + CREATE TABLE tbl1 (val1 integer, val2 integer); +} + +teardown +{ + DROP TABLE tbl1; + SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot'); +} + +session "s0" +setup { SET synchronous_commit=on; } +step "s0_init" { SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); } +step "s0_begin" { BEGIN; } +step "s0_savepoint" { SAVEPOINT sp1; } +step "s0_truncate" { TRUNCATE tbl1; } +step "s0_insert" { INSERT INTO tbl1 VALUES (1); } +step "s0_commit" { COMMIT; } + +session "s1" +setup { SET synchronous_commit=on; } +step "s1_checkpoint" { CHECKPOINT; } +step "s1_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); } + +# For the transaction that TRUNCATEd the table tbl1, the last decoding decodes +# only its COMMIT record, because it starts from the RUNNING_XACT record emitted +# during the first checkpoint execution. This transaction must be marked as +# containing catalog changes while decoding the COMMIT record and the decoding +# of the INSERT record must read the pg_class with the correct historic snapshot. +# +# Note that in a case where bgwriter wrote the XACT_RUNNING record between "s0_commit" +# and "s0_begin", this doesn't happen as the decoding starts from the RUNNING_XACT +# record written by bgwriter. One might think we can either stop the bgwriter or +# increase LOG_SNAPSHOT_INTERVAL_MS but it's not practical via tests. 
+permutation "s0_init" "s0_begin" "s0_savepoint" "s0_truncate" "s1_checkpoint" "s1_get_changes" "s0_commit" "s0_begin" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_commit" "s1_get_changes" diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 92dfafc632..a164442436 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -691,6 +691,21 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, commit_time = parsed->origin_timestamp; } + /* + * If the COMMIT record has invalidation messages, it could have catalog + * changes. We check if it's in the list of the initial running transactions + * and then mark it as containing catalog changes. + * + * This must be done before SnapBuildCommitTxn() so that we can include + * catalog-modifying transactions in the historic snapshot. + */ + if (parsed->xinfo & XACT_XINFO_HAS_INVALS) + SnapBuildInitialXactSetCatalogChanges(ctx->snapshot_builder, + xid, + parsed->nsubxacts, + parsed->subxacts, + buf->origptr); + SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid, parsed->nsubxacts, parsed->subxacts); diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 6df602485b..3e9d1a7931 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -250,8 +250,38 @@ struct SnapBuild static ResourceOwner SavedResourceOwnerDuringExport = NULL; static bool ExportInProgress = false; -/* ->committed manipulation */ -static void SnapBuildPurgeCommittedTxn(SnapBuild *builder); +/* + * Array of transactions and subtransactions that were running when + * the xl_running_xacts record that we decoded first was written. + * The array is sorted in xidComparator order. Xids are only removed + * from the array when decoding xl_running_xacts records, and then + * the array eventually becomes empty.
This array is allocated in + * builder->context so its lifetime is the same as the snapshot builder. + * + * We rely on HEAP2_NEW_CID records and XACT_INVALIDATIONS to know + * if the transaction has changed the catalog, and that information + * is not serialized by the snapshot builder. Therefore, if logical + * decoding decodes the commit record of a transaction that actually + * made catalog changes without these records, we fail to add + * the xid to the snapshot, and end up looking at catalogs with the + * wrong snapshot. To avoid this problem, if the COMMIT record of + * an xid listed in InitialRunningXacts has the XACT_XINFO_HAS_INVALS + * flag, we mark both the top transaction and its subtransactions + * as containing catalog changes. + * + * We could end up adding a transaction that didn't change the catalog + * to the snapshot, since we cannot tell whether the transaction + * has catalog changes by checking only the COMMIT record. The record doesn't + * say which (sub)transaction has catalog changes, + * and XACT_XINFO_HAS_INVALS doesn't necessarily indicate that the + * transaction changed the catalog. This is not a problem, though, since + * we use the historic snapshot only for reading system catalogs. + */ +static TransactionId *InitialRunningXacts = NULL; +static int NInitialRunningXacts = 0; + +/* ->committed and InitialRunningXacts manipulation */ +static void SnapBuildPurgeOlderTxn(SnapBuild *builder); /* snapshot building/manipulation/distribution functions */ static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder); @@ -879,12 +909,17 @@ SnapBuildAddCommittedTxn(SnapBuild *builder, TransactionId xid) } /* - * Remove knowledge about transactions we treat as committed that are smaller - * than ->xmin. Those won't ever get checked via the ->committed array but via - * the clog machinery, so we don't need to waste memory on them.
+ * Remove knowledge about transactions we treat as committed and the initial + * running transactions that are smaller than ->xmin. Those won't ever get + * checked via the ->committed or InitialRunningXacts array, respectively. + * The committed xids will get checked via the clog machinery. + * + * We can ideally remove the transaction from InitialRunningXacts array + * once it is finished (committed/aborted) but that could be costly as we need + * to maintain the xids order in the array. */ static void -SnapBuildPurgeCommittedTxn(SnapBuild *builder) +SnapBuildPurgeOlderTxn(SnapBuild *builder) { int off; TransactionId *workspace; @@ -919,6 +954,49 @@ SnapBuildPurgeCommittedTxn(SnapBuild *builder) builder->committed.xcnt = surviving_xids; pfree(workspace); + + /* Quick exit if there is no initial running transactions */ + if (likely(NInitialRunningXacts == 0)) + return; + + /* bound check if there is at least one transaction to remove */ + if (!NormalTransactionIdPrecedes(InitialRunningXacts[0], + builder->xmin)) + return; + + /* + * purge xids in InitialRunningXacts as well. The purged array must also be + * sorted in xidComparator order. 
+ */ + workspace = + MemoryContextAlloc(builder->context, + NInitialRunningXacts * sizeof(TransactionId)); + surviving_xids = 0; + for (off = 0; off < NInitialRunningXacts; off++) + { + if (NormalTransactionIdPrecedes(InitialRunningXacts[off], + builder->xmin)) + ; /* remove */ + else + workspace[surviving_xids++] = InitialRunningXacts[off]; + } + + if (surviving_xids > 0) + memcpy(InitialRunningXacts, workspace, + sizeof(TransactionId) * surviving_xids); + else + { + pfree(InitialRunningXacts); + InitialRunningXacts = NULL; + } + + elog(DEBUG3, "purged initial running transactions from %u to %u, oldest running xid %u", + (uint32) NInitialRunningXacts, + (uint32) surviving_xids, + builder->xmin); + + NInitialRunningXacts = surviving_xids; + pfree(workspace); } /* @@ -1104,6 +1182,21 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact */ if (builder->state < SNAPBUILD_CONSISTENT) { + /* + * Remember the transactions and subtransactions that were running + * when xl_running_xacts record that we decoded first was written. 
+ */ + if (builder->state == SNAPBUILD_START) + { + int nxacts = running->subxcnt + running->xcnt; + Size sz = sizeof(TransactionId) * nxacts; + + NInitialRunningXacts = nxacts; + InitialRunningXacts = MemoryContextAlloc(builder->context, sz); + memcpy(InitialRunningXacts, running->xids, sz); + qsort(InitialRunningXacts, nxacts, sizeof(TransactionId), xidComparator); + } + /* returns false if there's no point in performing cleanup just yet */ if (!SnapBuildFindSnapshot(builder, lsn, running)) return; @@ -1126,7 +1219,7 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact builder->xmin = running->oldestRunningXid; /* Remove transactions we don't need to keep track off anymore */ - SnapBuildPurgeCommittedTxn(builder); + SnapBuildPurgeOlderTxn(builder); /* * Advance the xmin limit for the current replication slot, to allow @@ -1993,3 +2086,39 @@ CheckPointSnapBuild(void) } FreeDir(snap_dir); } + +/* + * If the given xid is in the list of the initial running xacts, we mark both it + * and its subtransactions as containing catalog changes if not yet. + */ +void +SnapBuildInitialXactSetCatalogChanges(SnapBuild *builder, TransactionId xid, + int subxcnt, TransactionId *subxacts, + XLogRecPtr lsn) +{ + /* + * Skip if there is no initial running xacts information or the + * transaction is already marked as containing catalog changes. + */ + if (likely((NInitialRunningXacts == 0) || + ReorderBufferXidHasCatalogChanges(builder->reorder, xid))) + return; + + /* + * If this committed transaction is the one that was running at the time + * when decoding the first RUNNING_XACTS record and have done catalog + * changes, we can mark both the top transaction and its subtransactions + * as containing catalog changes. 
+ */ + if (bsearch(&xid, InitialRunningXacts, NInitialRunningXacts, + sizeof(TransactionId), xidComparator) != NULL) + { + ReorderBufferXidSetCatalogChanges(builder->reorder, xid, lsn); + + for (int i = 0; i < subxcnt; i++) + { + ReorderBufferAssignChild(builder->reorder, xid, subxacts[i], lsn); + ReorderBufferXidSetCatalogChanges(builder->reorder, subxacts[i], lsn); + } + } +} diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h index 3604621e88..d8f15d70e0 100644 --- a/src/include/replication/snapbuild.h +++ b/src/include/replication/snapbuild.h @@ -90,4 +90,9 @@ extern void SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, struct xl_running_xacts *running); extern void SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn); +extern void SnapBuildInitialXactSetCatalogChanges(SnapBuild *builder, + TransactionId xid, + int subxcnt, + TransactionId *subxacts, + XLogRecPtr lsn); #endif /* SNAPBUILD_H */ -- 2.24.3 (Apple Git-128)