Hello, everyone. Stress tests for REPACK CONCURRENTLY are attached. So far I can't break anything (except MVCC, of course).
A rebased version of the MVCC-safe "light" version with its own stress test is attached also. Best regards, Mikhail.
From 457235c743a2dec2c1917fbdfa7f5a48d305c63e Mon Sep 17 00:00:00 2001 From: Mikhail Nikalayeu <[email protected]> Date: Sat, 13 Dec 2025 19:42:52 +0100 Subject: [PATCH vnocfbot] Preserve visibility information of the concurrent data changes. As explained in the commit message of the preceding patch of the series, the data changes done by applications while REPACK CONCURRENTLY is copying the table contents to a new file are decoded from WAL and eventually also applied to the new file. To reduce the complexity a little bit, the preceding patch uses the current transaction (i.e. transaction opened by the REPACK command) to execute those INSERT, UPDATE and DELETE commands. However, REPACK is not expected to change visibility of tuples. Therefore, this patch fixes the handling of the "concurrent data changes". It ensures that tuples written into the new table have the same XID and command ID (CID) as they had in the old table. To "replay" an UPDATE or DELETE command on the new table, we use SnapshotSelf to find the last live version of the tuple and stamp the update with the xid of the original transaction. It is safe because: * all transactions we are replaying are committed * the apply worker runs without any concurrent modifiers of the table As long as we preserve the tuple visibility information (which includes XID), it's important to avoid logical decoding of the WAL generated by DMLs on the new table: the logical decoding subsystem probably does not expect the incoming WAL records to contain XIDs of already decoded transactions. (And of course, repeated decoding would be wasted effort.) 
Author: Antonin Houska <[email protected]> with changes from Mikhail Nikalayeu <[email protected] --- contrib/amcheck/meson.build | 1 + .../amcheck/t/009_repack_concurrently_mvcc.pl | 113 ++++++++++++++++++ doc/src/sgml/mvcc.sgml | 12 +- doc/src/sgml/ref/repack.sgml | 9 -- src/backend/access/common/toast_internals.c | 3 +- src/backend/access/heap/heapam.c | 29 +++-- src/backend/access/heap/heapam_handler.c | 24 ++-- src/backend/commands/cluster.c | 107 ++++++++++++----- .../pgoutput_repack/pgoutput_repack.c | 16 ++- src/include/access/heapam.h | 6 +- .../injection_points/specs/repack.spec | 4 - 11 files changed, 243 insertions(+), 81 deletions(-) create mode 100644 contrib/amcheck/t/009_repack_concurrently_mvcc.pl diff --git a/contrib/amcheck/meson.build b/contrib/amcheck/meson.build index f7c70735989..6946c684259 100644 --- a/contrib/amcheck/meson.build +++ b/contrib/amcheck/meson.build @@ -52,6 +52,7 @@ tests += { 't/006_verify_gin.pl', 't/007_repack_concurrently.pl', 't/008_repack_concurrently.pl', + 't/009_repack_concurrently_mvcc.pl', ], }, } diff --git a/contrib/amcheck/t/009_repack_concurrently_mvcc.pl b/contrib/amcheck/t/009_repack_concurrently_mvcc.pl new file mode 100644 index 00000000000..a83fd5b8141 --- /dev/null +++ b/contrib/amcheck/t/009_repack_concurrently_mvcc.pl @@ -0,0 +1,113 @@ + +# Copyright (c) 2021-2025, PostgreSQL Global Development Group + +# Test REPACK CONCURRENTLY with concurrent modifications +use strict; +use warnings FATAL => 'all'; + +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; + +use Test::More; + +my $node; + +# +# Test set-up +# +$node = PostgreSQL::Test::Cluster->new('CIC_test'); +$node->init; +$node->append_conf('postgresql.conf', + 'lock_timeout = ' . 
(1000 * $PostgreSQL::Test::Utils::timeout_default)); +$node->append_conf( + 'postgresql.conf', qq( +wal_level = logical +)); +$node->start; +$node->safe_psql('postgres', q(CREATE TABLE tbl1(i int PRIMARY KEY, j int))); +$node->safe_psql('postgres', q(CREATE TABLE tbl2(i int PRIMARY KEY, j int))); + + +# Insert 100 rows into tbl1 +$node->safe_psql('postgres', q( + INSERT INTO tbl1 SELECT i, i % 100 FROM generate_series(1,100) i +)); + +# Insert 100 rows into tbl2 +$node->safe_psql('postgres', q( + INSERT INTO tbl2 SELECT i, i % 100 FROM generate_series(1,100) i +)); + + +# Insert 100 rows into tbl1 +$node->safe_psql('postgres', q( + CREATE OR REPLACE FUNCTION log_raise(i int, j1 int, j2 int) RETURNS VOID AS $$ + BEGIN + RAISE NOTICE 'ERROR i=% j1=% j2=%', i, j1, j2; + END;$$ LANGUAGE plpgsql; +)); + +$node->safe_psql('postgres', q(CREATE UNLOGGED SEQUENCE in_row_rebuild START 1 INCREMENT 1;)); +$node->safe_psql('postgres', q(SELECT nextval('in_row_rebuild');)); + + +$node->pgbench( +'--no-vacuum --client=10 --jobs=4 --exit-on-abort --transactions=2500', +0, +[qr{actually processed}], +[qr{^$}], +'concurrent operations with REINDEX/CREATE INDEX CONCURRENTLY', +{ + 'concurrent_ops' => q( + SELECT pg_try_advisory_lock(42)::integer AS gotlock \gset + \if :gotlock + SELECT nextval('in_row_rebuild') AS last_value \gset + \if :last_value = 2 + REPACK (CONCURRENTLY) tbl1 USING INDEX tbl1_pkey; + \sleep 10 ms + REPACK (CONCURRENTLY) tbl2 USING INDEX tbl2_pkey; + \sleep 10 ms + \endif + SELECT pg_advisory_unlock(42); + \else + \set num random(1, 100) + BEGIN; + UPDATE tbl1 SET j = j + 1 WHERE i = :num; + \sleep 1 ms + UPDATE tbl1 SET j = j + 2 WHERE i = :num; + \sleep 1 ms + UPDATE tbl1 SET j = j + 3 WHERE i = :num; + \sleep 1 ms + UPDATE tbl1 SET j = j + 4 WHERE i = :num; + \sleep 1 ms + + UPDATE tbl2 SET j = j + 1 WHERE i = :num; + \sleep 1 ms + UPDATE tbl2 SET j = j + 2 WHERE i = :num; + \sleep 1 ms + UPDATE tbl2 SET j = j + 3 WHERE i = :num; + \sleep 1 ms + UPDATE tbl2 
SET j = j + 4 WHERE i = :num; + + COMMIT; + SELECT setval('in_row_rebuild', 1); + + BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ; + SELECT COALESCE(SUM(j), 0) AS t1 FROM tbl1 WHERE i = :num \gset p_ + \sleep 10 ms + SELECT COALESCE(SUM(j), 0) AS t2 FROM tbl2 WHERE i = :num \gset p_ + \if :p_t1 != :p_t2 + COMMIT; + SELECT log_raise(tbl1.i, tbl1.j, tbl2.j) FROM tbl1 LEFT OUTER JOIN tbl2 ON tbl1.i = tbl2.i WHERE tbl1.j != tbl2.j; + \sleep 10 ms + SELECT log_raise(tbl1.i, tbl1.j, tbl2.j) FROM tbl1 LEFT OUTER JOIN tbl2 ON tbl1.i = tbl2.i WHERE tbl1.j != tbl2.j; + SELECT (:p_t1 + :p_t2) / 0; + \endif + + COMMIT; + \endif + ) +}); + +$node->stop; +done_testing(); diff --git a/doc/src/sgml/mvcc.sgml b/doc/src/sgml/mvcc.sgml index 0f5c34af542..049ee75a4ba 100644 --- a/doc/src/sgml/mvcc.sgml +++ b/doc/src/sgml/mvcc.sgml @@ -1833,17 +1833,15 @@ SELECT pg_advisory_lock(q.id) FROM <title>Caveats</title> <para> - Some commands, currently only <link linkend="sql-truncate"><command>TRUNCATE</command></link>, the - table-rewriting forms of <link linkend="sql-altertable"><command>ALTER - TABLE</command></link> and <command>REPACK</command> with - the <literal>CONCURRENTLY</literal> option, are not + Some DDL commands, currently only <link linkend="sql-truncate"><command>TRUNCATE</command></link> and the + table-rewriting forms of <link linkend="sql-altertable"><command>ALTER TABLE</command></link>, are not MVCC-safe. This means that after the truncation or rewrite commits, the table will appear empty to concurrent transactions, if they are using a - snapshot taken before the command committed. This will only be an + snapshot taken before the DDL command committed. 
This will only be an issue for a transaction that did not access the table in question - before the command started — any transaction that has done so + before the DDL command started — any transaction that has done so would hold at least an <literal>ACCESS SHARE</literal> table lock, - which would block the truncating or rewriting command until that transaction completes. + which would block the DDL command until that transaction completes. So these commands will not cause any apparent inconsistency in the table contents for successive queries on the target table, but they could cause visible inconsistency between the contents of the target diff --git a/doc/src/sgml/ref/repack.sgml b/doc/src/sgml/ref/repack.sgml index 30c43c49069..9796a923597 100644 --- a/doc/src/sgml/ref/repack.sgml +++ b/doc/src/sgml/ref/repack.sgml @@ -308,15 +308,6 @@ REPACK [ ( <replaceable class="parameter">option</replaceable> [, ...] ) ] USING </listitem> </itemizedlist> </para> - - <warning> - <para> - <command>REPACK</command> with the <literal>CONCURRENTLY</literal> - option is not MVCC-safe, see <xref linkend="mvcc-caveats"/> for - details. - </para> - </warning> - </listitem> </varlistentry> diff --git a/src/backend/access/common/toast_internals.c b/src/backend/access/common/toast_internals.c index 63b848473f8..91119da5cd5 100644 --- a/src/backend/access/common/toast_internals.c +++ b/src/backend/access/common/toast_internals.c @@ -311,7 +311,8 @@ toast_save_datum(Relation rel, Datum value, toasttup = heap_form_tuple(toasttupDesc, t_values, t_isnull); - heap_insert(toastrel, toasttup, mycid, options, NULL); + heap_insert(toastrel, toasttup, GetCurrentTransactionId(), mycid, + options, NULL); /* * Create the index entry. 
We cheat a little here by not using diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index e11833f01b4..94ca07e4b55 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -2085,7 +2085,7 @@ ReleaseBulkInsertStatePin(BulkInsertState bistate) /* * heap_insert - insert tuple into a heap * - * The new tuple is stamped with current transaction ID and the specified + * The new tuple is stamped with specified transaction ID and the specified * command ID. * * See table_tuple_insert for comments about most of the input flags, except @@ -2101,15 +2101,16 @@ ReleaseBulkInsertStatePin(BulkInsertState bistate) * reflected into *tup. */ void -heap_insert(Relation relation, HeapTuple tup, CommandId cid, - int options, BulkInsertState bistate) +heap_insert(Relation relation, HeapTuple tup, TransactionId xid, + CommandId cid, int options, BulkInsertState bistate) { - TransactionId xid = GetCurrentTransactionId(); HeapTuple heaptup; Buffer buffer; Buffer vmbuffer = InvalidBuffer; bool all_visible_cleared = false; + Assert(TransactionIdIsValid(xid)); + /* Cheap, simplistic check that the tuple matches the rel's rowtype. 
*/ Assert(HeapTupleHeaderGetNatts(tup->t_data) <= RelationGetNumberOfAttributes(relation)); @@ -2375,7 +2376,6 @@ void heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples, CommandId cid, int options, BulkInsertState bistate) { - TransactionId xid = GetCurrentTransactionId(); HeapTuple *heaptuples; int i; int ndone; @@ -2408,7 +2408,7 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples, tuple = ExecFetchSlotHeapTuple(slots[i], true, NULL); slots[i]->tts_tableOid = RelationGetRelid(relation); tuple->t_tableOid = slots[i]->tts_tableOid; - heaptuples[i] = heap_prepare_insert(relation, tuple, xid, cid, + heaptuples[i] = heap_prepare_insert(relation, tuple, GetCurrentTransactionId(), cid, options); } @@ -2746,7 +2746,8 @@ heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples, void simple_heap_insert(Relation relation, HeapTuple tup) { - heap_insert(relation, tup, GetCurrentCommandId(true), 0, NULL); + heap_insert(relation, tup, GetCurrentTransactionId(), + GetCurrentCommandId(true), 0, NULL); } /* @@ -2803,11 +2804,10 @@ xmax_infomask_changed(uint16 new_infomask, uint16 old_infomask) */ TM_Result heap_delete(Relation relation, const ItemPointerData *tid, - CommandId cid, Snapshot crosscheck, bool wait, + TransactionId xid, CommandId cid, Snapshot crosscheck, bool wait, TM_FailureData *tmfd, bool changingPart, bool walLogical) { TM_Result result; - TransactionId xid = GetCurrentTransactionId(); ItemId lp; HeapTupleData tp; Page page; @@ -2824,6 +2824,7 @@ heap_delete(Relation relation, const ItemPointerData *tid, bool old_key_copied = false; Assert(ItemPointerIsValid(tid)); + Assert(TransactionIdIsValid(xid)); AssertHasSnapshotForToast(relation); @@ -3240,7 +3241,7 @@ simple_heap_delete(Relation relation, const ItemPointerData *tid) TM_Result result; TM_FailureData tmfd; - result = heap_delete(relation, tid, + result = heap_delete(relation, tid, GetCurrentTransactionId(), GetCurrentCommandId(true), 
InvalidSnapshot, true /* wait for commit */ , &tmfd, false, /* changingPart */ @@ -3283,12 +3284,11 @@ simple_heap_delete(Relation relation, const ItemPointerData *tid) */ TM_Result heap_update(Relation relation, const ItemPointerData *otid, HeapTuple newtup, - CommandId cid, Snapshot crosscheck, bool wait, + TransactionId xid, CommandId cid, Snapshot crosscheck, bool wait, TM_FailureData *tmfd, LockTupleMode *lockmode, TU_UpdateIndexes *update_indexes, bool walLogical) { TM_Result result; - TransactionId xid = GetCurrentTransactionId(); Bitmapset *hot_attrs; Bitmapset *sum_attrs; Bitmapset *key_attrs; @@ -3328,6 +3328,7 @@ heap_update(Relation relation, const ItemPointerData *otid, HeapTuple newtup, infomask2_new_tuple; Assert(ItemPointerIsValid(otid)); + Assert(TransactionIdIsValid(xid)); /* Cheap, simplistic check that the tuple matches the rel's rowtype. */ Assert(HeapTupleHeaderGetNatts(newtup->t_data) <= @@ -4534,7 +4535,7 @@ simple_heap_update(Relation relation, const ItemPointerData *otid, HeapTuple tup TM_FailureData tmfd; LockTupleMode lockmode; - result = heap_update(relation, otid, tup, + result = heap_update(relation, otid, tup, GetCurrentTransactionId(), GetCurrentCommandId(true), InvalidSnapshot, true /* wait for commit */ , &tmfd, &lockmode, update_indexes, @@ -5373,8 +5374,6 @@ compute_new_xmax_infomask(TransactionId xmax, uint16 old_infomask, uint16 new_infomask, new_infomask2; - Assert(TransactionIdIsCurrentTransactionId(add_to_xmax)); - l5: new_infomask = 0; new_infomask2 = 0; diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c index e6d630fa2f7..b49f9add5bb 100644 --- a/src/backend/access/heap/heapam_handler.c +++ b/src/backend/access/heap/heapam_handler.c @@ -252,7 +252,8 @@ heapam_tuple_insert(Relation relation, TupleTableSlot *slot, CommandId cid, tuple->t_tableOid = slot->tts_tableOid; /* Perform the insertion, and copy the resulting ItemPointer */ - heap_insert(relation, tuple, cid, options, 
bistate); + heap_insert(relation, tuple, GetCurrentTransactionId(), cid, options, + bistate); ItemPointerCopy(&tuple->t_self, &slot->tts_tid); if (shouldFree) @@ -275,7 +276,8 @@ heapam_tuple_insert_speculative(Relation relation, TupleTableSlot *slot, options |= HEAP_INSERT_SPECULATIVE; /* Perform the insertion, and copy the resulting ItemPointer */ - heap_insert(relation, tuple, cid, options, bistate); + heap_insert(relation, tuple, GetCurrentTransactionId(), cid, options, + bistate); ItemPointerCopy(&tuple->t_self, &slot->tts_tid); if (shouldFree) @@ -309,8 +311,8 @@ heapam_tuple_delete(Relation relation, ItemPointer tid, CommandId cid, * the storage itself is cleaning the dead tuples by itself, it is the * time to call the index tuple deletion also. */ - return heap_delete(relation, tid, cid, crosscheck, wait, tmfd, changingPart, - true); + return heap_delete(relation, tid, GetCurrentTransactionId(), cid, + crosscheck, wait, tmfd, changingPart, true); } @@ -328,7 +330,8 @@ heapam_tuple_update(Relation relation, ItemPointer otid, TupleTableSlot *slot, slot->tts_tableOid = RelationGetRelid(relation); tuple->t_tableOid = slot->tts_tableOid; - result = heap_update(relation, otid, tuple, cid, crosscheck, wait, + result = heap_update(relation, otid, tuple, GetCurrentTransactionId(), + cid, crosscheck, wait, tmfd, lockmode, update_indexes, true); ItemPointerCopy(&tuple->t_self, &slot->tts_tid); @@ -2441,9 +2444,16 @@ reform_and_rewrite_tuple(HeapTuple tuple, * flag to skip logical decoding: as soon as REPACK CONCURRENTLY swaps * the relation files, it drops this relation, so no logical * replication subscription should need the data. + * + * It is also crucial to stamp the new record with the exact same xid + * and cid, because the tuple must be visible to the snapshots of the + * concurrent transactions later. 
*/ - heap_insert(NewHeap, copiedTuple, GetCurrentCommandId(true), - HEAP_INSERT_NO_LOGICAL, NULL); + // TODO: looks like cid is not required + CommandId cid = HeapTupleHeaderGetRawCommandId(tuple->t_data); + TransactionId xid = HeapTupleHeaderGetXmin(tuple->t_data); + + heap_insert(NewHeap, copiedTuple, xid, cid, HEAP_INSERT_NO_LOGICAL, NULL); } heap_freetuple(copiedTuple); diff --git a/src/backend/commands/cluster.c b/src/backend/commands/cluster.c index f2a2ec6d3e5..1b1928ce300 100644 --- a/src/backend/commands/cluster.c +++ b/src/backend/commands/cluster.c @@ -58,6 +58,7 @@ #include "storage/ipc.h" #include "storage/lmgr.h" #include "storage/predicate.h" +#include "storage/procarray.h" #include "storage/procsignal.h" #include "tcop/tcopprot.h" #include "utils/acl.h" @@ -249,15 +250,20 @@ static bool decode_concurrent_changes(LogicalDecodingContext *ctx, DecodingWorkerShared *shared); static void apply_concurrent_changes(BufFile *file, ChangeDest *dest); static void apply_concurrent_insert(Relation rel, HeapTuple tup, + TransactionId xid, IndexInsertState *iistate, TupleTableSlot *index_slot); static void apply_concurrent_update(Relation rel, HeapTuple tup, HeapTuple tup_target, + TransactionId xid, IndexInsertState *iistate, TupleTableSlot *index_slot); -static void apply_concurrent_delete(Relation rel, HeapTuple tup_target); +static void apply_concurrent_delete(Relation rel, + TransactionId xid, + HeapTuple tup_target); static HeapTuple find_target_tuple(Relation rel, ChangeDest *dest, HeapTuple tup_key, + Snapshot snapshot, TupleTableSlot *ident_slot); static void process_concurrent_changes(XLogRecPtr end_of_wal, ChangeDest *dest, @@ -1091,7 +1097,14 @@ rebuild_relation(Relation OldHeap, Relation index, bool verbose, bool concurrent /* The historic snapshot won't be needed anymore. 
*/ if (snapshot) + { + TransactionId xmin = snapshot->xmin; PopActiveSnapshot(); + Assert(concurrent); + // TODO: seems like it not required: need to check SnapBuildInitialSnapshotForRepack + WaitForOlderSnapshots(xmin, false); + } + if (concurrent) { @@ -1382,30 +1395,35 @@ copy_table_data(Relation NewHeap, Relation OldHeap, Relation OldIndex, * not to be aggressive about this. */ memset(¶ms, 0, sizeof(VacuumParams)); - vacuum_get_cutoffs(OldHeap, params, &cutoffs); - - /* - * FreezeXid will become the table's new relfrozenxid, and that mustn't go - * backwards, so take the max. - */ + if (!concurrent) { TransactionId relfrozenxid = OldHeap->rd_rel->relfrozenxid; + MultiXactId relminmxid = OldHeap->rd_rel->relminmxid; + vacuum_get_cutoffs(OldHeap, params, &cutoffs); + /* + * FreezeXid will become the table's new relfrozenxid, and that mustn't go + * backwards, so take the max. + */ if (TransactionIdIsValid(relfrozenxid) && TransactionIdPrecedes(cutoffs.FreezeLimit, relfrozenxid)) cutoffs.FreezeLimit = relfrozenxid; - } - - /* - * MultiXactCutoff, similarly, shouldn't go backwards either. - */ - { - MultiXactId relminmxid = OldHeap->rd_rel->relminmxid; - + /* + * MultiXactCutoff, similarly, shouldn't go backwards either. + */ if (MultiXactIdIsValid(relminmxid) && MultiXactIdPrecedes(cutoffs.MultiXactCutoff, relminmxid)) cutoffs.MultiXactCutoff = relminmxid; } + else + { + /* + * In concurrent mode we reuse all the xmin/xmax, + * so just use current values for simplicity. 
+ */ + cutoffs.FreezeLimit = OldHeap->rd_rel->relfrozenxid; + cutoffs.MultiXactCutoff = OldHeap->rd_rel->relminmxid; + } /* * Decide whether to use an indexscan or seqscan-and-optional-sort to scan @@ -2745,6 +2763,7 @@ apply_concurrent_changes(BufFile *file, ChangeDest *dest) size_t nread; HeapTuple tup, tup_exist; + TransactionId xid; CHECK_FOR_INTERRUPTS(); @@ -2761,6 +2780,17 @@ apply_concurrent_changes(BufFile *file, ChangeDest *dest) tup->t_len = t_len; ItemPointerSetInvalid(&tup->t_self); tup->t_tableOid = RelationGetRelid(dest->rel); + BufFileReadExact(file, &xid, sizeof(TransactionId)); + + if (TransactionIdIsValid(xid) && TransactionIdIsInProgress(xid)) + { + /* The xid is committed for sure because we got that update from the reorderbuffer, + * but there is a possibility the procarray is not yet updated and the current backend still sees it as + * in-progress. Let's wait for the procarray to be updated. */ + XactLockTableWait(xid, NULL, NULL, XLTW_None); + Assert(!TransactionIdIsInProgress(xid)); + Assert(TransactionIdDidCommit(xid)); + } if (kind == CHANGE_UPDATE_OLD) { @@ -2771,7 +2801,7 @@ apply_concurrent_changes(BufFile *file, ChangeDest *dest) { Assert(tup_old == NULL); - apply_concurrent_insert(rel, tup, dest->iistate, index_slot); + apply_concurrent_insert(rel, tup, xid, dest->iistate, index_slot); pfree(tup); } @@ -2790,17 +2820,21 @@ apply_concurrent_changes(BufFile *file, ChangeDest *dest) } /* - * Find the tuple to be updated or deleted. + * Find the tuple to be updated or deleted using SnapshotSelf. + * That way we receive the last alive version in case of a HOT chain. + * It is guaranteed that there is no not-yet-committed updated version, + * because here we are replaying all-committed transactions without any concurrency + * involved. 
*/ - tup_exist = find_target_tuple(rel, dest, tup_key, ident_slot); + tup_exist = find_target_tuple(rel, dest, tup_key, SnapshotSelf, ident_slot); if (tup_exist == NULL) elog(ERROR, "failed to find target tuple"); if (kind == CHANGE_UPDATE_NEW) - apply_concurrent_update(rel, tup, tup_exist, dest->iistate, + apply_concurrent_update(rel, tup, tup_exist, xid, dest->iistate, index_slot); else - apply_concurrent_delete(rel, tup_exist); + apply_concurrent_delete(rel, xid, tup_exist); if (tup_old != NULL) { @@ -2819,6 +2853,7 @@ apply_concurrent_changes(BufFile *file, ChangeDest *dest) */ if (kind != CHANGE_UPDATE_OLD) { + // TODO: not sure it is required at all: we are replaying committed transactions stamping them with committed XID CommandCounterIncrement(); UpdateActiveSnapshotCommandId(); } @@ -2830,7 +2865,7 @@ apply_concurrent_changes(BufFile *file, ChangeDest *dest) } static void -apply_concurrent_insert(Relation rel, HeapTuple tup, IndexInsertState *iistate, +apply_concurrent_insert(Relation rel, HeapTuple tup, TransactionId xid, IndexInsertState *iistate, TupleTableSlot *index_slot) { List *recheck; @@ -2840,9 +2875,12 @@ apply_concurrent_insert(Relation rel, HeapTuple tup, IndexInsertState *iistate, * Like simple_heap_insert(), but make sure that the INSERT is not * logically decoded - see reform_and_rewrite_tuple() for more * information. + * + * Use already committed xid to stamp the tuple. */ - heap_insert(rel, tup, GetCurrentCommandId(true), HEAP_INSERT_NO_LOGICAL, - NULL); + Assert(TransactionIdIsValid(xid)); + heap_insert(rel, tup, xid, GetCurrentCommandId(true), + HEAP_INSERT_NO_LOGICAL, NULL); /* * Update indexes. @@ -2850,6 +2888,7 @@ apply_concurrent_insert(Relation rel, HeapTuple tup, IndexInsertState *iistate, * In case functions in the index need the active snapshot and caller * hasn't set one. 
*/ + PushActiveSnapshot(GetLatestSnapshot()); ExecStoreHeapTuple(tup, index_slot, false); recheck = ExecInsertIndexTuples(iistate->rri, index_slot, @@ -2860,6 +2899,7 @@ apply_concurrent_insert(Relation rel, HeapTuple tup, IndexInsertState *iistate, NIL, /* arbiterIndexes */ false /* onlySummarizing */ ); + PopActiveSnapshot(); /* * If recheck is required, it must have been preformed on the source @@ -2873,6 +2913,7 @@ apply_concurrent_insert(Relation rel, HeapTuple tup, IndexInsertState *iistate, static void apply_concurrent_update(Relation rel, HeapTuple tup, HeapTuple tup_target, + TransactionId xid, IndexInsertState *iistate, TupleTableSlot *index_slot) { LockTupleMode lockmode; @@ -2887,9 +2928,12 @@ apply_concurrent_update(Relation rel, HeapTuple tup, HeapTuple tup_target, * * Do it like in simple_heap_update(), except for 'wal_logical' (and * except for 'wait'). + * + * Use already committed xid to stamp the tuple. */ + Assert(TransactionIdIsValid(xid)); res = heap_update(rel, &tup_target->t_self, tup, - GetCurrentCommandId(true), + xid, GetCurrentCommandId(true), InvalidSnapshot, false, /* no wait - only we are doing changes */ &tmfd, &lockmode, &update_indexes, @@ -2901,6 +2945,7 @@ apply_concurrent_update(Relation rel, HeapTuple tup, HeapTuple tup_target, if (update_indexes != TU_None) { + PushActiveSnapshot(GetLatestSnapshot()); recheck = ExecInsertIndexTuples(iistate->rri, index_slot, iistate->estate, @@ -2910,6 +2955,7 @@ apply_concurrent_update(Relation rel, HeapTuple tup, HeapTuple tup_target, NIL, /* arbiterIndexes */ /* onlySummarizing */ update_indexes == TU_Summarizing); + PopActiveSnapshot(); list_free(recheck); } @@ -2917,7 +2963,7 @@ apply_concurrent_update(Relation rel, HeapTuple tup, HeapTuple tup_target, } static void -apply_concurrent_delete(Relation rel, HeapTuple tup_target) +apply_concurrent_delete(Relation rel, TransactionId xid, HeapTuple tup_target) { TM_Result res; TM_FailureData tmfd; @@ -2927,9 +2973,12 @@ 
apply_concurrent_delete(Relation rel, HeapTuple tup_target) * * Do it like in simple_heap_delete(), except for 'wal_logical' (and * except for 'wait'). + * + * Use already committed xid to stamp the tuple. */ - res = heap_delete(rel, &tup_target->t_self, GetCurrentCommandId(true), - InvalidSnapshot, false, + Assert(TransactionIdIsValid(xid)); + res = heap_delete(rel, &tup_target->t_self, xid, + GetCurrentCommandId(true), InvalidSnapshot, false, &tmfd, false, /* no wait - only we are doing changes */ false /* wal_logical */ ); @@ -2950,7 +2999,7 @@ apply_concurrent_delete(Relation rel, HeapTuple tup_target) */ static HeapTuple find_target_tuple(Relation rel, ChangeDest *dest, HeapTuple tup_key, - TupleTableSlot *ident_slot) + Snapshot snapshot, TupleTableSlot *ident_slot) { Relation ident_index = dest->ident_index; IndexScanDesc scan; @@ -2959,7 +3008,7 @@ find_target_tuple(Relation rel, ChangeDest *dest, HeapTuple tup_key, HeapTuple result = NULL; /* XXX no instrumentation for now */ - scan = index_beginscan(rel, ident_index, GetActiveSnapshot(), + scan = index_beginscan(rel, ident_index, snapshot, NULL, dest->ident_key_nentries, 0); /* diff --git a/src/backend/replication/pgoutput_repack/pgoutput_repack.c b/src/backend/replication/pgoutput_repack/pgoutput_repack.c index fb9956d392d..8d796e0a684 100644 --- a/src/backend/replication/pgoutput_repack/pgoutput_repack.c +++ b/src/backend/replication/pgoutput_repack/pgoutput_repack.c @@ -29,7 +29,8 @@ static void plugin_commit_txn(LogicalDecodingContext *ctx, static void plugin_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation rel, ReorderBufferChange *change); static void store_change(LogicalDecodingContext *ctx, - ConcurrentChangeKind kind, HeapTuple tuple); + ConcurrentChangeKind kind, HeapTuple tuple, + TransactionId xid); void _PG_output_plugin_init(OutputPluginCallbacks *cb) @@ -120,7 +121,7 @@ plugin_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, if (newtuple == NULL) elog(ERROR, 
"Incomplete insert info."); - store_change(ctx, CHANGE_INSERT, newtuple); + store_change(ctx, CHANGE_INSERT, newtuple, change->txn->xid); } break; case REORDER_BUFFER_CHANGE_UPDATE: @@ -137,9 +138,11 @@ plugin_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, elog(ERROR, "Incomplete update info."); if (oldtuple != NULL) - store_change(ctx, CHANGE_UPDATE_OLD, oldtuple); + store_change(ctx, CHANGE_UPDATE_OLD, oldtuple, + change->txn->xid); - store_change(ctx, CHANGE_UPDATE_NEW, newtuple); + store_change(ctx, CHANGE_UPDATE_NEW, newtuple, + change->txn->xid); } break; case REORDER_BUFFER_CHANGE_DELETE: @@ -152,7 +155,7 @@ plugin_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, if (oldtuple == NULL) elog(ERROR, "Incomplete delete info."); - store_change(ctx, CHANGE_DELETE, oldtuple); + store_change(ctx, CHANGE_DELETE, oldtuple, change->txn->xid); } break; default: @@ -165,7 +168,7 @@ plugin_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, /* Store concurrent data change. */ static void store_change(LogicalDecodingContext *ctx, ConcurrentChangeKind kind, - HeapTuple tuple) + HeapTuple tuple, TransactionId xid) { RepackDecodingState *dstate; char kind_byte = (char) kind; @@ -195,6 +198,7 @@ store_change(LogicalDecodingContext *ctx, ConcurrentChangeKind kind, BufFileWrite(dstate->file, &tuple->t_len, sizeof(tuple->t_len)); /* ... and the tuple itself. */ BufFileWrite(dstate->file, tuple->t_data, tuple->t_len); + BufFileWrite(dstate->file, &xid, sizeof(TransactionId)); /* Free the flat copy if created above. 
*/ if (flattened) diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h index b7cd25896f6..d9776f61a0d 100644 --- a/src/include/access/heapam.h +++ b/src/include/access/heapam.h @@ -354,20 +354,20 @@ extern BulkInsertState GetBulkInsertState(void); extern void FreeBulkInsertState(BulkInsertState); extern void ReleaseBulkInsertStatePin(BulkInsertState bistate); -extern void heap_insert(Relation relation, HeapTuple tup, CommandId cid, +extern void heap_insert(Relation relation, HeapTuple tup, TransactionId xid, CommandId cid, int options, BulkInsertState bistate); extern void heap_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples, CommandId cid, int options, BulkInsertState bistate); extern TM_Result heap_delete(Relation relation, const ItemPointerData *tid, - CommandId cid, Snapshot crosscheck, bool wait, + TransactionId xid, CommandId cid, Snapshot crosscheck, bool wait, TM_FailureData *tmfd, bool changingPart, bool wal_logical); extern void heap_finish_speculative(Relation relation, const ItemPointerData *tid); extern void heap_abort_speculative(Relation relation, const ItemPointerData *tid); extern TM_Result heap_update(Relation relation, const ItemPointerData *otid, HeapTuple newtup, - CommandId cid, Snapshot crosscheck, bool wait, + TransactionId xid, CommandId cid, Snapshot crosscheck, bool wait, TM_FailureData *tmfd, LockTupleMode *lockmode, TU_UpdateIndexes *update_indexes, bool wal_logical); extern TM_Result heap_lock_tuple(Relation relation, HeapTuple tuple, diff --git a/src/test/modules/injection_points/specs/repack.spec b/src/test/modules/injection_points/specs/repack.spec index d727a9b056b..accd42d78aa 100644 --- a/src/test/modules/injection_points/specs/repack.spec +++ b/src/test/modules/injection_points/specs/repack.spec @@ -85,9 +85,6 @@ step change_new # When applying concurrent data changes, we should see the effects of an # in-progress subtransaction. 
# -# XXX Not sure this test is useful now - it was designed for the patch that -# preserves tuple visibility and which therefore modifies -# TransactionIdIsCurrentTransactionId(). step change_subxact1 { BEGIN; @@ -102,7 +99,6 @@ step change_subxact1 # When applying concurrent data changes, we should not see the effects of a # rolled back subtransaction. # -# XXX Is this test useful? See above. step change_subxact2 { BEGIN; -- 2.43.0
From 25fc848068b28e5b2ae099bdecae35fdf8cb6240 Mon Sep 17 00:00:00 2001 From: Mikhail Nikalayeu <[email protected]> Date: Sat, 13 Dec 2025 18:46:46 +0100 Subject: [PATCH vnocfbot 2/2] one more stress test for repack concurrently --- contrib/amcheck/meson.build | 1 + contrib/amcheck/t/008_repack_concurrently.pl | 101 +++++++++++++++++++ 2 files changed, 102 insertions(+) create mode 100644 contrib/amcheck/t/008_repack_concurrently.pl diff --git a/contrib/amcheck/meson.build b/contrib/amcheck/meson.build index 2b69081d3bf..f7c70735989 100644 --- a/contrib/amcheck/meson.build +++ b/contrib/amcheck/meson.build @@ -51,6 +51,7 @@ tests += { 't/005_pitr.pl', 't/006_verify_gin.pl', 't/007_repack_concurrently.pl', + 't/008_repack_concurrently.pl', ], }, } diff --git a/contrib/amcheck/t/008_repack_concurrently.pl b/contrib/amcheck/t/008_repack_concurrently.pl new file mode 100644 index 00000000000..220524d41b3 --- /dev/null +++ b/contrib/amcheck/t/008_repack_concurrently.pl @@ -0,0 +1,101 @@ + +# Copyright (c) 2021-2025, PostgreSQL Global Development Group + +# Test REPACK CONCURRENTLY with concurrent modifications +use strict; +use warnings FATAL => 'all'; + +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; + +use Test::More; + +my $node; + +# +# Test set-up +# +$node = PostgreSQL::Test::Cluster->new('CIC_test'); +$node->init; +$node->append_conf('postgresql.conf', + 'lock_timeout = ' . 
(1000 * $PostgreSQL::Test::Utils::timeout_default)); +$node->append_conf( + 'postgresql.conf', qq( +wal_level = logical +)); + +my $no_hot = int(rand(2)); + +$node->start; +$node->safe_psql('postgres', q(CREATE TABLE tbl(i SERIAL PRIMARY KEY, j int))); +if ($no_hot) +{ + $node->safe_psql('postgres', q(CREATE INDEX test_idx ON tbl(j);)); +} +else +{ + $node->safe_psql('postgres', q(CREATE INDEX test_idx ON tbl(i);)); +} + +# Load amcheck +$node->safe_psql('postgres', q(CREATE EXTENSION amcheck)); + +my $sum = $node->safe_psql('postgres', q( + SELECT SUM(j) AS sum FROM tbl +)); + +$node->safe_psql('postgres', q(CREATE UNLOGGED SEQUENCE last_j START 1 INCREMENT 1;)); + + +$node->pgbench( +'--no-vacuum --client=30 --jobs=4 --exit-on-abort --transactions=1000', +0, +[qr{actually processed}], +[qr{^$}], +'concurrent INSERT/SELECT operations with REPACK (CONCURRENTLY)', +{ + 'concurrent_ops' => qq( + SELECT pg_try_advisory_lock(42)::integer AS gotlock \\gset + \\if :gotlock + REPACK (CONCURRENTLY) tbl USING INDEX tbl_pkey; + SELECT bt_index_parent_check('tbl_pkey', heapallindexed => true); + SELECT bt_index_parent_check('test_idx', heapallindexed => true); + \\sleep 10 ms + + REPACK (CONCURRENTLY) tbl USING INDEX test_idx; + SELECT bt_index_parent_check('tbl_pkey', heapallindexed => true); + SELECT bt_index_parent_check('test_idx', heapallindexed => true); + \\sleep 10 ms + + REPACK (CONCURRENTLY) tbl; + SELECT bt_index_parent_check('tbl_pkey', heapallindexed => true); + SELECT bt_index_parent_check('test_idx', heapallindexed => true); + \\sleep 10 ms + + SELECT pg_advisory_unlock(42); + \\else + SELECT pg_advisory_lock(43); + BEGIN; + INSERT INTO tbl(j) VALUES (nextval('last_j')) RETURNING j \\gset p_ + COMMIT; + SELECT pg_advisory_unlock(43); + \\sleep 1 ms + + BEGIN + --TRANSACTION ISOLATION LEVEL REPEATABLE READ + ; + SELECT 1; + \\sleep 1 ms + SELECT COUNT(*) AS count FROM tbl WHERE j <= :p_j \\gset p_ + \\if :p_count != :p_j + COMMIT; + SELECT (:p_count) / 0; + 
\\endif + + COMMIT; + \\endif + ) +}); + +$node->stop; +done_testing(); -- 2.43.0
From db84bbad9d10ffacffc763dbf0ed4bb481f42399 Mon Sep 17 00:00:00 2001 From: Mikhail Nikalayeu <[email protected]> Date: Sat, 13 Dec 2025 18:13:37 +0100 Subject: [PATCH vnocfbot 1/2] stress test for repack concurrently --- contrib/amcheck/meson.build | 1 + contrib/amcheck/t/007_repack_concurrently.pl | 110 +++++++++++++++++++ 2 files changed, 111 insertions(+) create mode 100644 contrib/amcheck/t/007_repack_concurrently.pl diff --git a/contrib/amcheck/meson.build b/contrib/amcheck/meson.build index 1f0c347ed54..2b69081d3bf 100644 --- a/contrib/amcheck/meson.build +++ b/contrib/amcheck/meson.build @@ -50,6 +50,7 @@ tests += { 't/004_verify_nbtree_unique.pl', 't/005_pitr.pl', 't/006_verify_gin.pl', + 't/007_repack_concurrently.pl', ], }, } diff --git a/contrib/amcheck/t/007_repack_concurrently.pl b/contrib/amcheck/t/007_repack_concurrently.pl new file mode 100644 index 00000000000..a47cebb347b --- /dev/null +++ b/contrib/amcheck/t/007_repack_concurrently.pl @@ -0,0 +1,110 @@ + +# Copyright (c) 2021-2025, PostgreSQL Global Development Group + +# Test REPACK CONCURRENTLY with concurrent modifications +use strict; +use warnings FATAL => 'all'; + +use PostgreSQL::Test::Cluster; +use PostgreSQL::Test::Utils; + +use Test::More; + +my $node; + +# +# Test set-up +# +$node = PostgreSQL::Test::Cluster->new('CIC_test'); +$node->init; +$node->append_conf('postgresql.conf', + 'lock_timeout = ' . 
(1000 * $PostgreSQL::Test::Utils::timeout_default)); +$node->append_conf( + 'postgresql.conf', qq( +wal_level = logical +)); + +my $n=1000; +my $no_hot = int(rand(2)); + +$node->start; +$node->safe_psql('postgres', q(CREATE TABLE tbl(i int PRIMARY KEY, j int))); + +if ($no_hot) +{ + $node->safe_psql('postgres', q(CREATE INDEX test_idx ON tbl(j);)); +} +else +{ + $node->safe_psql('postgres', q(CREATE INDEX test_idx ON tbl(i);)); +} + + +# Load amcheck +$node->safe_psql('postgres', q(CREATE EXTENSION amcheck)); + +# Insert $n rows into tbl +$node->safe_psql('postgres', qq( + INSERT INTO tbl SELECT i, i FROM generate_series(1,$n) i +)); + +my $sum = $node->safe_psql('postgres', q( + SELECT SUM(j) AS sum FROM tbl +)); + + +$node->pgbench( +'--no-vacuum --client=30 --jobs=4 --exit-on-abort --transactions=5000', +0, +[qr{actually processed}], +[qr{^$}], +'concurrent UPDATE/SELECT operations with REPACK (CONCURRENTLY)', +{ + 'concurrent_ops' => qq( + SELECT pg_try_advisory_lock(42)::integer AS gotlock \\gset + \\if :gotlock + REPACK (CONCURRENTLY) tbl USING INDEX tbl_pkey; + SELECT bt_index_parent_check('tbl_pkey', heapallindexed => true); + SELECT bt_index_parent_check('test_idx', heapallindexed => true); + \\sleep 10 ms + + REPACK (CONCURRENTLY) tbl USING INDEX test_idx; + SELECT bt_index_parent_check('tbl_pkey', heapallindexed => true); + SELECT bt_index_parent_check('test_idx', heapallindexed => true); + \\sleep 10 ms + + REPACK (CONCURRENTLY) tbl; + SELECT bt_index_parent_check('tbl_pkey', heapallindexed => true); + SELECT bt_index_parent_check('test_idx', heapallindexed => true); + \\sleep 10 ms + + SELECT pg_advisory_unlock(42); + \\else + \\set num_a random(1, $n) + \\set num_b random(1, $n) + \\set diff random(1, 10000) + BEGIN; + UPDATE tbl SET j = j + :diff WHERE i = :num_a; + \\sleep 1 ms + UPDATE tbl SET j = j - :diff WHERE i = :num_b; + \\sleep 1 ms + COMMIT; + + BEGIN + --TRANSACTION ISOLATION LEVEL REPEATABLE READ + ; + SELECT 1; + \\sleep 1 ms + SELECT 
COALESCE(SUM(j), 0) AS sum FROM tbl \\gset p_ + \\if :p_sum != $sum + COMMIT; + SELECT (:p_sum) / 0; + \\endif + + COMMIT; + \\endif + ) +}); + +$node->stop; +done_testing(); -- 2.43.0
